mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-13051: Speed up, refactor query log tests
Sets faster default shutdown_grace_period_s and shutdown_deadline_s when impalad_graceful_shutdown=True in tests. Impala waits until the grace period has passed and all queries are stopped (or the deadline is exceeded) before flushing the query log, so a grace period of 0 is sufficient. Adds them in setup_method to reduce duplication in test declarations. Re-uses TQueryTableColumn Thrift definitions for testing. Moves waiting for the query log table to exist to setup_method rather than as a side-effect of get_client. Refactors workload management code to reduce if-clause nesting. Adds functional query workload tests for both the sys.impala_query_log and the sys.impala_query_live tables to assert the names and order of the individual columns within each table. Renames the python tests for the sys.impala_query_log table, removing the unnecessary "_query_log_table_" string from the name of each test. Change-Id: I1127ef041a3e024bf2b262767d56ec5f29bf3855 Reviewed-on: http://gerrit.cloudera.org:8080/21358 Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
This commit is contained in:
committed by
Riza Suminto
parent
0d215da8d4
commit
3b35ddc8ca
@@ -34,8 +34,7 @@ from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.common.test_vector import ImpalaTestDimension
|
||||
from tests.util.retry import retry
|
||||
from tests.util.workload_management import assert_query, COMPRESSED_BYTES_SPILLED, \
|
||||
BYTES_READ_CACHE_TOTAL
|
||||
from tests.util.workload_management import assert_query
|
||||
from time import sleep, time
|
||||
|
||||
|
||||
@@ -55,23 +54,19 @@ class TestQueryLogTableBase(CustomClusterTestSuite):
|
||||
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol',
|
||||
cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2))
|
||||
|
||||
def get_client(self, protocol, query_table_name=QUERY_TBL):
|
||||
"""Retrieves the default Impala client for the specified protocol. This client is
|
||||
automatically closed after the test completes. Also ensures the completed queries
|
||||
table has been successfully created by checking the logs to verify the create
|
||||
table sql has finished."""
|
||||
|
||||
def setup_method(self, method):
|
||||
super(TestQueryLogTableBase, self).setup_method(method)
|
||||
# These tests run very quickly and can actually complete before Impala has finished
|
||||
# creating the completed queries table. Thus, to make these tests more robust, this
|
||||
# code checks to make sure the table create has finished before returning.
|
||||
create_re = r'\]\s+(\w+:\w+)\]\s+Analyzing query: CREATE TABLE IF NOT EXISTS {}' \
|
||||
.format(query_table_name)
|
||||
create_match = self.assert_impalad_log_contains("INFO", create_re)
|
||||
|
||||
finish_re = r'Query successfully unregistered: query_id={}' \
|
||||
.format(create_match.group(1))
|
||||
self.assert_impalad_log_contains("INFO", finish_re)
|
||||
create_match = self.assert_impalad_log_contains("INFO", r'\]\s+(\w+:\w+)\]\s+'
|
||||
r'Analyzing query: CREATE TABLE IF NOT EXISTS {}'.format(self.QUERY_TBL))
|
||||
self.assert_impalad_log_contains("INFO", r'Query successfully unregistered: '
|
||||
r'query_id={}'.format(create_match.group(1)))
|
||||
|
||||
def get_client(self, protocol):
|
||||
"""Retrieves the default Impala client for the specified protocol. This client is
|
||||
automatically closed after the test completes."""
|
||||
if protocol == self.PROTOCOL_BEESWAX:
|
||||
return self.client
|
||||
elif protocol == self.PROTOCOL_HS2:
|
||||
@@ -89,25 +84,38 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
cls.ImpalaTestMatrix.add_constraint(lambda v:
|
||||
v.get_value('protocol') == 'beeswax')
|
||||
|
||||
@classmethod
|
||||
def get_workload(self):
|
||||
return 'functional-query'
|
||||
|
||||
CACHE_DIR = tempfile.mkdtemp(prefix="cache_dir")
|
||||
MAX_SQL_PLAN_LEN = 2000
|
||||
LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
|
||||
FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + str(int(time()))
|
||||
FLUSH_MAX_RECORDS_QUERY_COUNT = 30
|
||||
OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_max_select "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 "
|
||||
"--query_log_max_sql_length={0} "
|
||||
"--query_log_max_plan_length={0}"
|
||||
.format(MAX_SQL_PLAN_LEN),
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_lower_max_sql_plan(self, vector):
|
||||
"""Asserts that lower limits on the sql and plan columns in the completed queries
|
||||
def test_table_structure(self, vector):
|
||||
"""Asserts that the log table has the expected columns."""
|
||||
self.run_test_case('QueryTest/workload-management-log', vector)
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_max_select "
|
||||
"--query_log_max_sql_length={0} "
|
||||
"--query_log_max_plan_length={0}"
|
||||
.format(MAX_SQL_PLAN_LEN),
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_lower_max_sql_plan(self, vector):
|
||||
"""Asserts that length limits on the sql and plan columns in the completed queries
|
||||
table are respected."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
rand_long_str = "".join(choice(string.ascii_letters) for _ in
|
||||
@@ -139,12 +147,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_max_select "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--cluster_id=test_max_select",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_over_max_sql_plan(self, vector):
|
||||
def test_sql_plan_too_long(self, vector):
|
||||
"""Asserts that very long queries have their corresponding plan and sql columns
|
||||
shortened in the completed queries table."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
@@ -177,15 +183,13 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 "
|
||||
"--query_log_size=0 "
|
||||
"--query_log_size_in_bytes=0",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_no_query_log_select(self, vector):
|
||||
def test_no_query_log(self, vector):
|
||||
"""Asserts queries are written to the completed queries table when the in-memory
|
||||
query log is turned off."""
|
||||
query log queue is turned off."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
|
||||
# Run a select query.
|
||||
@@ -208,14 +212,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_2 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 "
|
||||
"--always_use_data_cache "
|
||||
"--data_cache={0}:5GB".format(CACHE_DIR),
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True,
|
||||
cluster_size=1)
|
||||
def test_query_log_table_query_cache(self, vector):
|
||||
def test_query_data_cache(self, vector):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile. Specifically focuses on the data cache metrics."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
@@ -248,17 +250,15 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
# for this test to not actually assert anything different than other tests. Thus, an
|
||||
# additional assert is needed to ensure that there actually was data read from the
|
||||
# cache.
|
||||
assert data[BYTES_READ_CACHE_TOTAL] != "0", "bytes read from cache total was " \
|
||||
assert data["BYTES_READ_CACHE_TOTAL"] != "0", "bytes read from cache total was " \
|
||||
"zero, test did not assert anything"
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=5 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 ",
|
||||
"--query_log_write_interval_s=5",
|
||||
impala_log_dir=LOG_DIR_MAX_WRITES,
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_max_attempts_exceeded(self, vector):
|
||||
def test_max_attempts_exceeded(self, vector):
|
||||
"""Asserts that completed queries are only attempted 3 times to be inserted into the
|
||||
completed queries table. This test deletes the completed queries table thus it must
|
||||
not come last otherwise the table stays deleted. Subsequent tests will re-create
|
||||
@@ -303,8 +303,6 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_max_queued={0} "
|
||||
"--query_log_write_interval_s=9999 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 "
|
||||
"--cluster_id={1}"
|
||||
.format(FLUSH_MAX_RECORDS_QUERY_COUNT,
|
||||
FLUSH_MAX_RECORDS_CLUSTER_ID),
|
||||
@@ -312,7 +310,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
default_query_options=[
|
||||
('statement_expression_limit', 1024)],
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_flush_max_records(self, vector):
|
||||
def test_flush_on_queued_count_exceeded(self, vector):
|
||||
"""Asserts that queries that have completed are written to the query log table when
|
||||
the maximum number of queued records is reached. Also verifies that writing
|
||||
completed queries is not limited by default statement_expression_limit."""
|
||||
@@ -369,46 +367,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
"impala-server.completed-queries.queued") == 2
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=30 "
|
||||
"--blacklisted_dbs=information_schema "
|
||||
"--query_log_table_name={0}"
|
||||
.format(OTHER_TBL),
|
||||
catalogd_args="--enable_workload_mgmt "
|
||||
"--blacklisted_dbs=information_schema",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_different_table(self, vector):
|
||||
"""Asserts that the completed queries table can be renamed."""
|
||||
|
||||
client = self.get_client(vector.get_value('protocol'),
|
||||
"{}.{}".format(self.WM_DB, self.OTHER_TBL))
|
||||
|
||||
try:
|
||||
res = client.execute("show tables in {0}".format(self.WM_DB))
|
||||
assert res.success
|
||||
assert len(res.data) > 0, "could not find any tables in database {0}" \
|
||||
.format(self.WM_DB)
|
||||
|
||||
tbl_found = False
|
||||
for tbl in res.data:
|
||||
if tbl.startswith(self.OTHER_TBL):
|
||||
tbl_found = True
|
||||
break
|
||||
assert tbl_found, "could not find table '{0}' in database '{1}'" \
|
||||
.format(self.OTHER_TBL, self.WM_DB)
|
||||
finally:
|
||||
client.execute("drop table {0}.{1} purge".format(self.WM_DB, self.OTHER_TBL))
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--query_log_write_interval_s=1",
|
||||
cluster_size=3,
|
||||
num_exclusive_coordinators=2,
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_query_select_dedicate_coordinator(self, vector):
|
||||
def test_dedicated_coordinator_no_mt_dop(self, vector):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile when dedicated coordinators are used."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
@@ -430,14 +394,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
client2.close()
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--query_log_write_interval_s=1",
|
||||
cluster_size=3,
|
||||
num_exclusive_coordinators=2,
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_query_select_mt_dop(self, vector):
|
||||
def test_dedicated_coordinator_with_mt_dop(self, vector):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile when dedicated coordinators are used along with an MT_DOP setting
|
||||
greater than 0."""
|
||||
@@ -461,6 +423,49 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
|
||||
client2.close()
|
||||
|
||||
|
||||
class TestQueryLogOtherTable(TestQueryLogTableBase):
  """Exercises the completed queries table when the query_log_table_name startup flag
  is set to a non-default table name."""

  # Table name suffixed with the epoch seconds taken when this class body is evaluated.
  OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
  # Used in TestQueryLogTableBase.setup_method
  QUERY_TBL = "{0}.{1}".format(TestQueryLogTableBase.WM_DB, OTHER_TBL)

  @classmethod
  def add_test_dimensions(cls):
    """Adds the base class dimensions, then constrains the matrix to the beeswax
    protocol."""
    super(TestQueryLogOtherTable, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('protocol') == 'beeswax')

  @CustomClusterTestSuite.with_args(
      impalad_args="--enable_workload_mgmt "
                   "--query_log_write_interval_s=1 "
                   "--blacklisted_dbs=information_schema "
                   "--query_log_table_name={0}".format(OTHER_TBL),
      catalogd_args="--enable_workload_mgmt "
                    "--blacklisted_dbs=information_schema",
      impalad_graceful_shutdown=True)
  def test_renamed_log_table(self, vector):
    """Asserts that a table whose name starts with OTHER_TBL is listed in the WM_DB
    database, i.e. that the completed queries table can be renamed. The table is
    dropped afterwards whether or not the assertions passed."""
    client = self.get_client(vector.get_value('protocol'))

    try:
      show_result = client.execute("show tables in {0}".format(self.WM_DB))
      assert show_result.success
      assert len(show_result.data) > 0, \
          "could not find any tables in database {0}".format(self.WM_DB)

      # True as soon as one listed table name carries the renamed-table prefix.
      found = any(name.startswith(self.OTHER_TBL) for name in show_result.data)
      assert found, "could not find table '{0}' in database '{1}'".format(
          self.OTHER_TBL, self.WM_DB)
    finally:
      # Clean up the renamed table so it does not linger after the test.
      client.execute("drop table {0}.{1} purge".format(self.WM_DB, self.OTHER_TBL))
|
||||
|
||||
|
||||
class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
"""Tests to assert the query log table is correctly populated when using the HS2
|
||||
client protocol."""
|
||||
@@ -475,14 +480,12 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id={} "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60"
|
||||
"--cluster_id={}"
|
||||
.format(HS2_OPERATIONS_CLUSTER_ID),
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
cluster_size=2,
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_hs2_operations(self, vector):
|
||||
def test_hs2_metadata_operations(self, vector):
|
||||
"""Certain HS2 operations appear to Impala as a special kind of query. Specifically,
|
||||
these operations have a type of unknown and a normally invalid sql syntax. This
|
||||
test asserts those queries are not written to the completed queries table since
|
||||
@@ -605,13 +608,11 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_mult "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--cluster_id=test_query_hist_mult",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
cluster_size=2,
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_query_multiple(self, vector):
|
||||
def test_query_multiple_tables(self, vector):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile for a query that reads from multiple tables."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
@@ -636,12 +637,10 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_3 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--cluster_id=test_query_hist_3",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_query_insert_select(self, vector, unique_database,
|
||||
def test_insert_select(self, vector, unique_database,
|
||||
unique_name):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile for a query that insert selects."""
|
||||
@@ -670,12 +669,10 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
client2.close()
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=15 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--query_log_write_interval_s=15",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_flush_interval(self, vector):
|
||||
def test_flush_on_interval(self, vector):
|
||||
"""Asserts that queries that have completed are written to the query log table
|
||||
after the specified write interval elapses."""
|
||||
|
||||
@@ -694,10 +691,10 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=9999 "
|
||||
"--shutdown_grace_period_s=1 "
|
||||
"--shutdown_deadline_s=30",
|
||||
"--shutdown_grace_period_s=0 "
|
||||
"--shutdown_deadline_s=15",
|
||||
catalogd_args="--enable_workload_mgmt")
|
||||
def test_query_log_table_flush_on_shutdown(self, vector):
|
||||
def test_flush_on_shutdown(self, vector):
|
||||
"""Asserts that queries that have completed but are not yet written to the query
|
||||
log table are flushed to the table before the coordinator exits. Graceful shutdown
|
||||
for 2nd coordinator not needed because query_log_write_interval_s is very long."""
|
||||
@@ -734,7 +731,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
|
||||
|
||||
return success
|
||||
|
||||
assert retry(func=assert_func, max_attempts=5, sleep_time_s=5)
|
||||
assert retry(func=assert_func, max_attempts=5, sleep_time_s=3)
|
||||
finally:
|
||||
client2.close()
|
||||
|
||||
@@ -745,12 +742,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_2 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--cluster_id=test_query_hist_2",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_ddl(self, vector, unique_database, unique_name):
|
||||
def test_ddl(self, vector, unique_database, unique_name):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile for a DDL query."""
|
||||
create_tbl_sql = "create table {0}.{1} (id INT, product_name STRING) " \
|
||||
@@ -773,12 +768,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_3 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--cluster_id=test_query_hist_3",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_dml(self, vector, unique_database, unique_name):
|
||||
def test_dml(self, vector, unique_database, unique_name):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile for a DML query."""
|
||||
tbl_name = "{0}.{1}".format(unique_database, unique_name)
|
||||
@@ -808,12 +801,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_2 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 ",
|
||||
"--cluster_id=test_query_hist_2",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_invalid_query(self, vector):
|
||||
def test_invalid_query(self, vector):
|
||||
"""Asserts correct values are written to the completed queries table for a failed
|
||||
query. The query profile is used as the source of expected values."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
@@ -841,12 +832,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
expected_cluster_id="test_query_hist_2", impalad=impalad, query_id=result.data[0])
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--query_log_write_interval_s=1",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_ignored_sqls(self, vector):
|
||||
def test_ignored_sqls_not_written(self, vector):
|
||||
"""Asserts that expected queries are not written to the query log table."""
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
|
||||
@@ -935,12 +924,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
"impala-server.completed-queries.failure") == 0
|
||||
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60",
|
||||
"--query_log_write_interval_s=1",
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_sql_injection(self, vector):
|
||||
def test_sql_injection_attempts(self, vector):
|
||||
client = self.get_client(vector.get_value('protocol'))
|
||||
impalad = self.cluster.get_first_impalad()
|
||||
|
||||
@@ -958,14 +945,21 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
|
||||
# Attempt to cause an error using multiline comments.
|
||||
sql3_str = "select 1' /* foo"
|
||||
self.__run_sql_inject(impalad, client, sql3_str, "multiline comments", 10, False)
|
||||
self.__run_sql_inject(impalad, client, sql3_str, "multiline comments", 11, False)
|
||||
|
||||
# Attempt to cause an error using single line comments.
|
||||
sql4_str = "select 1' -- foo"
|
||||
self.__run_sql_inject(impalad, client, sql4_str, "single line comments", 13, False)
|
||||
self.__run_sql_inject(impalad, client, sql4_str, "single line comments", 15, False)
|
||||
|
||||
def __run_sql_inject(self, impalad, client, sql, test_case, expected_writes,
|
||||
expect_success=True):
|
||||
# Capture coordinators "now" so we match only queries in this test case.
|
||||
start_time = None
|
||||
if not expect_success:
|
||||
utc_timestamp = self.execute_query('select utc_timestamp()')
|
||||
assert len(utc_timestamp.data) == 1
|
||||
start_time = utc_timestamp.data[0]
|
||||
|
||||
sql_result = None
|
||||
try:
|
||||
sql_result = client.execute(sql)
|
||||
@@ -993,11 +987,11 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
|
||||
.format(sql_result.query_id, test_case)
|
||||
assert sql_verify.data[0] == sql, test_case
|
||||
else:
|
||||
assert start_time is not None
|
||||
esc_sql = sql.replace("'", "\\'")
|
||||
sql_verify = client.execute("select sql from {0} where sql='{1}' "
|
||||
"and start_time_utc > "
|
||||
"date_sub(utc_timestamp(), interval 25 seconds);"
|
||||
.format(self.QUERY_TBL, esc_sql))
|
||||
"and start_time_utc > '{2}'"
|
||||
.format(self.QUERY_TBL, esc_sql, start_time))
|
||||
assert sql_verify.success, test_case
|
||||
assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \
|
||||
"table for test case '{1}" \
|
||||
@@ -1018,13 +1012,11 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase):
|
||||
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
|
||||
"--query_log_write_interval_s=1 "
|
||||
"--cluster_id=test_query_hist_1 "
|
||||
"--shutdown_grace_period_s=10 "
|
||||
"--shutdown_deadline_s=60 "
|
||||
"--scratch_dirs={0}:5G"
|
||||
.format(SCRATCH_DIR),
|
||||
catalogd_args="--enable_workload_mgmt",
|
||||
impalad_graceful_shutdown=True)
|
||||
def test_query_log_table_query_select(self, vector):
|
||||
def test_select(self, vector):
|
||||
"""Asserts the values written to the query log table match the values from the
|
||||
query profile. If the buffer_pool_limit parameter is not None, then this test
|
||||
requires that the query spills to disk to assert that the spill metrics are correct
|
||||
@@ -1066,5 +1058,5 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase):
|
||||
# potential for this test to not actually assert anything different than other
|
||||
# tests. Thus, an additional assert is needed to ensure that there actually was
|
||||
# data that was spilled.
|
||||
assert data[COMPRESSED_BYTES_SPILLED] != "0", "compressed bytes spilled total " \
|
||||
assert data["COMPRESSED_BYTES_SPILLED"] != "0", "compressed bytes spilled total " \
|
||||
"was zero, test did not assert anything"
|
||||
|
||||
Reference in New Issue
Block a user