mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-14091: Migrate test_query_retries.py to HS2
test_query_retries.py is still pinned to test using the beeswax protocol by default. This patch refactors it to test using the hs2 protocol. Testing: - Ran and passed test_query_retries.py in exhaustive mode. Change-Id: If12eeb47b843f0d1faca47994b2001e6d4c8ac58 Reviewed-on: http://gerrit.cloudera.org:8080/22939 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
2a680b302e
commit
f8a1f6046a
@@ -758,6 +758,11 @@ class ImpylaHS2Connection(ImpalaConnection):
|
||||
id = session_handle_to_session_id(cursor.session.handle)
|
||||
return "" if id is None else id
|
||||
|
||||
def session_id(self, operation_handle):
|
||||
cursor = operation_handle.get_handle()
|
||||
session_id = self.__get_session_id(cursor)
|
||||
return session_id if session_id else str(cursor.session)
|
||||
|
||||
def handle_id(self, operation_handle):
|
||||
query_id = self.get_query_id(operation_handle)
|
||||
return query_id if query_id else str(operation_handle)
|
||||
|
||||
@@ -46,7 +46,6 @@ from tests.common.skip import (
|
||||
SkipIfNotHdfsMinicluster,
|
||||
)
|
||||
from tests.common.test_dimensions import add_mandatory_exec_option
|
||||
from tests.common.test_vector import BEESWAX
|
||||
|
||||
# The BE krpc port of the impalad to simulate rpc or disk errors in tests.
|
||||
FAILED_KRPC_PORT = 27001
|
||||
@@ -70,11 +69,6 @@ def _get_disk_fail_action(port):
|
||||
@SkipIfEC.parquet_file_size
|
||||
class TestQueryRetries(CustomClusterTestSuite):
|
||||
|
||||
@classmethod
|
||||
def default_test_protocol(cls):
|
||||
# Retry mechanism is slightly different between beeswax vs hs2 protocol.
|
||||
return BEESWAX
|
||||
|
||||
# A query that shuffles a lot of data. Useful when testing query retries since it
|
||||
# ensures that a query fails during a TransmitData RPC. The RPC failure will cause the
|
||||
# target impalad to be blacklisted and the query to be retried. The query also has to
|
||||
@@ -156,6 +150,7 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
# Launch a query, it should be retried.
|
||||
handle = self.execute_query_async(self._shuffle_heavy_query,
|
||||
query_options={'retry_failed_queries': 'true'})
|
||||
query_id = self.client.handle_id(handle)
|
||||
self.client.wait_for_impala_state(handle, RUNNING, 60)
|
||||
|
||||
# Kill a random impalad.
|
||||
@@ -203,7 +198,7 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
# Assert that the second most recently completed query is the original query and it is
|
||||
# marked as 'RETRIED'.
|
||||
assert completed_queries[1]['state'] == 'RETRIED'
|
||||
assert completed_queries[1]['query_id'] == self.client.handle_id(handle)
|
||||
assert completed_queries[1]['query_id'] == query_id
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
@@ -396,6 +391,7 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
handle = self.execute_query_async(query,
|
||||
query_options={'retry_failed_queries': 'true',
|
||||
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
|
||||
query_id = self.client.handle_id(handle)
|
||||
# Wait until the query fails.
|
||||
self.client.wait_for_impala_state(handle, ERROR, 140)
|
||||
|
||||
@@ -424,6 +420,8 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
"queries scheduled only on the coordinator (either NUM_NODES set to 1 " \
|
||||
"or when small query optimization is triggered) can currently run" in str(e)
|
||||
assert "Additional Details: Not Applicable" in str(e)
|
||||
finally:
|
||||
self.client.close_query(handle)
|
||||
|
||||
# Assert that the RPC un-reachable impalad does not show up in the list of blacklisted
|
||||
# executors from the runtime profile.
|
||||
@@ -432,8 +430,7 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
|
||||
# Assert that the query id of the original query is in the runtime profile of the
|
||||
# retried query.
|
||||
self.__validate_original_id_in_profile(retried_runtime_profile,
|
||||
self.client.handle_id(handle))
|
||||
self.__validate_original_id_in_profile(retried_runtime_profile, query_id)
|
||||
|
||||
# Validate the state of the web ui. The query must be closed before validating the
|
||||
# state since it asserts that no queries are in flight.
|
||||
@@ -484,6 +481,8 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
assert False
|
||||
except IMPALA_CONNECTION_EXCEPTION as e:
|
||||
assert "Max retry limit was hit. Query was retried 1 time(s)." in str(e)
|
||||
finally:
|
||||
self.client.close_query(handle)
|
||||
|
||||
# Assert that the killed impalad shows up in the list of blacklisted executors from
|
||||
# the runtime profile.
|
||||
@@ -788,21 +787,28 @@ class TestQueryRetries(CustomClusterTestSuite):
|
||||
# Kill an impalad, and run a query. The query should be retried.
|
||||
self.cluster.impalads[1].kill()
|
||||
query = self._count_query
|
||||
handle = self.execute_query_async(query,
|
||||
query_options={'retry_failed_queries': 'true',
|
||||
'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
|
||||
query_id = self.client.handle_id(handle)
|
||||
self.hs2_client.set_configuration({
|
||||
'retry_failed_queries': 'true',
|
||||
'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
|
||||
handle = self.hs2_client.execute_async(query)
|
||||
query_id = self.hs2_client.handle_id(handle)
|
||||
session_id = self.hs2_client.session_id(handle)
|
||||
self.__wait_until_retry_state(query_id, 'RETRIED')
|
||||
|
||||
# SetQueryInflight will complete before execute_query_async returns because it will
|
||||
# be completed before Impala acknowledges that the query has started.
|
||||
page = self.cluster.get_first_impalad().service.get_debug_webpage_json('sessions')
|
||||
session_found = False
|
||||
for session in page['sessions']:
|
||||
# Every session should have one completed query: either test setup, or the original
|
||||
# query that's being retried.
|
||||
assert session['inflight_queries'] < session['total_queries']
|
||||
if session['session_id'] == session_id:
|
||||
# Session 'session_id' should have one completed query: either test setup, or
|
||||
# the original query that's being retried.
|
||||
assert session['inflight_queries'] < session['total_queries']
|
||||
session_found = True
|
||||
self.hs2_client.close_query(handle)
|
||||
self.hs2_client.clear_configuration()
|
||||
assert session_found, "session_id {} not found at {}".format(session_id, page)
|
||||
|
||||
self.client.close_query(handle)
|
||||
# If original query state closure is skipped, the coordinator will crash on a DCHECK.
|
||||
time.sleep(2)
|
||||
assert self.cluster.impalads[0].get_pid() is not None, "Coordinator crashed"
|
||||
|
||||
Reference in New Issue
Block a user