# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

import os
import pytest
import string
import tempfile

from getpass import getuser
from ImpalaService import ImpalaHiveServer2Service
from random import choice, randint
from signal import SIGRTMIN
from TCLIService import TCLIService
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_vector import ImpalaTestDimension
from tests.util.retry import retry
from tests.util.workload_management import assert_query, COMPRESSED_BYTES_SPILLED, \
    BYTES_READ_CACHE_TOTAL
from time import sleep, time


class TestQueryLogTableBase(CustomClusterTestSuite):
  """Base class for all query log tests. Sets up the tests to use the Beeswax and HS2
  client protocols."""

  WM_DB = "sys"
  QUERY_TBL = "{0}.impala_query_log".format(WM_DB)
  PROTOCOL_BEESWAX = "beeswax"
  PROTOCOL_HS2 = "hs2"

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableBase, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol',
        cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2))

  def get_client(self, protocol, query_table_name=QUERY_TBL):
    """Retrieves the default Impala client for the specified protocol. This client is
    automatically closed after the test completes. Also ensures the completed queries
    table has been successfully created by checking the logs to verify the create
    table sql has finished."""

    # These tests run very quickly and can actually complete before Impala has finished
    # creating the completed queries table. Thus, to make these tests more robust, this
    # code checks to make sure the table create has finished before returning.
    create_re = r'\]\s+(\w+:\w+)\]\s+Analyzing query: CREATE TABLE IF NOT EXISTS {}' \
        .format(query_table_name)
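    # An illustrative (not real) INFO log line the regex above should match, with the
    # capture group extracting the query id:
    #   ] 344b6c74fe9a8d21:9b2e54fa00000000] Analyzing query: CREATE TABLE IF NOT
    #   EXISTS sys.impala_query_log ...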
    create_match = self.assert_impalad_log_contains("INFO", create_re)

    finish_re = r'Query successfully unregistered: query_id={}' \
        .format(create_match.group(1))
    self.assert_impalad_log_contains("INFO", finish_re)

    if protocol == self.PROTOCOL_BEESWAX:
      return self.client
    elif protocol == self.PROTOCOL_HS2:
      return self.hs2_client
    raise Exception("unknown protocol: {0}".format(protocol))


class TestQueryLogTableBeeswax(TestQueryLogTableBase):
  """Tests to assert the query log table is correctly populated when using the Beeswax
  client protocol."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableBeeswax, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('protocol') == 'beeswax')

  CACHE_DIR = tempfile.mkdtemp(prefix="cache_dir")
  MAX_SQL_PLAN_LEN = 2000
  LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
  FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + str(int(time()))
  FLUSH_MAX_RECORDS_QUERY_COUNT = 2
  OTHER_TBL = "completed_queries_table_{0}".format(int(time()))

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_max_select "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--query_log_max_sql_length={0} "
                                                 "--query_log_max_plan_length={0}"
                                                 .format(MAX_SQL_PLAN_LEN),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_lower_max_sql_plan(self, vector):
    """Asserts that lower limits on the sql and plan columns in the completed queries
    table are respected."""
    client = self.get_client(vector.get_value('protocol'))
    rand_long_str = "".join(choice(string.ascii_letters) for _ in
        range(self.MAX_SQL_PLAN_LEN))

    # Run the query async to avoid fetching results since fetching such a large result
    # was causing the execution to take a very long time.
    handle = client.execute_async("select '{0}'".format(rand_long_str))
    query_id = handle.get_handle().id
    client.wait_for_finished_timeout(handle, 10)
    client.close_query(handle)

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    # Force Impala to process the inserts to the completed queries table.
    client.execute("refresh " + self.QUERY_TBL)

    res = client.execute("select length(sql),plan from {0} where query_id='{1}'"
        .format(self.QUERY_TBL, query_id))
    assert res.success
    assert len(res.data) == 1

    data = res.data[0].split("\t")
    assert len(data) == 2
    assert int(data[0]) == self.MAX_SQL_PLAN_LEN - 1, "incorrect sql statement length"
    assert len(data[1]) == self.MAX_SQL_PLAN_LEN - data[1].count("\n") - 1, \
        "incorrect plan length"

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_max_select "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_over_max_sql_plan(self, vector):
    """Asserts that very long queries have their corresponding plan and sql columns
    shortened in the completed queries table."""
    client = self.get_client(vector.get_value('protocol'))
    rand_long_str = "".join(choice(string.ascii_letters) for _ in range(16778200))

    client.set_configuration_option("MAX_STATEMENT_LENGTH_BYTES", 16780000)
    handle = client.execute_async("select '{0}'".format(rand_long_str))
    query_id = handle.get_handle().id
    client.wait_for_finished_timeout(handle, 10)
    client.close_query(handle)

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    # Force Impala to process the inserts to the completed queries table.
    client.execute("refresh " + self.QUERY_TBL)

    client.set_configuration_option("MAX_ROW_SIZE", 35000000)
    res = client.execute("select length(sql),plan from {0} where query_id='{1}'"
        .format(self.QUERY_TBL, query_id))
    assert res.success
    assert len(res.data) == 1
    data = res.data[0].split("\t")
    assert len(data) == 2
    assert data[0] == "16777215"

    # Newline characters are not counted by Impala's length function.
    assert len(data[1]) == 16777216 - data[1].count("\n") - 1

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--query_log_size=0 "
                                                 "--query_log_size_in_bytes=0",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_no_query_log_select(self, vector):
    """Asserts queries are written to the completed queries table when the in-memory
    query log is turned off."""
    client = self.get_client(vector.get_value('protocol'))

    # Run a select query.
    random_val = randint(1, 1000000)
    select_sql = "select {0}".format(random_val)
    res = client.execute(select_sql)
    assert res.success
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    # Force Impala to process the inserts to the completed queries table.
    client.execute("refresh " + self.QUERY_TBL)

    actual = client.execute("select sql from {0} where query_id='{1}'".format(
        self.QUERY_TBL, res.query_id))
    assert actual.success
    assert len(actual.data) == 1
    assert actual.data[0] == select_sql

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_2 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--always_use_data_cache "
                                                 "--data_cache={0}:5GB".format(CACHE_DIR),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True,
                                    cluster_size=1)
  def test_query_log_table_query_cache(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile. Specifically focuses on the data cache metrics."""
    client = self.get_client(vector.get_value('protocol'))

    # Select all rows from the test table. Run the query multiple times to ensure data
    # is cached.
    warming_query_count = 3
    select_sql = "select * from functional.tinytable"
    for i in range(warming_query_count):
      res = client.execute(select_sql)
      assert res.success
      self.cluster.get_first_impalad().service.wait_for_metric_value(
          "impala-server.completed-queries.written", i + 1, 60)

    # Wait for the cache to be written to disk.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.io-mgr.remote-data-cache-num-writes", 1, 60)

    # Run the same query again so results are read from the data cache.
    res = client.execute(select_sql, fetch_profile_after_close=True)
    assert res.success
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", warming_query_count + 1, 60)

    data = assert_query(self.QUERY_TBL, client, "test_query_hist_2",
        res.runtime_profile)

    # Since the assert_query function only asserts that the bytes read from cache
    # column is equal to the bytes read from cache in the profile, there is a potential
    # for this test to not actually assert anything different than other tests. Thus,
    # an additional assert is needed to ensure that there actually was data read from
    # the cache.
    assert data[BYTES_READ_CACHE_TOTAL] != "0", "bytes read from cache total was " \
        "zero, test did not assert anything"

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=5 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 ",
                                    impala_log_dir=LOG_DIR_MAX_WRITES,
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_max_attempts_exceeded(self, vector):
    """Asserts that inserts into the completed queries table are attempted at most
    3 times for any one completed query. This test drops the completed queries table,
    so it must not run last; otherwise the table stays deleted. Subsequent tests will
    re-create the table."""

    print("USING LOG DIRECTORY: {0}".format(self.LOG_DIR_MAX_WRITES))

    impalad = self.cluster.get_first_impalad()
    client = self.get_client(vector.get_value('protocol'))

    res = client.execute("drop table {0} purge".format(self.QUERY_TBL))
    assert res.success
    impalad.service.wait_for_metric_value(
        "impala-server.completed-queries.scheduled-writes", 3, 60)
    impalad.service.wait_for_metric_value("impala-server.completed-queries.failure", 3,
        60)

    query_count = 0

    # Allow time for logs to be written to disk.
    sleep(5)

    with open(os.path.join(self.LOG_DIR_MAX_WRITES, "impalad.ERROR")) as file:
      for line in file:
        if line.find('could not write completed query table="{0}" query_id="{1}"'
            .format(self.QUERY_TBL, res.query_id)) >= 0:
          query_count += 1

    assert query_count == 1

    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.max-records-writes") == 0
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.queued") == 0
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.failure") == 3
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.scheduled-writes") == 4
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.written") == 0

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_max_queued={0} "
                                                 "--query_log_write_interval_s=9999 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--cluster_id={1}"
                                                 .format(FLUSH_MAX_RECORDS_QUERY_COUNT,
                                                         FLUSH_MAX_RECORDS_CLUSTER_ID),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_flush_max_records(self, vector):
    """Asserts that queries that have completed are written to the query log table when
    the maximum number of queued records is reached."""

    impalad = self.cluster.get_first_impalad()
    client = self.get_client(vector.get_value('protocol'))

    rand_str = "{0}-{1}".format(vector.get_value('protocol'), time())

    test_sql = "select '{0}','{1}'".format(rand_str,
        self.FLUSH_MAX_RECORDS_CLUSTER_ID)
    test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format(
        rand_str, self.QUERY_TBL, test_sql.replace("'", r"\'"))
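    # With a (hypothetical) rand_str of "beeswax-1700000000.0", test_sql_assert expands
    # to roughly:
    #   select 'beeswax-1700000000.0', count(*) from sys.impala_query_log
    #   where sql='select \'beeswax-1700000000.0\',\'test_query_log_max_records_...\''
    # i.e. the embedded quotes are escaped so the predicate can match the exact sql
    # text of the earlier queries.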

    for _ in range(0, self.FLUSH_MAX_RECORDS_QUERY_COUNT):
      res = client.execute(test_sql)
      assert res.success

    # Running this query causes the number of queued completed queries to exceed the
    # max, and thus all completed queries will be written to the query log table.
    res = client.execute(test_sql_assert)
    assert res.success
    assert 1 == len(res.data)
    assert "0" == res.data[0].split("\t")[1]

    # Wait until the completed queries have all been written out because the max queued
    # count was exceeded.
    impalad.service.wait_for_metric_value(
        "impala-server.completed-queries.max-records-writes", 1, 60)
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 3, 60)

    # Force Impala to process the inserts to the completed queries table.
    client.execute("refresh " + self.QUERY_TBL)

    # This query will remain queued due to the long write interval and the max queued
    # records limit not being reached.
    res = client.execute(r"select count(*) from {0} where sql like 'select \'{1}\'%'"
        .format(self.QUERY_TBL, rand_str))
    assert res.success
    assert 1 == len(res.data)
    assert "3" == res.data[0]
    impalad.service.wait_for_metric_value(
        "impala-server.completed-queries.queued", 2, 60)

    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.max-records-writes") == 1
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.scheduled-writes") == 0
    assert impalad.service.get_metric_value("impala-server.completed-queries.written") \
        == self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1
    assert impalad.service.get_metric_value(
        "impala-server.completed-queries.queued") == 2

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=30 "
                                                 "--blacklisted_dbs=information_schema "
                                                 "--query_log_table_name={0}"
                                                 .format(OTHER_TBL),
                                    catalogd_args="--enable_workload_mgmt "
                                                  "--blacklisted_dbs=information_schema",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_different_table(self, vector):
    """Asserts that the completed queries table can be renamed."""

    client = self.get_client(vector.get_value('protocol'),
        "{}.{}".format(self.WM_DB, self.OTHER_TBL))

    try:
      res = client.execute("show tables in {0}".format(self.WM_DB))
      assert res.success
      assert len(res.data) > 0, "could not find any tables in database {0}" \
          .format(self.WM_DB)

      tbl_found = False
      for tbl in res.data:
        if tbl.startswith(self.OTHER_TBL):
          tbl_found = True
          break
      assert tbl_found, "could not find table '{0}' in database '{1}'" \
          .format(self.OTHER_TBL, self.WM_DB)
    finally:
      client.execute("drop table {0}.{1} purge".format(self.WM_DB, self.OTHER_TBL))

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=False)
  def test_query_log_table_query_select_dedicate_coordinator(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile when dedicated coordinators are used."""
    client = self.get_client(vector.get_value('protocol'))
    test_sql = "select * from functional.tinytable"

    # Select all rows from the test table.
    res = client.execute(test_sql, fetch_profile_after_close=True)
    assert res.success

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol'))
    try:
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "",
          res.runtime_profile)
    finally:
      client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=False)
  def test_query_log_table_query_select_mt_dop(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile when dedicated coordinators are used along with an MT_DOP setting
    greater than 0."""
    client = self.get_client(vector.get_value('protocol'))
    test_sql = "select * from functional.tinytable"

    # Select all rows from the test table.
    client.set_configuration_option("MT_DOP", "4")
    res = client.execute(test_sql, fetch_profile_after_close=True)
    assert res.success

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol'))
    try:
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "",
          res.runtime_profile)
    finally:
      client2.close()


class TestQueryLogTableHS2(TestQueryLogTableBase):
  """Tests to assert the query log table is correctly populated when using the HS2
  client protocol."""

  HS2_OPERATIONS_CLUSTER_ID = "hs2-operations-" + str(int(time()))

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableHS2, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('protocol') == 'hs2')

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id={} "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60"
                                                 .format(HS2_OPERATIONS_CLUSTER_ID),
                                    catalogd_args="--enable_workload_mgmt",
                                    cluster_size=2,
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_hs2_operations(self, vector):
    """Certain HS2 operations appear to Impala as a special kind of query. Specifically,
    these operations have a query type of unknown and sql text that would normally be
    invalid. This test asserts those queries are not written to the completed queries
    table since they are trivial."""
    client = self.get_client(vector.get_value('protocol'))

    host, port = IMPALAD_HS2_HOST_PORT.split(":")
    socket = TSocket(host, port)
    transport = TBufferedTransport(socket)
    transport.open()
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    hs2_client = ImpalaHiveServer2Service.Client(protocol)
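    # The raw Thrift stack built above (TSocket -> TBufferedTransport ->
    # TBinaryProtocol) talks HS2 directly, bypassing the test framework's client
    # wrappers so the metadata operations below reach Impala unmodified.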

    # Asserts the response from an HS2 operation indicates success.
    def assert_resp(resp):
      assert resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS

    # Closes an HS2 operation.
    def close_op(client, resp):
      close_operation_req = TCLIService.TCloseOperationReq()
      close_operation_req.operationHandle = resp.operationHandle
      assert_resp(client.CloseOperation(close_operation_req))

    try:
      # Open a new HS2 session.
      open_session_req = TCLIService.TOpenSessionReq()
      open_session_req.username = getuser()
      open_session_req.configuration = dict()
      open_sess_resp = hs2_client.OpenSession(open_session_req)
      assert_resp(open_sess_resp)

      # Test the get_type_info query.
      get_typeinfo_req = TCLIService.TGetTypeInfoReq()
      get_typeinfo_req.sessionHandle = open_sess_resp.sessionHandle
      get_typeinfo_resp = hs2_client.GetTypeInfo(get_typeinfo_req)
      assert_resp(get_typeinfo_resp)
      close_op(hs2_client, get_typeinfo_resp)

      # Test the get_catalogs query.
      get_cats_req = TCLIService.TGetCatalogsReq()
      get_cats_req.sessionHandle = open_sess_resp.sessionHandle
      get_cats_resp = hs2_client.GetCatalogs(get_cats_req)
      assert_resp(get_cats_resp)
      close_op(hs2_client, get_cats_resp)

      # Test the get_schemas query.
      get_schemas_req = TCLIService.TGetSchemasReq()
      get_schemas_req.sessionHandle = open_sess_resp.sessionHandle
      get_schemas_resp = hs2_client.GetSchemas(get_schemas_req)
      assert_resp(get_schemas_resp)
      close_op(hs2_client, get_schemas_resp)

      # Test the get_tables query.
      get_tables_req = TCLIService.TGetTablesReq()
      get_tables_req.sessionHandle = open_sess_resp.sessionHandle
      get_tables_req.schemaName = self.WM_DB
      get_tables_resp = hs2_client.GetTables(get_tables_req)
      assert_resp(get_tables_resp)
      close_op(hs2_client, get_tables_resp)

      # Test the get_table_types query.
      get_tbl_typ_req = TCLIService.TGetTableTypesReq()
      get_tbl_typ_req.sessionHandle = open_sess_resp.sessionHandle
      get_tbl_typ_req.schemaName = self.WM_DB
      get_tbl_typ_resp = hs2_client.GetTableTypes(get_tbl_typ_req)
      assert_resp(get_tbl_typ_resp)
      close_op(hs2_client, get_tbl_typ_resp)

      # Test the get_columns query.
      get_cols_req = TCLIService.TGetColumnsReq()
      get_cols_req.sessionHandle = open_sess_resp.sessionHandle
      get_cols_req.schemaName = 'functional'
      get_cols_req.tableName = 'parent_table'
      get_cols_resp = hs2_client.GetColumns(get_cols_req)
      assert_resp(get_cols_resp)
      close_op(hs2_client, get_cols_resp)

      # Test the get_primary_keys query.
      get_pk_req = TCLIService.TGetPrimaryKeysReq()
      get_pk_req.sessionHandle = open_sess_resp.sessionHandle
      get_pk_req.schemaName = 'functional'
      get_pk_req.tableName = 'parent_table'
      get_pk_resp = hs2_client.GetPrimaryKeys(get_pk_req)
      assert_resp(get_pk_resp)
      close_op(hs2_client, get_pk_resp)

      # Test the get_cross_reference query.
      get_cr_req = TCLIService.TGetCrossReferenceReq()
      get_cr_req.sessionHandle = open_sess_resp.sessionHandle
      get_cr_req.parentSchemaName = "functional"
      get_cr_req.foreignSchemaName = "functional"
      get_cr_req.parentTableName = "parent_table"
      get_cr_req.foreignTableName = "child_table"
      get_cr_resp = hs2_client.GetCrossReference(get_cr_req)
      assert_resp(get_cr_resp)
      close_op(hs2_client, get_cr_resp)

      close_session_req = TCLIService.TCloseSessionReq()
      close_session_req.sessionHandle = open_sess_resp.sessionHandle
      resp = hs2_client.CloseSession(close_session_req)
      assert resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS
    finally:
      socket.close()

    # Execute a general query and wait for it to appear in the completed queries table
    # to ensure there are no false positives caused by the assertion query executing
    # before Impala has a chance to write queued queries to the completed queries table.
    assert client.execute("select 1").success
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 30)

    # Force Impala to process the inserts to the completed queries table.
    client.execute("refresh {}".format(self.QUERY_TBL))

    # Assert only the one expected query was written to the completed queries table.
    assert_results = client.execute("select count(*) from {} where cluster_id='{}'"
        .format(self.QUERY_TBL, self.HS2_OPERATIONS_CLUSTER_ID))
    assert assert_results.success
    assert assert_results.data[0] == "1"

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_mult "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    cluster_size=2,
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_query_multiple(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile for a query that reads from multiple tables."""
    client = self.get_client(vector.get_value('protocol'))

    # Select all rows from the test table.
    client.set_configuration_option("MAX_MEM_ESTIMATE_FOR_ADMISSION", "10MB")
    res = client.execute("select a.zip,a.income,b.timezone,c.timezone from "
        "functional.zipcode_incomes a inner join functional.zipcode_timezones b on "
        "a.zip = b.zip inner join functional.alltimezones c on b.timezone = c.timezone",
        fetch_profile_after_close=True)
    assert res.success
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol'))
    try:
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "test_query_hist_mult", res.runtime_profile,
          max_mem_for_admission=10485760)
    finally:
      client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_3 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_query_insert_select(self, vector, unique_database,
      unique_name):
    """Asserts the values written to the query log table match the values from the
    query profile for an insert select query."""
    tbl_name = "{0}.{1}".format(unique_database, unique_name)
    client = self.get_client(vector.get_value('protocol'))

    # Create the destination test table.
    assert client.execute("create table {0} (identifier INT, product_name STRING) "
        .format(tbl_name)).success, "could not create destination table"

    # Insert select into the destination table.
    res = client.execute("insert into {0} (identifier, product_name) select id, "
        "string_col from functional.alltypes limit 50".format(tbl_name),
        fetch_profile_after_close=True)
    assert res.success, "could not insert select"

    # Include the two queries run by the unique_database fixture setup.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 4, 60)

    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    try:
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile)
    finally:
      client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=15 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_flush_interval(self, vector):
    """Asserts that queries that have completed are written to the query log table
    after the specified write interval elapses."""

    client = self.get_client(vector.get_value('protocol'))

    query_count = 10

    for _ in range(query_count):
      res = client.execute("select sleep(1000)")
      assert res.success

    # At least 10 seconds have already elapsed, wait up to 10 more seconds for the
    # queries to be written to the completed queries table.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", query_count, 10)

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=9999 "
                                                 "--shutdown_grace_period_s=30 "
                                                 "--shutdown_deadline_s=30",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=False)
  def test_query_log_table_flush_on_shutdown(self, vector):
    """Asserts that queries that have completed but are not yet written to the query
    log table are flushed to the table before the coordinator exits."""

    impalad = self.cluster.get_first_impalad()
    client = self.get_client(vector.get_value('protocol'))

    # Execute sql statements to ensure all get written to the query log table.
    sql1 = client.execute("select 1")
    assert sql1.success

    sql2 = client.execute("select 2")
    assert sql2.success

    sql3 = client.execute("select 3")
    assert sql3.success

    impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3,
        60)

    impalad.kill_and_wait_for_exit(SIGRTMIN)

    client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol'))

    try:
      def assert_func(last_iteration):
        results = client2.execute("select query_id,sql from {0} where query_id in "
            "('{1}','{2}','{3}')".format(self.QUERY_TBL,
                sql1.query_id, sql2.query_id, sql3.query_id))

        success = len(results.data) == 3
        if last_iteration:
          assert len(results.data) == 3

        return success
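
      # retry() (from tests.util.retry) is expected to re-invoke assert_func, passing
      # last_iteration=True on the final attempt so the assert above fails loudly;
      # with these arguments that is up to 5 attempts roughly 5 seconds apart.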
      assert retry(func=assert_func, max_attempts=5, sleep_time_s=5)
    finally:
      client2.close()


class TestQueryLogTableAll(TestQueryLogTableBase):
  """Tests to assert the query log table is correctly populated when using all the
  client protocols."""

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_2 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_ddl(self, vector, unique_database, unique_name):
    """Asserts the values written to the query log table match the values from the
    query profile for a DDL query."""
    create_tbl_sql = "create table {0}.{1} (id INT, product_name STRING) " \
        "partitioned by (category INT)".format(unique_database, unique_name)
    client = self.get_client(vector.get_value('protocol'))

    res = client.execute(create_tbl_sql, fetch_profile_after_close=True)
    assert res.success

    # Include the two queries run by the unique_database fixture setup.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 3, 60)

    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    try:
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "test_query_hist_2", res.runtime_profile)
    finally:
      client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_3 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_dml(self, vector, unique_database, unique_name):
    """Asserts the values written to the query log table match the values from the
    query profile for a DML query."""
    tbl_name = "{0}.{1}".format(unique_database, unique_name)
    client = self.get_client(vector.get_value('protocol'))

    # Create the test table.
    create_tbl_sql = "create table {0} (id INT, product_name STRING) " \
        "partitioned by (category INT)".format(tbl_name)
    create_tbl_results = client.execute(create_tbl_sql)
    assert create_tbl_results.success

    insert_sql = "insert into {0} (id,category,product_name) values " \
        "(0,1,'the product')".format(tbl_name)
    res = client.execute(insert_sql, fetch_profile_after_close=True)
    assert res.success

    # Include the two queries run by the unique_database fixture setup.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 4, 60)

    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    try:
      assert client2 is not None
      assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile)
    finally:
      client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_2 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 ",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_invalid_query(self, vector):
    """Asserts correct values are written to the completed queries table for a failed
    query. The query profile is used as the source of expected values."""
    client = self.get_client(vector.get_value('protocol'))

    # Assert an invalid query.
    unix_now = time()
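    # The bare timestamp (e.g. "1712345678.9") is not a valid statement, so the
    # execute() call below is expected to raise.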
    try:
      client.execute("{0}".format(unix_now))
    except Exception:
      pass

    # Get the query id from the completed queries table since the call to execute
    # errors instead of returning the results object which contains the query id.
    impalad = self.cluster.get_first_impalad()
    impalad.service.wait_for_metric_value("impala-server.completed-queries.written", 1,
        60)

    result = client.execute("select query_id from {0} where sql='{1}'"
        .format(self.QUERY_TBL, unix_now),
        fetch_profile_after_close=True)
    assert result.success
    assert len(result.data) == 1

    assert_query(query_tbl=self.QUERY_TBL, client=client,
        expected_cluster_id="test_query_hist_2", impalad=impalad, query_id=result.data[0])

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_ignored_sqls(self, vector):
    """Asserts that expected queries are not written to the query log table."""
    client = self.get_client(vector.get_value('protocol'))

    sqls = {}
    sqls["use default"] = False
    sqls["USE default"] = False
    sqls["uSe default"] = False
    sqls["--mycomment\nuse default"] = False
    sqls["/*mycomment*/ use default"] = False

    sqls["set all"] = False
    sqls["SET all"] = False
    sqls["SeT all"] = False
    sqls["--mycomment\nset all"] = False
    sqls["/*mycomment*/ set all"] = False

    sqls["show tables"] = False
    sqls["SHOW tables"] = False
    sqls["ShoW tables"] = False
    sqls["ShoW create table {0}".format(self.QUERY_TBL)] = False
    sqls["show databases"] = False
    sqls["SHOW databases"] = False
    sqls["ShoW databases"] = False
    sqls["show schemas"] = False
    sqls["SHOW schemas"] = False
    sqls["ShoW schemas"] = False
    sqls["--mycomment\nshow tables"] = False
    sqls["/*mycomment*/ show tables"] = False
    sqls["/*mycomment*/ show create table {0}".format(self.QUERY_TBL)] = False
    sqls["/*mycomment*/ show files in {0}".format(self.QUERY_TBL)] = False
    sqls["/*mycomment*/ show functions"] = False
    sqls["/*mycomment*/ show data sources"] = False
    sqls["/*mycomment*/ show views"] = False
    sqls["show metadata tables in {0}".format(self.QUERY_TBL)] = False

    sqls["describe database default"] = False
    sqls["/*mycomment*/ describe database default"] = False
    sqls["describe {0}".format(self.QUERY_TBL)] = False
    sqls["/*mycomment*/ describe {0}".format(self.QUERY_TBL)] = False
    sqls["describe history {0}".format(self.QUERY_TBL)] = False
    sqls["/*mycomment*/ describe history {0}".format(self.QUERY_TBL)] = False
    sqls["select 1"] = True

    control_queries_count = 0
    # Iterate over a copy of the items since control queries are popped from the dict
    # within the loop.
    for sql, experiment_control in list(sqls.items()):
      results = client.execute(sql)
      assert results.success, "could not execute query '{0}'".format(sql)
      sqls[sql] = results.query_id

      # Ensure at least one sql statement was written to the completed queries table
      # to avoid false negatives where the sql statements that are ignored are not
      # written to the completed queries table because of another issue. Does not check
      # the completed-queries.written metric because, if another query that should not
      # have been written to the completed queries was actually written, the metric
      # will be wrong.
      if experiment_control:
        control_queries_count += 1
        sql_results = None
        for _ in range(6):
          sql_results = client.execute("select * from {0} where query_id='{1}'".format(
              self.QUERY_TBL, results.query_id))
          control_queries_count += 1
          if sql_results.success and len(sql_results.data) == 1:
            break
          else:
            # The query is not yet available in the completed queries table, wait
            # before checking again.
            sleep(5)
        assert sql_results.success
        assert len(sql_results.data) == 1, "query not found in completed queries table"
        sqls.pop(sql)

    for sql, query_id in sqls.items():
      log_results = client.execute("select * from {0} where query_id='{1}'"
          .format(self.QUERY_TBL, query_id))
      assert log_results.success
      assert len(log_results.data) == 0, \
          "found query in query log table: {0}".format(sql)

    # Assert there was one query per sql item written to the query log table. The
    # queries inserted into the completed queries table are the queries used to assert
    # the ignored queries were not written to the table.
    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", len(sqls) + control_queries_count,
        60)
    assert self.cluster.get_first_impalad().service.get_metric_value(
        "impala-server.completed-queries.failure") == 0

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60",
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_sql_injection(self, vector):
    client = self.get_client(vector.get_value('protocol'))
    impalad = self.cluster.get_first_impalad()

    # Try a sql injection attack with closing double quotes.
    sql1_str = "select * from functional.alltypes where string_col='product-2-3\"'"
    self.__run_sql_inject(impalad, client, sql1_str, "closing quotes", 1)

    # Try a sql injection attack with closing single quotes.
    sql1_str = "select * from functional.alltypes where string_col=\"product-2-3'\""
    self.__run_sql_inject(impalad, client, sql1_str, "closing quotes", 4)

    # Try a sql injection attack with a terminating quote and semicolon.
    sql2_str = "select 1'); drop table {0}; select('".format(self.QUERY_TBL)
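    # If workload management naively interpolated the statement text into its insert,
    # the embedded drop above could execute; the expectation asserted below is that the
    # text is escaped and stored verbatim as data instead.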
    self.__run_sql_inject(impalad, client, sql2_str, "terminating semicolon", 7)

    # Attempt to cause an error using multiline comments.
    sql3_str = "select 1' /* foo"
    self.__run_sql_inject(impalad, client, sql3_str, "multiline comments", 10, False)

    # Attempt to cause an error using single line comments.
    sql4_str = "select 1' -- foo"
    self.__run_sql_inject(impalad, client, sql4_str, "single line comments", 13, False)

  def __run_sql_inject(self, impalad, client, sql, test_case, expected_writes,
      expect_success=True):
    sql_result = None
    try:
      sql_result = client.execute(sql)
    except Exception as e:
      if expect_success:
        raise e

    if expect_success:
      assert sql_result.success

    impalad.service.wait_for_metric_value(
        "impala-server.completed-queries.written", expected_writes, 60)

    # Force Impala to process the inserts to the completed queries table.
    client.execute("refresh " + self.QUERY_TBL)

    if expect_success:
      sql_verify = client.execute(
          "select sql from {0} where query_id='{1}'"
          .format(self.QUERY_TBL, sql_result.query_id))

      assert sql_verify.success, test_case
      assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \
          "table for test case '{1}'" \
          .format(sql_result.query_id, test_case)
      assert sql_verify.data[0] == sql, test_case
    else:
      esc_sql = sql.replace("'", "\\'")
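      # e.g. "select 1' -- foo" becomes "select 1\' -- foo", so the single quote
      # survives inside the quoted predicate of the verification query below.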
      sql_verify = client.execute("select sql from {0} where sql='{1}' "
          "and start_time_utc > "
          "date_sub(utc_timestamp(), interval 25 seconds);"
          .format(self.QUERY_TBL, esc_sql))
      assert sql_verify.success, test_case
      assert len(sql_verify.data) == 1, "did not find query '{0}' in query log " \
          "table for test case '{1}'" \
          .format(esc_sql, test_case)


class TestQueryLogTableBufferPool(TestQueryLogTableBase):
  """Base class for all query log tests that set the buffer pool query option."""

  SCRATCH_DIR = tempfile.mkdtemp(prefix="scratch_dir")

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryLogTableBufferPool, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('buffer_pool_limit',
        None, "14.97MB"))

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                 "--query_log_write_interval_s=1 "
                                                 "--cluster_id=test_query_hist_1 "
                                                 "--shutdown_grace_period_s=10 "
                                                 "--shutdown_deadline_s=60 "
                                                 "--scratch_dirs={0}:5G"
                                                 .format(SCRATCH_DIR),
                                    catalogd_args="--enable_workload_mgmt",
                                    impalad_graceful_shutdown=True)
  def test_query_log_table_query_select(self, vector):
    """Asserts the values written to the query log table match the values from the
    query profile. If the buffer_pool_limit parameter is not None, then this test
    requires that the query spills to disk to assert that the spill metrics are correct
    in the completed queries table."""
    buffer_pool_limit = vector.get_value('buffer_pool_limit')
    client = self.get_client(vector.get_value('protocol'))
    test_sql = "select * from functional.tinytable"

    # When buffer pool limit is not None, the test is forcing the query to spill. Thus,
    # a large number of records is needed to force the spilling.
    if buffer_pool_limit is not None:
      test_sql = "select a.*,b.*,c.* from " \
          "functional.zipcode_incomes a inner join functional.zipcode_timezones b on " \
          "a.zip = b.zip inner join functional.alltimezones c on b.timezone = c.timezone"

    # Set up query configuration.
    client.set_configuration_option("MAX_MEM_ESTIMATE_FOR_ADMISSION", "10MB")
    if buffer_pool_limit is not None:
      client.set_configuration_option("BUFFER_POOL_LIMIT", buffer_pool_limit)

    # Select all rows from the test table.
    res = client.execute(test_sql, fetch_profile_after_close=True)
    assert res.success

    self.cluster.get_first_impalad().service.wait_for_metric_value(
        "impala-server.completed-queries.written", 1, 60)

    client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol'))
    try:
      assert client2 is not None
      data = assert_query(self.QUERY_TBL, client2, "test_query_hist_1",
          res.runtime_profile, max_mem_for_admission=10485760)
    finally:
      client2.close()

    if buffer_pool_limit is not None:
      # Since the assert_query function only asserts that the compressed bytes spilled
      # column is equal to the compressed bytes spilled in the profile, there is a
      # potential for this test to not actually assert anything different than other
      # tests. Thus, an additional assert is needed to ensure that there actually was
      # data that was spilled.
      assert data[COMPRESSED_BYTES_SPILLED] != "0", "compressed bytes spilled total " \
          "was zero, test did not assert anything"