IMPALA-9225: Query option for retryable queries to spool all results before returning any to the client

If we have returned any results to the client in the original query,
query retry will be skipped to avoid incorrect results. This patch adds
a query option, spool_all_results_for_retries, for retryable queries to
spool all results before returning any to the client. It defaults to
true. If all query results cannot be contained in the allocated result
spooling space, we'll return results and thus disable query retry on
the query.

Setting spool_all_results_for_retries to false will fall back to the
original behavior - client can fetch results when any of them are ready.
So we explicitly set it to false in the retried query since it won't be
retried. For non-retryable queries or queries that don't enable results
spooling, the spool_all_results_for_retries option has no effect.

To implement this, this patch defers the time when results are ready to
be fetched. By default, the “rows available” event happens when any
results are ready. For a retryable query, when spool_query_results and
spool_all_results_for_retries are both true, the “rows available” event
happens after all results are spooled or after an error stops us from
doing so, e.g. the batch queue is full, cancellation or failures. After
the root fragment instance’s Open() finishes, the coordinator will
wait until the results of BufferedPlanRootSink are ready.
BufferedPlanRootSink sets the results ready signal in its Send(),
Close(), Cancel(), FlushFinal() methods.

Tests:
- Add a test to verify that a retryable query will spool all its results
  when results spooling and spool_all_results_for_retries are enabled.
- Add a test to verify that query retry succeeds when a retryable query
  is still spooling its results (spool_all_results_for_retries=true).
- Add a test to verify that the retried query won't spool all results
  even when results spooling and spool_all_results_for_retries are
  enabled in the original query.
- Add a test to verify that the original query can be canceled
  correctly. We need this because the logic added for
  spool_all_results_for_retries is related to the cancellation code
  path.
- Add a test to verify results will be returned when all of them can't
  fit into the result spooling space, and query retry will be skipped.

Change-Id: I462dbfef9ddab9060b30a6937fca9122484a24a5
Reviewed-on: http://gerrit.cloudera.org:8080/16323
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
stiga-huang
2020-08-11 16:35:15 +08:00
committed by Impala Public Jenkins
parent 0fcf846592
commit 61dcc805e5
11 changed files with 244 additions and 8 deletions

View File

@@ -55,6 +55,13 @@ class TestQueryRetries(CustomClusterTestSuite):
"\tegular courts above the\t1\t15635\t638\t6\t32.00\t49620.16\t0.07\t0.02\tN\tO" \
"\t1996-01-30\t1996-02-07\t1996-02-03\tDELIVER IN PERSON\tMAIL\tarefully slyly ex"
# Shared query for the spooling/retry tests. It has two union operands: the first
# operand executes quickly and the second one executes slowly, so a test can kill
# one impalad while some results are ready and the query is still running and has
# more results to produce. The tests in this class expect the first operand to
# return 8 and the second to return 3650.
_union_query = """select count(*) from functional.alltypestiny
union all
select count(*) from functional.alltypes where bool_col = sleep(50)"""
@classmethod
def get_workload(cls):
  """Return the name of the workload this test suite runs against."""
  return "functional-query"
@@ -327,7 +334,7 @@ class TestQueryRetries(CustomClusterTestSuite):
running the query, and the validate that another fetch request fails."""
query = "select * from functional.alltypes where bool_col = sleep(500)"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true', 'batch_size': '1'})
query_options={'retry_failed_queries': 'true', 'batch_size': '1'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
self.client.fetch(query, handle, max_rows=1)
@@ -335,12 +342,158 @@ class TestQueryRetries(CustomClusterTestSuite):
self.cluster.impalads[1].kill()
time.sleep(5)
# Assert than attempt to fetch from the query handle fails.
# Assert that attempt to fetch from the query handle fails.
try:
self.client.fetch(query, handle)
assert False
except Exception, e:
assert "Failed due to unreachable impalad" in str(e)
except Exception as e:
assert "Failed due to unreachable impalad" in str(e)
assert "Skipping retry of query_id=%s because the client has already " \
"fetched some rows" % handle.get_handle().id in str(e)
@pytest.mark.execute_serially
def test_spooling_all_results_for_retries(self):
  """Check that a retryable query run with spool_query_results and
  spool_all_results_for_retries enabled has spooled all of its results by the time
  the client can fetch, so killing an impalad afterwards does not trigger a retry."""
  query = self._union_query
  options = {
    'retry_failed_queries': 'true',
    'spool_query_results': 'true',
    'spool_all_results_for_retries': 'true',
  }
  handle = self.execute_query_async(query, query_options=options)

  # Pull a single row first.
  first_batch = self.client.fetch(query, handle, max_rows=1)
  assert len(first_batch.data) == 1
  assert int(first_batch.data[0]) == 8

  # Being able to fetch anything means all results are already spooled, so killing
  # an impalad now should not trigger a query retry.
  self.__kill_random_impalad()
  time.sleep(5)

  # The remaining results must still be fetchable.
  remaining = self.client.fetch(query, handle)
  assert len(remaining.data) == 1
  assert int(remaining.data[0]) == 3650

  # Confirm no retry happened: the summary reports no retried query id and the
  # profile still belongs to the original query.
  assert self.__get_retried_query_id_from_summary(handle) is None
  profile = self.client.get_runtime_profile(handle)
  assert self.__get_query_id_from_profile(profile) == handle.get_handle().id

  self.client.close_query(handle)
@pytest.mark.execute_serially
def test_query_retry_in_spooling(self):
  """Check that a retryable query with results spooling enabled and
  spool_all_results_for_retries=true is safely retried when a failure happens while
  it is still spooling its results."""
  query = self._union_query
  options = {
    'retry_failed_queries': 'true',
    'spool_query_results': 'true',
    'spool_all_results_for_retries': 'true',
  }
  handle = self.execute_query_async(query, query_options=options)

  # Let the first union operand finish so that some results have been spooled,
  # then take an impalad down while the rest is still being produced.
  self.wait_for_progress(handle, 0.1, 60)
  self.__kill_random_impalad()

  # The query is retried, so the full and correct result set is still returned.
  rows = self.client.fetch(query, handle)
  assert len(rows.data) == 2
  assert int(rows.data[0]) == 8
  assert int(rows.data[1]) == 3650

  # The exec summary must record that a retry took place.
  assert self.__get_retried_query_id_from_summary(handle) is not None

  self.client.close_query(handle)
@pytest.mark.execute_serially
def test_retried_query_not_spooling_all_results(self):
  """Check that the retried query returns results as soon as they are available,
  even though results spooling and spool_all_results_for_retries were enabled in
  the original query."""
  query = self._union_query
  options = {
    'retry_failed_queries': 'true',
    'spool_query_results': 'true',
    'spool_all_results_for_retries': 'true',
  }
  handle = self.execute_query_async(query, query_options=options)

  # Once the first union operand has finished, kill one impalad so that the query
  # gets retried.
  self.wait_for_progress(handle, 0.1, 60)
  self.__kill_random_impalad()
  time.sleep(5)

  # The first operand's result must be fetchable while the second operand is
  # still executing.
  first_batch = self.client.fetch(query, handle, max_rows=1)
  assert len(first_batch.data) == 1
  assert int(first_batch.data[0]) == 8

  # Unfinished scan ranges show that the second operand is still running.
  progress = self.client.get_exec_summary(handle).progress
  assert progress.num_completed_scan_ranges < progress.total_scan_ranges

  self.client.close_query(handle)
@pytest.mark.execute_serially
def test_query_retry_reaches_spool_limit(self):
  """Check that a retryable query with results spooling enabled and
  spool_all_results_for_retries=true starts returning rows once it reaches the
  spooling memory limit, and that the retry is then skipped."""
  query = "select * from functional.alltypes where bool_col = sleep(500)"

  # Use small values for the spill-to-disk configs so the query is forced to spill
  # its spooled results and hit the result queue limit.
  small_limit = 8 * 1024
  options = {
    'batch_size': 1,
    'spool_query_results': True,
    'retry_failed_queries': True,
    'spool_all_results_for_retries': True,
    'min_spillable_buffer_size': small_limit,
    'default_spillable_buffer_size': small_limit,
    'max_result_spooling_mem': small_limit,
    'max_spilled_result_spooling_mem': small_limit,
  }
  handle = self.execute_query_async(query, query_options=options)

  # Block until at least one row can be fetched.
  first_batch = self.client.fetch(query, handle, max_rows=1)
  assert len(first_batch.data) == 1

  # The query must still be executing at this point.
  progress = self.client.get_exec_summary(handle).progress
  assert progress.num_completed_scan_ranges < progress.total_scan_ranges

  expected_log = ('Cannot spool all results in the allocated result spooling space. '
                  'Query retry will be skipped if any results have been returned.')
  self.assert_impalad_log_contains('INFO', expected_log, expected_count=1)

  # Kill one impalad; the next fetch must fail instead of being retried, because
  # rows have already been handed to the client.
  self.__kill_random_impalad()
  try:
    self.client.fetch(query, handle)
  except ImpalaBeeswaxException as e:
    error_text = str(e)
    assert "Failed due to unreachable impalad" in error_text
    skip_msg = ("Skipping retry of query_id=%s because the client has already "
                "fetched some rows" % handle.get_handle().id)
    assert skip_msg in error_text
  else:
    assert False, "fetch should fail"
@pytest.mark.execute_serially
def test_original_query_cancel(self):
  """Test canceling a retryable query with spool_all_results_for_retries=true. Make sure
  Coordinator::Wait() won't block in cancellation.

  The scenario is run twice: once cancelling after the query reaches the RUNNING
  state and once after it reaches FINISHED."""
  for state in ['RUNNING', 'FINISHED']:
    handle = self.execute_query_async(self._union_query, query_options={
      'retry_failed_queries': 'true', 'spool_query_results': 'true',
      'spool_all_results_for_retries': 'true'})
    self.wait_for_state(handle, self.client.QUERY_STATES[state], 60)

    # Cancel the query.
    self.client.cancel(handle)

    # A fetch on the cancelled handle must fail with a cancellation error. The
    # "fetch unexpectedly succeeded" failure lives in the else clause so that its
    # AssertionError is not caught by the broad except below and reported as a
    # missing "Cancelled" message.
    try:
      self.client.fetch(self._union_query, handle)
    except Exception as e:
      assert "Cancelled" in str(e)
    else:
      assert False, "fetch should fail after cancelling a query in state " + state
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(