mirror of
https://github.com/apache/impala.git
synced 2025-12-23 21:08:39 -05:00
IMPALA-6171: Revert "IMPALA-1575: part 2: yield admission control resources"
This reverts commit fe90867d89.
Change-Id: I3eec4b5a6ff350933ffda0bb80949c5960ecdf25
Reviewed-on: http://gerrit.cloudera.org:8080/8499
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins
This commit is contained in:
committed by
Impala Public Jenkins
parent
19c17e64b5
commit
a772f84562
@@ -40,25 +40,24 @@ from TCLIService import TCLIService
|
||||
|
||||
LOG = logging.getLogger('admission_test')
|
||||
|
||||
# The query used for testing. It is important that this query be able to fetch many
|
||||
# rows. This allows a thread to stay active by fetching one row at a time. The
|
||||
# where clause is for debugging purposes; each thread will insert its id so
|
||||
# We set a WAIT debug action so it doesn't complete the execution of this query. The
|
||||
# limit is a parameter for debugging purposes; each thread will insert its id so
|
||||
# that running queries can be correlated with the thread that submitted them.
|
||||
QUERY = "select * from alltypes where id != %s"
|
||||
QUERY = "select * from alltypes where id != %s"# limit %s"
|
||||
|
||||
# Time to sleep (in milliseconds) between issuing queries. The default statestore
|
||||
# heartbeat is 500ms, so the lower the delay the more we can submit before the global
|
||||
# state is updated. When the delay is at least the statestore heartbeat frequency, all
|
||||
# state should be visible by every impalad by the time the next query is submitted.
|
||||
SUBMISSION_DELAY_MS = [0, 50, 100, 600]
|
||||
SUBMISSION_DELAY_MS = [50]
|
||||
|
||||
# The number of queries to submit. The test does not support fewer queries than
|
||||
# MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some validation logic
|
||||
# simple.
|
||||
NUM_QUERIES = [15, 30, 50]
|
||||
NUM_QUERIES = [50]
|
||||
|
||||
# Whether we will submit queries to all available impalads (in a round-robin fashion)
|
||||
ROUND_ROBIN_SUBMISSION = [True, False]
|
||||
ROUND_ROBIN_SUBMISSION = [True]
|
||||
|
||||
# The query pool to use. The impalads should be configured to recognize this
|
||||
# pool with the parameters below.
|
||||
@@ -87,9 +86,6 @@ _STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
|
||||
# Key in the query profile for the query options.
|
||||
PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
|
||||
|
||||
# The different ways that a query thread can end its query.
|
||||
QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE']
|
||||
|
||||
def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
|
||||
proc_mem_limit = None):
|
||||
if proc_mem_limit is not None:
|
||||
@@ -369,13 +365,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
|
||||
class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
"""Submits a number of queries (parameterized) with some delay between submissions
|
||||
(parameterized) and the ability to submit to one impalad or many in a round-robin
|
||||
fashion. Each query is submitted on a separate thread. After admission, the query
|
||||
thread will block with the query open and wait for the main thread to notify it to
|
||||
end its query. The query thread can end its query by fetching to the end, cancelling
|
||||
itself, closing itself, or waiting for the query timeout to take effect. Depending
|
||||
on the test parameters a varying number of queries will be admitted, queued, and
|
||||
rejected. After the queries are admitted, the main thread will request each admitted
|
||||
query thread to end its query and allow queued queries to be admitted.
|
||||
fashion. The queries are set with the WAIT debug action so that we have more control
|
||||
over the state that the admission controller uses to make decisions. Each query is
|
||||
submitted on a separate thread. Depending on the test parameters a varying number of
|
||||
queries will be admitted, queued, and rejected. Once queries are admitted, the query
|
||||
execution blocks and we can cancel the query in order to allow another queued query to
|
||||
be admitted.
|
||||
|
||||
The test tracks the state of the admission controller using the metrics from each
|
||||
impalad to do the following:
|
||||
@@ -383,15 +378,13 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
queued, and rejected requests should sum to the number of queries and that the
|
||||
values are reasonable given the test parameters.
|
||||
(2) While there are running queries:
|
||||
* Request the currently running queries to end and wait for the queries to end.
|
||||
Verify the metric for the number of completed queries. The threads that
|
||||
submitted those queries will keep their connections open until the entire test
|
||||
completes. This verifies that admission control is tied to the end of the query
|
||||
and does not depend on closing the connection.
|
||||
* Cancel the currently running queries (they are blocked with the WAIT debug action)
|
||||
and verify the metric for the number of completed queries. The threads that
|
||||
submitted those queries should complete.
|
||||
* Check that queued requests are then dequeued and verify using the metric for the
|
||||
number of dequeued requests. The threads that were waiting to submit the query
|
||||
should then insert themselves into a list of currently running queries and then
|
||||
wait for a notification from the main thread.
|
||||
fetch() the results (which will block).
|
||||
(3) After all queries have completed, check that the final number of admitted,
|
||||
queued, and rejected requests are reasonable given the test parameters. When
|
||||
submitting to a single impalad, we know exactly what the values should be,
|
||||
@@ -435,7 +428,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
self.executing_threads = list()
|
||||
|
||||
def teardown(self):
|
||||
# Set shutdown for all threads (cancel if needed)
|
||||
for thread in self.all_threads:
|
||||
try:
|
||||
thread.lock.acquire()
|
||||
@@ -450,9 +442,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
client.close()
|
||||
finally:
|
||||
thread.lock.release()
|
||||
|
||||
# Wait for all threads to exit
|
||||
for thread in self.all_threads:
|
||||
thread.join(5)
|
||||
LOG.debug("Join thread for query num %s %s", thread.query_num,
|
||||
"TIMED OUT" if thread.isAlive() else "")
|
||||
@@ -548,39 +537,36 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
LOG.debug("Found all %s admitted threads after %s seconds", num_threads,
|
||||
round(time() - start_time, 1))
|
||||
|
||||
def end_admitted_queries(self, num_queries):
|
||||
def cancel_admitted_queries(self, num_queries):
|
||||
"""
|
||||
Requests each admitted query to end its query.
|
||||
Cancels queries on threads that are currently blocked on query execution.
|
||||
"""
|
||||
assert len(self.executing_threads) >= num_queries
|
||||
LOG.debug("Requesting {0} clients to end queries".format(num_queries))
|
||||
|
||||
# Request admitted clients to end their queries
|
||||
current_executing_queries = []
|
||||
LOG.debug("Cancelling %s queries", num_queries)
|
||||
for i in xrange(num_queries):
|
||||
# pop() is thread-safe, it's OK if another thread is appending concurrently.
|
||||
thread = self.executing_threads.pop(0)
|
||||
LOG.debug("Cancelling query %s", thread.query_num)
|
||||
# The other thread sets the query_state before appending itself to the list,
|
||||
# and will not change its state until it is cancelled by this thread.
|
||||
assert thread.query_state == 'ADMITTED'
|
||||
current_executing_queries.append(thread)
|
||||
thread.query_state = 'REQUEST_QUERY_END'
|
||||
|
||||
# Wait for the queries to end
|
||||
start_time = time()
|
||||
while True:
|
||||
all_done = True
|
||||
for thread in self.all_threads:
|
||||
if thread.query_state == 'REQUEST_QUERY_END':
|
||||
all_done = False
|
||||
if all_done:
|
||||
break
|
||||
assert (time() - start_time < STRESS_TIMEOUT),\
|
||||
"Timed out waiting %s seconds for query end" % (STRESS_TIMEOUT,)
|
||||
sleep(1)
|
||||
client = thread.impalad.service.create_beeswax_client()
|
||||
try:
|
||||
cancel_result = client.cancel(thread.query_handle)
|
||||
assert cancel_result.status_code == 0,\
|
||||
'Unexpected status code from cancel request: %s' % cancel_result
|
||||
# Wait for the query to be cancelled and return
|
||||
thread.join(20)
|
||||
LOG.debug("Cancelled admitted query %s %s",
|
||||
thread.query_num, "TIMED OUT" if thread.isAlive() else "")
|
||||
assert not thread.isAlive()
|
||||
assert thread.query_state == 'COMPLETED'
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
class SubmitQueryThread(threading.Thread):
|
||||
def __init__(self, impalad, additional_query_options, vector, query_num,
|
||||
query_end_behavior, executing_threads):
|
||||
executing_threads):
|
||||
"""
|
||||
executing_threads must be provided so that this thread can add itself when the
|
||||
query is admitted and begins execution.
|
||||
@@ -590,7 +576,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
self.vector = vector
|
||||
self.additional_query_options = additional_query_options
|
||||
self.query_num = query_num
|
||||
self.query_end_behavior = query_end_behavior
|
||||
self.impalad = impalad
|
||||
self.error = None
|
||||
# query_state is defined and used only by the test code, not a property exposed by
|
||||
@@ -614,6 +599,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
return
|
||||
|
||||
exec_options = self.vector.get_value('exec_option')
|
||||
exec_options['debug_action'] = '0:GETNEXT:WAIT'
|
||||
exec_options.update(self.additional_query_options)
|
||||
query = QUERY % (self.query_num,)
|
||||
self.query_state = 'SUBMITTING'
|
||||
@@ -621,9 +607,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
ImpalaTestSuite.change_database(client, self.vector.get_value('table_format'))
|
||||
client.set_configuration(exec_options)
|
||||
|
||||
if self.query_end_behavior == 'QUERY_TIMEOUT':
|
||||
client.execute("SET QUERY_TIMEOUT_S=5")
|
||||
|
||||
LOG.debug("Submitting query %s", self.query_num)
|
||||
self.query_handle = client.execute_async(query)
|
||||
except ImpalaBeeswaxException as e:
|
||||
@@ -644,22 +627,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
# The thread becomes visible to the main thread when it is added to the
|
||||
# shared list of executing_threads. append() is atomic and thread-safe.
|
||||
self.executing_threads.append(self)
|
||||
|
||||
# Synchronize with the main thread. At this point, the thread is executing a
|
||||
# query. It needs to wait until the main thread requests it to end its query.
|
||||
while not self.shutdown:
|
||||
# The QUERY_TIMEOUT needs to stay active until the main thread requests it
|
||||
# to end. Otherwise, the query may get cancelled early. Fetch a row every
|
||||
# second to avoid going idle.
|
||||
if self.query_end_behavior == 'QUERY_TIMEOUT' and \
|
||||
self.query_state != 'COMPLETED':
|
||||
client.fetch(query, self.query_handle, 1)
|
||||
if self.query_state == 'REQUEST_QUERY_END':
|
||||
self._end_query(client, query)
|
||||
# The query has released admission control resources
|
||||
try:
|
||||
# fetch() will block until we cancel the query from the main thread
|
||||
# (unless an unexpected error occurs). If an error occurs on the main thread,
|
||||
# it is possible that teardown() cancels this query before we call fetch(). In
|
||||
# that case a different exception is thrown and we handle it gracefully.
|
||||
client.fetch(query, self.query_handle)
|
||||
except ImpalaBeeswaxException as e:
|
||||
if "Cancelled" in str(e):
|
||||
LOG.debug("Query %s completed", self.query_num)
|
||||
self.query_state = 'COMPLETED'
|
||||
self.query_handle = None
|
||||
sleep(1)
|
||||
elif "Invalid or unknown query handle" in str(e):
|
||||
# May happen if the test is being torn down early (i.e. an error occurred).
|
||||
LOG.debug("Query %s already cancelled in test shutdown.")
|
||||
else:
|
||||
raise e
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
# Unknown errors will be raised later
|
||||
@@ -670,27 +653,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
if client is not None:
|
||||
client.close()
|
||||
|
||||
def _end_query(self, client, query):
|
||||
"""Bring the query to the appropriate end state defined by self.query_end_behavior.
|
||||
Returns once the query has reached that state."""
|
||||
if self.query_end_behavior == 'QUERY_TIMEOUT':
|
||||
# Sleep and wait for the query to be cancelled. The cancellation will
|
||||
# set the state to EXCEPTION.
|
||||
start_time = time()
|
||||
while (client.get_state(self.query_handle) != \
|
||||
client.QUERY_STATES['EXCEPTION']):
|
||||
assert (time() - start_time < STRESS_TIMEOUT),\
|
||||
"Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
|
||||
sleep(1)
|
||||
elif self.query_end_behavior == 'EOS':
|
||||
# Fetch all rows so we hit eos.
|
||||
client.fetch(query, self.query_handle)
|
||||
elif self.query_end_behavior == 'CLIENT_CANCEL':
|
||||
client.cancel(self.query_handle)
|
||||
else:
|
||||
assert self.query_end_behavior == 'CLIENT_CLOSE'
|
||||
client.close_query(self.query_handle)
|
||||
|
||||
def _check_queries_page_resource_pools(self):
|
||||
"""Checks that all queries in the '/queries' webpage json have the correct resource
|
||||
pool (this is called after all queries have been admitted, queued, or rejected, so
|
||||
@@ -731,11 +693,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
initial_metrics = self.get_admission_metrics();
|
||||
log_metrics("Initial metrics: ", initial_metrics);
|
||||
|
||||
for query_num in xrange(num_queries):
|
||||
# Want query_num to start at 1 because this gets used as the limit in the query to
|
||||
# help debugging (we can associate a running query with a thread). If we start at 0,
|
||||
# that query would be evaluated as a constant expression and never hit the WAIT debug
|
||||
# action.
|
||||
for query_num in xrange(1, num_queries + 1):
|
||||
impalad = self.impalads[query_num % len(self.impalads)]
|
||||
query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]
|
||||
thread = self.SubmitQueryThread(impalad, additional_query_options, vector,
|
||||
query_num, query_end_behavior, self.executing_threads)
|
||||
query_num, self.executing_threads)
|
||||
thread.start()
|
||||
self.all_threads.append(thread)
|
||||
sleep(submission_delay_ms / 1000.0)
|
||||
@@ -770,10 +735,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
while len(self.executing_threads) > 0:
|
||||
curr_metrics = self.get_admission_metrics();
|
||||
log_metrics("Main loop, curr_metrics: ", curr_metrics);
|
||||
num_to_end = len(self.executing_threads)
|
||||
LOG.debug("Main loop, will request %s queries to end", num_to_end)
|
||||
self.end_admitted_queries(num_to_end)
|
||||
self.wait_for_metric_changes(['released'], curr_metrics, num_to_end)
|
||||
num_to_cancel = len(self.executing_threads)
|
||||
LOG.debug("Main loop, will cancel %s queries", num_to_cancel)
|
||||
self.cancel_admitted_queries(num_to_cancel)
|
||||
self.wait_for_metric_changes(['released'], curr_metrics, num_to_cancel)
|
||||
|
||||
num_queued_remaining =\
|
||||
curr_metrics['queued'] - curr_metrics['dequeued'] - curr_metrics['timed-out']
|
||||
|
||||
Reference in New Issue
Block a user