impala/tests/query_test/test_observability.py
Thomas Tauber-Marshall a018038df5 IMPALA-6338: Fix flaky test_profile_fragment_instances
test_profile_fragment_instances checks that, once all results have been
returned, every fragment instance appears in the query profile. The
query it runs internally cancels any fragment instances that are still
executing at the point when the results have been fully returned.

Every fis is guaranteed to send a profile to the coordinator in
Finalize(), but previously fragment profiles were not applied by the
coordinator if the backend was 'done', defined as either all instances
having completed or one having entered an error state (cancellation
included).

So, the test could fail by the following sequence:
- Some fragment for a particular backend sends an update to the
  coordinator. 'returned_all_results_' is true, so the coordinator
  responds indicating that the backend should cancel its remaining
  fragments.
- Another fragment from that backend executes Finalize() and reports
  that it was cancelled. This causes the coordinator to consider the
  entire backend to be 'done'.
- A third fragment from the same backend, which had not previously sent
  a report from its reporting thread, executes Finalize(). This
  report will not be applied by the coordinator as the backend is
  considered 'done', so this fragment will not appear in the final
  profile.

The solution is to change the definition of 'done' so that it excludes
a backend that has been cancelled but still has fragments that haven't
completed. This guarantees that, for queries that complete successfully
and are cancelled internally, every fis sends a report and has it
applied by the coordinator before all results have been returned: when
eos is true, Coordinator::GetNext() calls WaitForBackendCompletion(),
which in this situation now waits for all fis to Finalize().
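
A minimal sketch of the old vs. new 'done' rule (illustration only: the
real check lives in the C++ coordinator, and the names below are
hypothetical rather than the actual identifiers):

from collections import namedtuple

# Hypothetical model of a fragment instance's reported state.
Instance = namedtuple('Instance', ['completed', 'error', 'cancelled'])

def backend_done_old(instances):
  # Old rule: 'done' once every instance completed or any instance hit an
  # error state, with cancellation counted as an error. Reports arriving
  # after that point (e.g. from Finalize()) were dropped.
  return (all(i.completed for i in instances) or
          any(i.error or i.cancelled for i in instances))

def backend_done_new(instances):
  # New rule: cancellation alone no longer marks a backend 'done' while
  # some of its instances are still incomplete, so every instance's final
  # Finalize() report is still applied to the query profile.
  return (all(i.completed for i in instances) or
          any(i.error and not i.cancelled for i in instances))

Under the new rule, WaitForBackendCompletion() keeps waiting until the
cancelled instances' Finalize() reports arrive, which is what the test
relies on.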

Returning results for queries that are cancelled by the user is
unaffected, as the manual cancel path already results in a call to
WaitForBackendCompletion().

Testing:
- Ran test_profile_fragment_instances in a loop with no failures (a
  sketch of that kind of loop follows below). I can reliably repro the
  original problem with a few carefully placed sleeps.
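
As a rough illustration of that loop (impala-py.test, Impala's pytest
wrapper, and the test path follow the usual development-environment
layout; both are assumptions here, not part of this change):

import subprocess

# Re-run the test until it fails. Assumes impala-py.test is on PATH in a
# development shell and that this runs from the directory containing tests/.
while subprocess.call(["impala-py.test", "tests/query_test/test_observability.py",
                       "-k", "test_profile_fragment_instances"]) == 0:
  pass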

Change-Id: I77773a1e3c62952003f37f88fe2b662bb11889ed
Reviewed-on: http://gerrit.cloudera.org:8080/8997
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
2018-02-02 23:22:51 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.impala_cluster import ImpalaCluster
import logging
import pytest
import time


class TestObservability(ImpalaTestSuite):
  @classmethod
  def get_workload(self):
    return 'functional-query'

  def test_merge_exchange_num_rows(self):
    """Regression test for IMPALA-1473 - checks that the exec summary for a merging
    exchange with a limit reports the number of rows returned as equal to the limit,
    and that the coordinator fragment portion of the runtime profile reports the number
    of rows returned correctly."""
    query = """select tinyint_col, count(*) from functional.alltypes
        group by tinyint_col order by tinyint_col limit 5"""
    result = self.execute_query(query)
    assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
    assert result.exec_summary[0]['num_rows'] == 5
    assert result.exec_summary[0]['est_num_rows'] == 5
    for line in result.runtime_profile.split('\n'):
      # The first 'RowsProduced' we find is for the coordinator fragment.
      if 'RowsProduced' in line:
        assert '(5)' in line
        break

  def test_broadcast_num_rows(self):
    """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node
    in the exec summary is correctly set as the max over all instances, not the sum."""
    query = """select distinct a.int_col, a.string_col from functional.alltypes a
        inner join functional.alltypessmall b on (a.id = b.id)
        where a.year = 2009 and b.month = 2"""
    result = self.execute_query(query)
    assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
    assert result.exec_summary[5]['num_rows'] == 25
    assert result.exec_summary[5]['est_num_rows'] == 25

  @SkipIfS3.hbase
  @SkipIfLocal.hbase
  @SkipIfIsilon.hbase
  @SkipIfADLS.hbase
  def test_scan_summary(self):
    """IMPALA-4499: Checks that the exec summary for scans shows the table name."""
    # HDFS table
    query = "select count(*) from functional.alltypestiny"
    result = self.execute_query(query)
    scan_idx = len(result.exec_summary) - 1
    assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HDFS'
    assert result.exec_summary[scan_idx]['detail'] == 'functional.alltypestiny'

    # KUDU table
    query = "select count(*) from functional_kudu.alltypestiny"
    result = self.execute_query(query)
    scan_idx = len(result.exec_summary) - 1
    assert result.exec_summary[scan_idx]['operator'] == '00:SCAN KUDU'
    assert result.exec_summary[scan_idx]['detail'] == 'functional_kudu.alltypestiny'

    # HBASE table
    query = "select count(*) from functional_hbase.alltypestiny"
    result = self.execute_query(query)
    scan_idx = len(result.exec_summary) - 1
    assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE'
    assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny'

  def test_query_states(self):
    """Tests that the query profile shows expected query states."""
    query = "select count(*) from functional.alltypes"
    handle = self.execute_query_async(query, dict())
    profile = self.client.get_runtime_profile(handle)
    # If ExecuteStatement() has completed but the results haven't been fetched yet, the
    # query must have at least reached RUNNING.
    assert "Query State: RUNNING" in profile or \
        "Query State: FINISHED" in profile, profile
    results = self.client.fetch(query, handle)
    profile = self.client.get_runtime_profile(handle)
    # After fetching the results, the query must be in state FINISHED.
    assert "Query State: FINISHED" in profile, profile

  def test_query_options(self):
    """Test that the query profile shows the expected non-default query options, both
    those set explicitly through the client and those set by the planner."""
    # Set a query option explicitly through the client.
    self.execute_query("set MEM_LIMIT = 8589934592")
    # Make sure explicitly set default values are not shown in the profile.
    self.execute_query("set runtime_filter_wait_time_ms = 0")
    runtime_profile = self.execute_query("select 1").runtime_profile
    assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in runtime_profile
    # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1,
    # RUNTIME_FILTER_MODE=0 and MT_DOP=0.
    assert "Query Options (set by configuration and planner): MEM_LIMIT=8589934592," \
        "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n" \
        in runtime_profile

  @SkipIfLocal.multiple_impalad
  def test_profile_fragment_instances(self):
    """IMPALA-6081: Test that the expected number of fragment instances and their exec
    nodes appear in the runtime profile, even when fragments may be quickly cancelled
    because all results have already been returned."""
    results = self.execute_query("""
        with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
        select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
        join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""")
    # There are 3 scan nodes and each appears in the profile 4 times (for 3 fragment
    # instances + the averaged fragment).
    assert results.runtime_profile.count("HDFS_SCAN_NODE") == 12, results.runtime_profile
    # There are 3 exchange nodes and each appears in the profile 2 times (for 1 fragment
    # instance + the averaged fragment).
    assert results.runtime_profile.count("EXCHANGE_NODE") == 6, results.runtime_profile
    # The following appear only in the root fragment, which has 1 instance.
    assert results.runtime_profile.count("HASH_JOIN_NODE") == 2, results.runtime_profile
    assert results.runtime_profile.count("AGGREGATION_NODE") == 2, results.runtime_profile
    assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2, results.runtime_profile

  # IMPALA-6399: Run this test serially to avoid a delay over the wait time in fetching
  # the profile.
  @pytest.mark.execute_serially
  def test_query_profile_thrift_timestamps(self):
    """Test that the query profile start and end time date-time strings have
    nanosecond precision. Nanosecond precision is expected by management API clients
    that consume Impala debug webpages."""
    query = "select sleep(1000)"
    handle = self.client.execute_async(query)
    query_id = handle.get_handle().id
    results = self.client.fetch(query, handle)
    self.client.close()

    MAX_WAIT = 300
    start = time.time()
    end = start + MAX_WAIT
    while time.time() <= end:
      # Sleep before trying to fetch the profile. This helps to prevent a warning when
      # the profile is not yet available immediately. It also makes it less likely to
      # introduce an error below in future changes by forgetting to sleep.
      time.sleep(1)
      tree = self.impalad_test_service.get_thrift_profile(query_id)
      if not tree:
        continue

      # tree.nodes[1] corresponds to ClientRequestState::summary_profile_.
      # See be/src/service/client-request-state.[h|cc].
      start_time = tree.nodes[1].info_strings["Start Time"]
      end_time = tree.nodes[1].info_strings["End Time"]
      # Start and End Times are of the form "2017-12-07 22:26:52.167711000".
      start_time_sub_sec_str = start_time.split('.')[-1]
      end_time_sub_sec_str = end_time.split('.')[-1]
      if len(end_time_sub_sec_str) == 0:
        elapsed = time.time() - start
        logging.info("end_time_sub_sec_str hasn't shown up yet, elapsed=%d", elapsed)
        continue

      assert len(end_time_sub_sec_str) == 9, end_time
      assert len(start_time_sub_sec_str) == 9, start_time
      return True

    # If we're here, we didn't get the final thrift profile from the debug web page.
    # This could happen due to heavy system load. The test is then inconclusive.
    # Log a message and fail this run.
    dbg_str = "Debug thrift profile for query {0} not available in {1} seconds".format(
        query_id, MAX_WAIT)
    assert False, dbg_str

  def test_query_profile_contains_instance_events(self, unique_database):
    """Test that /query_profile_encoded contains an event timeline for fragment
    instances, even when there are errors."""
    events = ["Fragment Instance Lifecycle Event Timeline",
              "Prepare Finished",
              "First Batch Produced",
              "First Batch Sent",
              "ExecInternal Finished"]
    query = "select count(*) from functional.alltypes"
    runtime_profile = self.execute_query(query).runtime_profile
    for event in events:
      assert event in runtime_profile