mirror of
https://github.com/apache/impala.git
synced 2026-01-25 18:01:04 -05:00
This patch adds a state machine for the QueryState class. The motivation behind this patch is to make the query lifecycle from the point of view of an executor much easier to reason about, and this patch is key for a follow-on patch for IMPALA-2990 where the status reporting will be per-query rather than per-fragment-instance. Currently, the state machine serves no other purpose, and it will mostly be used for IMPALA-2990. We introduce 5 possible states for the QueryState, which include 3 terminal states (FINISHED, CANCELLED and ERROR) and 2 non-terminal states (PREPARING, EXECUTING). The transition from one state to the next is always handled by a single thread, which is also the QueryState thread. This thread will additionally bear the purpose of sending periodic updates after IMPALA-4063, which is the primary reason behind having only this thread modify the state of the query. Counting barriers are introduced to keep a count of how many fragment instances have finished Preparing and Executing. These barriers also block until all the fragment instances have finished the respective state. The fragment instances update the query-wide query status if an error is hit and unblock the barrier if it is in the EXECUTING state. The PREPARING state blocks regardless of whether a fragment instance hit an error or not, until all the fragment instances have completed successfully or unsuccessfully, to maintain the invariant that fragment instances cannot be cancelled until the entire QueryState has finished PREPARING. The status reporting protocol has not been changed and remains exactly as it was. Testing: - Added 3 failure points in the query lifecycle using debug actions and added tests to validate the same (extension of IMPALA-7376). - Ran 'core' and 'exhaustive' tests. Future related work: 1) IMPALA-2990: Make status reporting per-query. 2) Try to logically align the FIS states with the QueryState states. 
3) Consider mirroring the QueryState state machine to CoordinatorBackendState Change-Id: Iec5670a7db83ecae4656d7bb2ea372d3767ba7fe Reviewed-on: http://gerrit.cloudera.org:8080/10813 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
195 lines
8.5 KiB
Python
195 lines
8.5 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# Injects failures at specific locations in each of the plan nodes. Currently supports
# the failure actions listed in FAILPOINT_ACTIONS: a failure test hook, cancellation
# of the query, and an injected MEM_LIMIT_EXCEEDED error.
#
|
|
import pytest
|
|
import os
|
|
import re
|
|
from collections import defaultdict
|
|
from time import sleep
|
|
|
|
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
|
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
|
|
from tests.common.skip import SkipIf, SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
|
from tests.common.test_dimensions import create_exec_option_dimension
|
|
from tests.common.test_vector import ImpalaTestDimension
|
|
|
|
# Failure actions exercised by the tests; used as the 'action' test dimension.
FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
# Locations at which an action is injected; used as the 'location' test dimension.
FAILPOINT_LOCATIONS = [
    'PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
# Map debug actions to their corresponding query options' values.
FAILPOINT_ACTION_MAP = {
    'FAIL': 'FAIL',
    'CANCEL': 'WAIT',
    'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED',
}
# Values assigned to the 'mt_dop' exec option; used as the 'mt_dop' test dimension.
MT_DOP_VALUES = [0, 4]
|
|
|
|
# Queries should cover all exec nodes.
QUERIES = [
    "select * from alltypes",
    "select count(*) from alltypessmall",
    "select count(int_col) from alltypessmall group by id",
    "select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
    "select 1 from alltypessmall a join alltypessmall b on a.id != b.id",
    "select 1 from alltypessmall order by id",
    "select 1 from alltypessmall order by id limit 100",
    "select * from alltypessmall union all select * from alltypessmall",
    "select row_number() over (partition by int_col order by id) from alltypessmall",
    "select c from (select id c from alltypessmall order by id limit 10) v where c = 1",
]
|
|
|
|
@SkipIf.skip_hbase  # -skip_hbase argument specified
@SkipIfS3.hbase  # S3: missing coverage: failures
@SkipIfADLS.hbase
@SkipIfIsilon.hbase  # ISILON: missing coverage: failures.
@SkipIfLocal.hbase
class TestFailpoints(ImpalaTestSuite):
  """Runs each query in QUERIES with a debug action injected at every plan node.

  The test matrix is the cross product of QUERIES, FAILPOINT_ACTIONS,
  FAILPOINT_LOCATIONS and MT_DOP_VALUES, minus the CANCEL combinations excluded in
  add_test_dimensions().
  """

  @classmethod
  def get_workload(cls):
    """Returns the name of the workload these tests run against."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Registers the query/action/location/mt_dop dimensions and their constraints."""
    super(TestFailpoints, cls).add_test_dimensions()
    matrix = cls.ImpalaTestMatrix
    matrix.add_dimension(ImpalaTestDimension('query', *QUERIES))
    matrix.add_dimension(ImpalaTestDimension('action', *FAILPOINT_ACTIONS))
    matrix.add_dimension(ImpalaTestDimension('location', *FAILPOINT_LOCATIONS))
    matrix.add_dimension(ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
    matrix.add_dimension(create_exec_option_dimension([0], [False], [0]))

    # Don't create PREPARE:WAIT debug actions because cancellation is not checked until
    # after the prepare phase once execution is started.
    matrix.add_constraint(
        lambda v: not (v.get_value('action') == 'CANCEL'
                       and v.get_value('location') == 'PREPARE'))
    # Don't create CLOSE:WAIT debug actions to avoid leaking plan fragments (there's no
    # way to cancel a plan fragment once Close() has been called)
    matrix.add_constraint(
        lambda v: not (v.get_value('action') == 'CANCEL'
                       and v.get_value('location') == 'CLOSE'))

  # Run serially because this can create enough memory pressure to invoke the Linux OOM
  # killer on machines with 30GB RAM. This makes the test run in 4 minutes instead of 1-2.
  @pytest.mark.execute_serially
  def test_failpoints(self, vector):
    """Injects the vector's action at the vector's location for every plan node.

    For each node id parsed from the EXPLAIN output, sets the 'debug_action' exec
    option and runs the query through the cancel or fail path. Afterwards the same
    query is run once more without a debug action and must succeed.
    """
    query = vector.get_value('query')
    action = vector.get_value('action')
    location = vector.get_value('location')
    # The exec option dict is mutated in place; the helpers below read it back through
    # vector.get_value('exec_option').
    exec_option = vector.get_value('exec_option')
    exec_option['mt_dop'] = vector.get_value('mt_dop')

    try:
      plan_node_ids = self.__parse_plan_nodes_from_explain(query, vector)
    except ImpalaBeeswaxException as e:
      if "MT_DOP not supported" in str(e):
        pytest.xfail(reason="MT_DOP not supported.")
      else:
        # Bare raise keeps the original traceback.
        raise

    # Without any node ids no failure would be injected at all; fail loudly instead of
    # hitting a KeyError when the debug action is removed below.
    assert plan_node_ids, 'No plan node ids parsed from EXPLAIN output of: %s' % query

    for node_id in plan_node_ids:
      debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action])
      # IMPALA-7046: add jitter to backend startup to exercise various failure paths.
      debug_action += '|COORD_BEFORE_EXEC_RPC:JITTER@100@0.3'

      LOG.info('Current debug action: SET DEBUG_ACTION=%s' % debug_action)
      exec_option['debug_action'] = debug_action

      if action == 'CANCEL':
        self.__execute_cancel_action(query, vector)
      elif action in ('FAIL', 'MEM_LIMIT_EXCEEDED'):
        self.__execute_fail_action(query, vector)
      else:
        assert 0, 'Unknown action: %s' % action

    # We should be able to execute the same query successfully when no failures are
    # injected.
    exec_option.pop('debug_action', None)
    self.execute_query(query, exec_option)

  def __parse_plan_nodes_from_explain(self, query, vector):
    """Parses the EXPLAIN <query> output and returns a list of node ids.
    Expects format of <ID>:<NAME>"""
    explain_result = self.execute_query(
        "explain " + query, vector.get_value('exec_option'),
        table_format=vector.get_value('table_format'))
    # Compiled once; rows that do not contain an <ID>:<NAME> token are skipped.
    node_pattern = re.compile(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)')
    matches = (node_pattern.search(row) for row in explain_result.data)
    return [int(m.group('node_id')) for m in matches if m is not None]

  def test_lifecycle_failures(self):
    """Test that targeted failure injections in the query lifecycle do not cause crashes
    or hangs"""
    query = "select * from tpch.lineitem limit 10000"

    # Fail, in turn, the Prepare(), Open() and ExecInternal() phases of all fragment
    # instances.
    always_fail_actions = (
        'FIS_IN_PREPARE:FAIL@1.0',
        'FIS_IN_OPEN:FAIL@1.0',
        'FIS_IN_EXEC_INTERNAL:FAIL@1.0',
    )
    for debug_action in always_fail_actions:
      self.execute_query_expect_failure(
          self.client, query, query_options={'debug_action': debug_action})

    # Fail the fragment instance thread creation with a 0.5 probability.
    debug_action = 'FIS_FAIL_THREAD_CREATION:FAIL@0.5'

    # We want to test the behavior when only some fragment instance threads fail to be
    # created, so we set the probability of fragment instance thread creation failure to
    # 0.5. Since there's only a 50% chance of fragment instance thread creation failure,
    # we attempt to catch a query failure up to a very conservative maximum of 50 tries.
    max_attempts = 50
    for _ in range(max_attempts):
      try:
        self.execute_query(query, query_options={'debug_action': debug_action})
      except ImpalaBeeswaxException as e:
        assert 'Query aborted:Debug Action: FIS_FAIL_THREAD_CREATION:FAIL@0.5' \
            in str(e), str(e)
        break
    else:
      # The loop finished without a single failure, so the injected failure path was
      # never exercised; do not let the test pass silently.
      assert False, ('Query did not fail in %d attempts with debug action %s'
                     % (max_attempts, debug_action))

  def __execute_fail_action(self, query, vector):
    """Runs 'query' with the injected debug action and checks the error, if any.

    A raised ImpalaBeeswaxException is logged and must not mention a corrupt Parquet
    file. A query that completes without error is currently accepted.
    """
    try:
      self.execute_query(query, vector.get_value('exec_option'),
                         table_format=vector.get_value('table_format'))
      # NOTE(review): a non-empty string is always truthy, so this assert never fires
      # and a query that does not fail is accepted. Left unchanged on purpose:
      # presumably not every plan node reaches every failpoint location, so a strict
      # check could fail valid combinations - confirm before tightening.
      assert 'Expected Failure'
    except ImpalaBeeswaxException as e:
      LOG.debug(e)
      # IMPALA-5197: None of the debug actions should trigger corrupted file message
      assert 'Corrupt Parquet file' not in str(e)

  def __execute_cancel_action(self, query, vector):
    """Starts 'query' asynchronously, waits 3 seconds, cancels it and closes it.

    Asserts that the cancel request returned status code 0.
    """
    LOG.info('Starting async query execution')
    handle = self.execute_query_async(query, vector.get_value('exec_option'),
                                      table_format=vector.get_value('table_format'))
    LOG.info('Sleeping')
    sleep(3)
    try:
      cancel_result = self.client.cancel(handle)
    finally:
      # Close the query even if the cancel request raises, so the handle is not leaked.
      self.client.close_query(handle)
    assert cancel_result.status_code == 0, \
        'Unexpected status code from cancel request: %s' % cancel_result