Local catalog mode has been the default, and has worked well, in downstream Impala for over 5 years. This patch turns on local catalog mode by default (--catalog_topic_mode=minimal and --use_local_catalog=true) as the preferred mode going forward. Implemented LocalCatalog.setIsReady() to facilitate using local catalog mode in FE tests.

Some FE tests fail due to behavior differences in local catalog mode, e.g. IMPALA-7539. This is probably OK since Impala now largely hands FileSystem permission checks over to Apache Ranger.

The following custom cluster tests are pinned to run under legacy catalog mode because their behavior changed in local catalog mode:
TestCalcitePlanner.test_calcite_frontend
TestCoordinators.test_executor_only_lib_cache
TestMetadataReplicas
TestTupleCacheCluster
TestWorkloadManagementSQLDetailsCalcite.test_tpcds_8_decimal

In TestHBaseHmsColumnOrder.test_hbase_hms_column_order, set the --use_hms_column_order_for_hbase_tables=true flag for both impalad and catalogd to get a consistent column order in either local or legacy catalog mode. Changed the TestCatalogRpcErrors.test_register_subscriber_rpc_error assertions to be more fine-grained by matching individual query ids. Moved most test methods from TestRangerLegacyCatalog to TestRangerLocalCatalog, except for some that do need to run in legacy catalog mode; also renamed TestRangerLocalCatalog to TestRangerDefaultCatalog. The table ownership issue in local catalog mode remains unresolved (see IMPALA-8937).

Testing: passed exhaustive tests.

Change-Id: Ie303e294972d12b98f8354bf6bbc6d0cb920060f
Reviewed-on: http://gerrit.cloudera.org:8080/23080
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
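For illustration, a minimal sketch of how a custom cluster test can be pinned to legacy catalog mode with the flags named above, in the same style this file uses. The class and test names are hypothetical, and it assumes --use_local_catalog=false / --catalog_topic_mode=full are the legacy settings:

  from tests.common.custom_cluster_test_suite import CustomClusterTestSuite


  class TestPinnedToLegacyCatalog(CustomClusterTestSuite):

    @CustomClusterTestSuite.with_args(impalad_args="--use_local_catalog=false",
                                      catalogd_args="--catalog_topic_mode=full")
    def test_legacy_only_behavior(self):
      # Hypothetical test: the custom cluster is restarted with legacy catalog
      # flags, so assertions here observe legacy (non-local) catalog behavior.
      self.execute_query_expect_success(self.client, "show databases")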
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

import re

from getpass import getuser
from signal import SIGRTMIN
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import DEFAULT_KRPC_PORT
from tests.common.impala_connection import ERROR, FINISHED, PENDING
from tests.common.test_vector import HS2
from tests.util.workload_management import assert_query, redaction_rules_file
from time import sleep


class TestQueryLive(CustomClusterTestSuite):
  """Tests to assert the query live table is correctly populated.
  This test class does not extend WorkloadManagementTestSuite due to the long
  --query_log_write_interval_s requirement in most test methods."""

  @classmethod
  def default_test_protocol(cls):
    return HS2

  def setup_method(self, method):
    super(TestQueryLive, self).setup_method(method)
    self.wait_for_wm_init_complete()

  def assert_describe_extended(self):
    describe_ext_result = self.execute_query('describe extended sys.impala_query_live')
    # Alter can add additional event fields. Filter them out.
    describe_ext_data = [
        line for line in describe_ext_result.data if 'impala.events.catalog' not in line]
    assert len(describe_ext_data) == 88
    system_table_re = re.compile(r'__IMPALA_SYSTEM_TABLE\s+true')
    assert list(filter(system_table_re.search, describe_ext_data))
    external_re = re.compile(r'EXTERNAL\s+TRUE')
    assert list(filter(external_re.search, describe_ext_data))
    external_table_re = re.compile(r'Table Type:\s+EXTERNAL_TABLE')
    assert list(filter(external_table_re.search, describe_ext_data))

  def assert_impalads(self, profile, present=[0, 1, 2], absent=[]):
    for port_idx in present:
      assert ":" + str(DEFAULT_KRPC_PORT + port_idx) + ":" in profile
    for port_idx in absent:
      assert ":" + str(DEFAULT_KRPC_PORT + port_idx) not in profile

  def assert_only_coordinators(self, profile, coords=[0, 1], execs=[2]):
    self.assert_impalads(profile, coords, execs)
    assert "SYSTEM_TABLE_SCAN_NODE (id=0) [{} instances]".format(len(coords)) in profile

  def assert_fragment_instances(self, profile, expected):
    """Asserts that the per host number of fragment instances is as expected."""
    hosts = ['{}({})'.format(DEFAULT_KRPC_PORT + i, expect)
             for i, expect in enumerate(expected)]
    actual_hosts = re.search(r'Per Host Number of Fragment Instances: (.*)', profile)
    assert actual_hosts is not None
    # Split and remove hostname
    actual_hosts = [host.split(':')[1] for host in actual_hosts.group(1).split(' ')]
    assert len(hosts) == len(actual_hosts)
    for host in hosts:
      if host in actual_hosts:
        actual_hosts.remove(host)
      else:
        assert False, "did not find host {}".format(host)
    assert len(actual_hosts) == 0, "did not find all expected hosts"

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_query_live_hs2(self):
    """Asserts the query live table shows queries and allows filtering them. Uses the
    hs2 client to connect to Impala."""
    # Use a query that reads data from disk for the 1st one, as more representative and a
    # better fit for assert_query.
    result1 = self.hs2_client.execute("select * from functional.alltypes",
                                      fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.hs2_client, 'test_query_live',
                 result1.runtime_profile)

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_query_live(self):
    """Asserts the query live table shows queries and allows filtering them. Uses the
    default client to connect to Impala."""
    # Use a query that reads data from disk for the 1st one, as more representative and a
    # better fit for assert_query.
    result1 = self.client.execute("select * from functional.alltypes",
                                  fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result1.runtime_profile)

    # pending queries
    result2 = self.execute_query(
        "select query_id, cluster_id from sys.impala_query_live order by start_time_utc")
    assert len(result2.data) == 3
    assert "{}\t{}".format(result1.query_id, "test_query_live") == result2.data[0]
    assert "{}\t{}".format(result2.query_id, "test_query_live") == result2.data[2]
    # Expect new metrics for the impala_query_live table scanner.
    assert "ActiveQueryCollectionTime: " in result2.runtime_profile
    assert "PendingQueryCollectionTime: " in result2.runtime_profile

    # query filtering
    result3 = self.execute_query(
        "select query_id from sys.impala_query_live "
        "where total_time_ms > 0.0 order by start_time_utc")
    assert len(result3.data) == 4
    assert result1.query_id == result3.data[0]
    assert result2.query_id == result3.data[2]
    assert result3.query_id == result3.data[3]

    result4 = self.execute_query(
        "select db_name, db_user, count(*) as query_count from sys.impala_query_live "
        "group by db_name, db_user order by db_name")
    assert len(result4.data) == 1
    assert "default\t{}\t5".format(getuser()) == result4.data[0]

    result5 = self.execute_query(
        'select * from sys.impala_query_live where cluster_id = "test_query_live_0"')
    assert len(result5.data) == 0

    result = self.execute_query("""
        select count(*) from functional.alltypestiny a
          inner join functional.alltypes b on a.id = b.id
          inner join functional.alltypessmall c on b.id = c.id
        """)
    result5 = self.execute_query(
        'select tables_queried from sys.impala_query_live where query_id = "'
        + result.query_id + '"')
    assert len(result5.data) == 1
    assert result5.data[0] == \
        "functional.alltypes,functional.alltypestiny,functional.alltypessmall"

    # describe query
    describe_result = self.execute_query('describe sys.impala_query_live')
    assert len(describe_result.data) == 56
    self.assert_describe_extended()

    # show create table
    show_create_tbl = self.execute_query('show create table sys.impala_query_live')
    assert len(show_create_tbl.data) == 1
    assert 'CREATE EXTERNAL TABLE sys.impala_query_live' in show_create_tbl.data[0]
    assert "'__IMPALA_SYSTEM_TABLE'='true'" in show_create_tbl.data[0]

    # cannot compute stats or perform write operations
    compute_stats_result = self.execute_query_expect_failure(self.client,
        'compute stats sys.impala_query_live')
    assert 'AnalysisException: COMPUTE STATS not supported for system table: '\
        'sys.impala_query_live' in str(compute_stats_result)

    create_result = self.execute_query_expect_failure(self.client,
        'create table sys.impala_query_live (i int)')
    assert 'AnalysisException: Table already exists: sys.impala_query_live'\
        in str(create_result)

    insert_result = self.execute_query_expect_failure(self.client,
        'insert into sys.impala_query_live select * from sys.impala_query_live limit 1')
    assert 'UnsupportedOperationException: Cannot create data sink into table of type: '\
        'org.apache.impala.catalog' in str(insert_result)
    assert 'SystemTable' in str(insert_result)

    update_result = self.execute_query_expect_failure(self.client,
        'update sys.impala_query_live set query_id = ""')
    assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\
        'but the following table is neither: sys.impala_query_live'\
        in str(update_result)

    delete_result = self.execute_query_expect_failure(self.client,
        'delete from sys.impala_query_live')
    assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\
        'but the following table is neither: sys.impala_query_live'\
        in str(delete_result)

    # Drop table at the end, it's only recreated on impalad startup.
    self.execute_query_expect_success(self.client, 'drop table sys.impala_query_live')

  # Must come directly after "drop table sys.impala_query_live"
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 "--use_local_catalog=true",
                                    catalogd_args="--catalog_topic_mode=minimal",
                                    workload_mgmt=True,
                                    default_query_options=[
                                        ('default_transactional_type', 'insert_only')],
                                    disable_log_buffering=True)
  def test_default_transactional(self):
    """Asserts the query live table works when impala is started with
    default_transactional_type=insert_only."""
    result = self.client.execute("select * from functional.alltypes",
                                 fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)
    self.assert_describe_extended()

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 "--use_local_catalog=true",
                                    catalogd_args="--catalog_topic_mode=minimal",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_local_catalog(self):
    """Asserts the query live table works with local catalog mode."""
    result = self.client.execute("select * from functional.alltypes",
                                 fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 "--redaction_rules_file={}"
                                                 .format(redaction_rules_file()),
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_redaction(self):
    """Asserts the query live table redacts the statement."""
    result = self.client.execute(
        "select *, 'supercalifragilisticexpialidocious' from functional.alltypes",
        fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_alter(self):
    """Asserts alter works on the query live table."""
    column_desc = 'test_alter\tstring\t'

    add_column = self.execute_query(
        'alter table sys.impala_query_live add columns(test_alter string)')
    assert add_column.data == ['New column(s) have been added to the table.']

    try:
      describe_column = self.execute_query('describe sys.impala_query_live')
      assert len(describe_column.data) == 57
      assert column_desc in describe_column.data

      select_column = self.execute_query(
          'select test_alter from sys.impala_query_live limit 1')
      assert select_column.data == ['NULL']
      self.assert_impalad_log_contains('WARNING', r'Unknown column \(position 56\)'
          + ' added to table IMPALA_QUERY_LIVE; check if a coordinator was upgraded')
    finally:
      # Ensure new column is dropped in case of test failure
      drop_column = self.execute_query(
          'alter table sys.impala_query_live drop test_alter')

    assert drop_column.data == ['Column has been dropped.']

    describe_column2 = self.execute_query('describe sys.impala_query_live')
    assert len(describe_column2.data) == 56
    assert column_desc not in describe_column2.data

    select_column2 = self.execute_query('select * from sys.impala_query_live')
    assert len(select_column2.data) > 1

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_dedicated_coordinators(self):
    """Asserts scans are performed only on coordinators."""
    # Use a query that reads data from disk for the 1st one, as more representative and a
    # better fit for assert_query.
    result = self.client.execute("select * from functional.alltypes",
                                 fetch_profile_after_close=True)
    assert_query('sys.impala_query_live', self.client, 'test_query_live',
                 result.runtime_profile)

    client2 = self.create_client_for_nth_impalad(1)
    query = "select query_id, impala_coordinator from sys.impala_query_live " \
        "order by start_time_utc"
    handle1 = self.execute_query_async(query)
    handle2 = client2.execute_async(query)
    result1 = self.client.fetch(query, handle1)
    result2 = client2.fetch(query, handle2)
    assert len(result1.data) == 4
    assert result1.data == result2.data

    profile1 = self.client.get_runtime_profile(handle1)
    self.assert_only_coordinators(profile1)
    profile2 = client2.get_runtime_profile(handle2)
    self.assert_only_coordinators(profile2)

    self.close_query(handle1)
    client2.close_query(handle2)
    client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_executor_groups(self):
    """Asserts scans are performed only on coordinators with multiple executor groups."""
    # Add a (non-dedicated) coordinator and executor in a different executor group.
    self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra '
                                        '--enable_workload_mgmt '
                                        '--query_log_write_interval_s=300 '
                                        '--shutdown_grace_period_s=0 '
                                        '--shutdown_deadline_s=15 '
                                        '--cluster_id=test_query_live '],
                               cluster_size=1,
                               add_executors=True,
                               expected_num_impalads=4)

    result = self.client.execute(
        "select query_id, impala_coordinator from sys.impala_query_live",
        fetch_profile_after_close=True)
    assert len(result.data) == 1
    self.assert_only_coordinators(result.runtime_profile, coords=[0, 1], execs=[2, 3])

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_query_entries_are_unique(self):
    """Asserts queries in the query live table are unique."""
    # Start a query and close it with a delay between CloseClientRequestState and
    # Unregister. Then query sys.impala_query_live from multiple coordinators.
    client2 = self.create_client_for_nth_impalad(1)

    async_handle = self.execute_query_async("select count(*) from functional.alltypes",
        {"debug_action": "CLOSED_NOT_UNREGISTERED:SLEEP@1000"})
    self.close_query(async_handle)

    query = "select * from sys.impala_query_live order by start_time_utc"
    handle = self.execute_query_async(query)
    handle2 = client2.execute_async(query)
    result = self.client.fetch(query, handle)
    result2 = client2.fetch(query, handle2)
    assert len(result.data) == 3
    assert result.data[0] == result2.data[0]

    def remove_dynamic_fields(fields):
      # Excludes QUERY_STATE, IMPALA_QUERY_END_STATE, QUERY_TYPE, TOTAL_TIME_MS, and
      # everything after QUERY_OPTS_CONFIG as they change over the course of compiling
      # and running the query.
      return fields[:10] + fields[13:15] + fields[16:17]

    # Compare cluster_id and query_id. Not all fields will match as they're queried on a
    # live query at different times. Order is only guaranteed with minicluster on a
    # single machine, where they use the same clock.
    data1a = remove_dynamic_fields(result.data[1].split('\t'))
    data1b = remove_dynamic_fields(result.data[2].split('\t'))
    data2a = remove_dynamic_fields(result2.data[1].split('\t'))
    data2b = remove_dynamic_fields(result2.data[2].split('\t'))
    assert data1a == data2a
    assert data1b == data2b

    self.close_query(handle)
    client2.close_query(handle2)
    client2.close()

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live ",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_missing_coordinator(self):
    """Asserts scans finish if a coordinator disappears mid-schedule. Depends on
    test config of statestore_heartbeat_frequency_ms=50."""
    query = "select query_id, impala_coordinator from sys.impala_query_live"
    handle = self.execute_query_async(query, query_options={
        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'})

    # Wait for query to compile and assign ranges, then kill impalad
    # in non-graceful manner during debug delay.
    self.client.wait_for_impala_state(handle, PENDING, 3)
    self.cluster.impalads[1].kill()

    # Wait for query to pass admission control before fetching.
    self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60)
    result = self.client.fetch(query, handle)
    assert len(result.data) == 1
    expected_message = 'is no longer available for system table scan assignment'
    self.assert_impalad_log_contains('WARNING', expected_message)
    assert expected_message in self.client.get_runtime_profile(handle)

    self.close_query(handle)

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_shutdown_coordinator(self):
    """Asserts query fails if a coordinator disappears after scheduling. Depends on
    test config of statestore_heartbeat_frequency_ms=50."""
    query = "select query_id, impala_coordinator from sys.impala_query_live"
    handle = self.execute_query_async(query, query_options={
        'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'})

    # Wait for query to compile.
    self.client.wait_for_impala_state(handle, PENDING, 3)
    # Ensure enough time for scheduling to assign ranges.
    sleep(1)
    # Kill impalad in non-graceful manner during debug delay.
    self.cluster.impalads[1].kill()

    try:
      # Wait for query to pass admission control before fetching.
      self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60)
      self.client.fetch(query, handle)
      assert False, "fetch should fail"
    except Exception as e:
      assert "Network error: Client connection negotiation failed" in str(e)
      # client closes the query on failure.

  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live "
                                                 # Override flag set by
                                                 # workload_mgmt arg.
                                                 "--shutdown_grace_period_s=120",
                                    workload_mgmt=True,
                                    disable_log_buffering=True)
  def test_graceful_shutdown_coordinator(self):
    """Asserts query succeeds if another coordinator is shut down gracefully after
    scheduling. Depends on test config of statestore_heartbeat_frequency_ms=50."""
    query = "select query_id from sys.impala_query_live"
    handle = self.execute_query_async(query, query_options={
        'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'})

    # Wait for query to compile and assign ranges, then gracefully shutdown impalad.
    self.client.wait_for_impala_state(handle, PENDING, 3)
    self.cluster.impalads[1].kill(SIGRTMIN)

    self.client.wait_for_impala_state(handle, FINISHED, 10)
    # Allow time for statestore update to propagate. Shutdown grace period is 120s.
    sleep(1)
    # Coordinator in graceful shutdown should not be scheduled in new queries.
    shutdown = self.execute_query(query)

    result = self.client.fetch(query, handle)
    assert len(result.data) == 1
    assert result.query_id == result.data[0]
    self.assert_impalads(self.client.get_runtime_profile(handle))
    self.close_query(handle)

    assert len(shutdown.data) == 2
    self.assert_impalads(shutdown.runtime_profile, present=[0, 2], absent=[1])

@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_live ",
|
|
workload_mgmt=True,
|
|
cluster_size=3,
|
|
num_exclusive_coordinators=2,
|
|
disable_log_buffering=True)
|
|
def test_multi_table_union(self):
|
|
"""Asserts only system table scan fragments are scheduled to coordinators."""
|
|
utc_timestamp = self.execute_query('select utc_timestamp()')
|
|
assert len(utc_timestamp.data) == 1
|
|
start_time = utc_timestamp.data[0]
|
|
|
|
# Wait for the query to be written to the log to ensure there's something in it."
|
|
logged = self.execute_query_expect_success(self.client,
|
|
'select count(*) from functional.alltypes')
|
|
self.cluster.get_first_impalad().service.wait_for_metric_value(
|
|
"impala-server.completed-queries.written", 2, 30, allow_greater=True)
|
|
|
|
query = """select query_id from
|
|
(select query_id, start_time_utc from sys.impala_query_live live
|
|
where start_time_utc > "{0}" union
|
|
select query_id, start_time_utc from sys.impala_query_log
|
|
where start_time_utc > "{0}")
|
|
as history order by start_time_utc""".format(start_time)
|
|
result = self.client.execute(query, fetch_profile_after_close=True)
|
|
assert len(result.data) == 3
|
|
assert utc_timestamp.query_id == result.data[0]
|
|
assert logged.query_id == result.data[1]
|
|
assert result.query_id == result.data[2]
|
|
|
|
# Unions run in a single fragment, and on the union of selected scan nodes (even
|
|
# though some may not have actual scan ranges to evaluate). So all nodes are involved
|
|
# in scans.
|
|
self.assert_fragment_instances(result.runtime_profile, [3, 2, 2])
|
|
|
|
  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 "
                                                 "--cluster_id=test_query_live ",
                                    workload_mgmt=True,
                                    cluster_size=3,
                                    num_exclusive_coordinators=2,
                                    disable_log_buffering=True)
  def test_multi_table_join(self, unique_database):
    """Asserts only system table scan fragments are scheduled to coordinators."""
    self.execute_query('create table {}.users (user string)'.format(unique_database))
    self.execute_query('insert into {}.users values ("alice"), ("bob"), ("{}")'
                       .format(unique_database, getuser()))

    result = self.client.execute('select straight_join count(*) from {}.users, '
        'sys.impala_query_live where user = db_user and start_time_utc > utc_timestamp()'
        .format(unique_database), fetch_profile_after_close=True)
    assert len(result.data) == 1
    assert '1' == result.data[0]

    # HDFS scan runs on 1 node, System Table scan on 2
    assert 2 == result.runtime_profile.count('HDFS_SCAN_NODE')
    assert 3 == result.runtime_profile.count('SYSTEM_TABLE_SCAN_NODE')

    # impala_query_live is assigned to build side, so executor has HDFS scan fragment and
    # aggregation, coordinators have System Table scan fragments, and initial coordinator
    # has the root fragment.
    self.assert_fragment_instances(result.runtime_profile, [2, 1, 1])