mirror of
https://github.com/apache/impala.git
synced 2025-12-19 09:58:28 -05:00
This change adds get_workload() to ImpalaTestSuite and removes it from all test suites that already returned 'functional-query'. get_workload() is also removed from CustomClusterTestSuite which used to return 'tpch'. All other changes besides impala_test_suite.py and custom_cluster_test_suite.py are just mass removals of get_workload() functions. The behavior is only changed in custom cluster tests that didn't override get_workload(). By returning 'functional-query' instead of 'tpch', exploration_strategy() will no longer return 'core' in 'exhaustive' test runs. See IMPALA-3947 on why workload affected exploration_strategy. An example for affected test is TestCatalogHMSFailures which was skipped both in core and exhaustive runs before this change. get_workload() functions that return a different workload than 'functional-query' are not changed - it is possible that some of these also don't handle exploration_strategy() as expected, but individually checking these tests is out of scope in this patch. Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115 Reviewed-on: http://gerrit.cloudera.org:8080/22726 Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Reviewed-by: Daniel Becker <daniel.becker@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
194 lines
8.3 KiB
Python
194 lines
8.3 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Tests for column lineage.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
import json
|
|
import logging
|
|
import os
|
|
import pytest
|
|
import re
|
|
import time
|
|
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
from tests.common.skip import SkipIfFS
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class TestLineage(CustomClusterTestSuite):
  """Tests for the column lineage event log (--lineage_event_log_dir).

  Each test starts a custom cluster that writes lineage graphs as JSON to a
  temporary directory, runs queries, and then inspects the records written
  there. Only the coordinator's log file gets populated."""

  # Placeholder ids for the per-test temporary lineage log directories
  # (resolved by CustomClusterTestSuite.with_args via tmp_dir_placeholders).
  START_END_TIME = "start_end_time"
  CREATE_TABLE_TIME = "create_table_time"
  CREATE_TABLE_TIME_NO_HMS = "create_table_time_no_hms"
  DDL_LINEAGE = "ddl_lineage"
  LINEAGE = "lineage"

  @classmethod
  def setup_class(cls):
    super(TestLineage, cls).setup_class()

  @classmethod
  def teardown_class(cls):
    super(TestLineage, cls).teardown_class()

  @staticmethod
  def _get_profile_query_id(result):
    """Returns the query id parsed from the runtime profile of 'result'."""
    return re.search(r"Query \(id=(.*)\):", result.runtime_profile).group(1)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--lineage_event_log_dir={" + START_END_TIME + "}",
      tmp_dir_placeholders=[START_END_TIME])
  def test_start_end_timestamp(self):
    """Test that 'timestamp' and 'endTime' in the lineage graph are populated with valid
    UNIX times."""
    LOG.info("lineage_event_log_dir is {0}".format(self.get_tmp_dir(self.START_END_TIME)))
    before_time = int(time.time())
    query = "select count(*) from functional.alltypes"
    result = self.execute_query_expect_success(self.client, query)
    profile_query_id = self._get_profile_query_id(result)
    after_time = int(time.time())
    LOG.info("before_time " + str(before_time) + " after_time " + str(after_time))

    # Stop the cluster in order to flush the lineage log files.
    self._stop_impala_cluster()

    for log_filename in os.listdir(self.get_tmp_dir(self.START_END_TIME)):
      log_path = os.path.join(self.get_tmp_dir(self.START_END_TIME), log_filename)
      # Only the coordinator's log file will be populated.
      if os.path.getsize(log_path) > 0:
        LOG.info("examining file: " + log_path)
        with open(log_path) as log_file:
          lineage_json = json.load(log_file)
          assert lineage_json["queryId"] == profile_query_id
          timestamp = int(lineage_json["timestamp"])
          end_time = int(lineage_json["endTime"])
          # The lineage event must fall inside the measured query window.
          assert before_time <= timestamp
          assert timestamp <= end_time
          assert end_time <= after_time
      else:
        LOG.info("empty file: " + log_path)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--lineage_event_log_dir={" + CREATE_TABLE_TIME + "}",
      tmp_dir_placeholders=[CREATE_TABLE_TIME])
  def test_create_table_timestamp(self, unique_database):
    for table_format in ['textfile', 'kudu', 'iceberg']:
      self.run_test_create_table_timestamp(
          unique_database, table_format, self.CREATE_TABLE_TIME)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--lineage_event_log_dir={" + CREATE_TABLE_TIME_NO_HMS + "}",
      catalogd_args="--hms_event_polling_interval_s=0",
      tmp_dir_placeholders=[CREATE_TABLE_TIME_NO_HMS])
  def test_create_table_timestamp_without_hms_events(self, unique_database):
    for table_format in ['textfile', 'kudu', 'iceberg']:
      self.run_test_create_table_timestamp(
          unique_database, table_format, self.CREATE_TABLE_TIME_NO_HMS)

  def run_test_create_table_timestamp(self, unique_database, table_format, tmp_dir_id):
    """Test that 'createTableTime' in the lineage graph are populated with valid value
    from HMS."""
    not_enforced = ""
    if table_format == "iceberg":
      not_enforced = " NOT ENFORCED"
    query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) {2} " \
        "stored as {1} as select int_col, bigint_col from functional.alltypes".format(
        unique_database, table_format, not_enforced)
    result = self.execute_query_expect_success(self.client, query)
    profile_query_id = self._get_profile_query_id(result)

    # Wait to flush the lineage log files.
    time.sleep(3)

    # Now that the test is executed multiple times we need to take a look at
    # only the lines that contain the expected table name.
    expected_table_name =\
        "{0}.lineage_test_tbl_{1}".format(unique_database, table_format)

    for log_filename in os.listdir(self.get_tmp_dir(tmp_dir_id)):
      log_path = os.path.join(self.get_tmp_dir(tmp_dir_id), log_filename)
      # Only the coordinator's log file will be populated.
      if os.path.getsize(log_path) > 0:
        with open(log_path) as log_file:
          for line in log_file:
            if expected_table_name not in line: continue

            lineage_json = json.loads(line)
            assert lineage_json["queryId"] == profile_query_id
            vertices = lineage_json["vertices"]
            for vertex in vertices:
              if vertex["vertexId"] == "int_col":
                assert "metadata" in vertex
                table_name = vertex["metadata"]["tableName"]
                table_create_time = int(vertex["metadata"]["tableCreateTime"])
                assert expected_table_name == table_name
                # -1 would mean the create time could not be fetched from HMS.
                assert table_create_time != -1

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--lineage_event_log_dir={" + DDL_LINEAGE + "}",
      tmp_dir_placeholders=[DDL_LINEAGE])
  def test_ddl_lineage(self, unique_database):
    """ Test that DDLs like 'create table' have query text populated in the lineage
    graph."""
    query = "create external table {0}.ddl_lineage_tbl (id int)".format(unique_database)
    result = self.execute_query_expect_success(self.client, query)
    profile_query_id = self._get_profile_query_id(result)

    # Wait to flush the lineage log files.
    time.sleep(3)

    for log_filename in os.listdir(self.get_tmp_dir(self.DDL_LINEAGE)):
      log_path = os.path.join(self.get_tmp_dir(self.DDL_LINEAGE), log_filename)
      # Only the coordinator's log file will be populated.
      if os.path.getsize(log_path) > 0:
        with open(log_path) as log_file:
          lineage_json = json.load(log_file)
          assert lineage_json["queryId"] == profile_query_id
          assert lineage_json["queryText"] is not None
          assert lineage_json["queryText"] == query
          assert lineage_json["tableLocation"] is not None

    # Test explain statements don't create lineages.
    query = "explain create table {0}.lineage_test_tbl as select int_col, " \
        "tinyint_col from functional.alltypes".format(unique_database)
    result = self.execute_query_expect_success(self.client, query)
    profile_query_id = self._get_profile_query_id(result)

    # Wait to flush the lineage log files.
    time.sleep(3)

    for log_filename in os.listdir(self.get_tmp_dir(self.DDL_LINEAGE)):
      log_path = os.path.join(self.get_tmp_dir(self.DDL_LINEAGE), log_filename)
      # Only the coordinator's log file will be populated.
      if os.path.getsize(log_path) > 0:
        with open(log_path) as log_file:
          lineage_json = json.load(log_file)
          # The explain statement must not have produced a lineage entry, so
          # the logged entry still belongs to the earlier DDL. NOTE: the
          # original assertion used 'is not' (object identity), which is
          # trivially true for two distinct string objects and never failed;
          # '!=' is the intended value comparison.
          assert lineage_json["queryId"] != profile_query_id

  @SkipIfFS.hbase
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--lineage_event_log_dir={" + LINEAGE + "}",
      tmp_dir_placeholders=[LINEAGE])
  def test_lineage_output(self, vector):
    try:
      self.run_test_case('QueryTest/lineage', vector)
    finally:
      # Clean up the test database
      db_cleanup = "drop database if exists lineage_test_db cascade"
      self.execute_query_expect_success(self.client, db_cleanup)