IMPALA-13682: Implement missing capabilities in ImpylaHS2Connection

This patch implements 'wait_for_finished_timeout',
'wait_for_admission_control', and 'get_admission_result' for
ImpylaHS2Client.

This patch also changes the behavior of ImpylaHS2Connection to produce
several extra cursors aside from self.__cursor for 'execute' call that
supplies user argument and each 'execute_async' to make issuing multiple
concurrent queries possible. Note that each HS2 cursor opens its own HS2
Session. Therefore, this patch breaks the assumption that an
ImpylaHS2Connection is always under a single HS2 Session (see HIVE-11402
and HIVE-14247 on why concurrent query with shared HS2 Session is
problematic). However, they do share the same query options stored at
self.__query_options. In practice, most Impala tests do not care about
running concurrent queries under a single HS2 session but only require
them to use the same query options.

The following additions are new for both BeeswaxConnection and
ImpylaHS2Connection:
- Add method 'log_client' for convenience.
- Implement consistent query state mapping and checking through several
  accessor methods.
- Add methods 'wait_for_impala_state' and 'wait_for_any_impala_state'
  that use 'get_impala_exec_state' method internally.
- Add 'fetch_profile_after_close' parameter to 'close_query' method. If
  True, 'close_query' will return the query profile after closing the
  query.
- Add 'discard_results' parameter for 'fetch' method. This can save time
  parsing results if the test does not care about the result.

Reuse existing op_handle_to_query_id and add new
session_handle_to_session_id to parse HS2
TOperationHandle.operationId.guid and TSessionHandle.sessionId.guid
respectively.

To show that ImpylaHS2Connection is on par with BeeswaxConnection, this
patch refactors test_admission_controller.py to test using HS2 protocol
by default. Test that does raw HS2 RPC (require capabilities from
HS2TestSuite) is separated out into a new TestAdmissionControllerRawHS2
class and stays using beeswax protocol by default. All calls to
copy.copy is replaced with copy.deepcopy for safety.

Testing:
- Pass exhaustive tests.

Change-Id: I9ac07732424c16338e060c9392100b54337f11b8
Reviewed-on: http://gerrit.cloudera.org:8080/22362
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2025-01-16 19:42:51 -08:00
committed by Impala Public Jenkins
parent fc152b376c
commit daaf73a7c2
8 changed files with 680 additions and 305 deletions

View File

@@ -82,6 +82,11 @@ class ClientRequestState {
~ClientRequestState();
/// Enums representing query exection state internal to impala.
/// Mapping to client protocol-specific states are provided by methods such as
/// BeeswaxQueryState() for Beeswax and TOperationState() for HS2.
/// If updating this enum, please also update mappings at
/// tests/common/impala_connection.py
enum class ExecState { INITIALIZED, PENDING, RUNNING, FINISHED, ERROR };
enum class RetryState { RETRYING, RETRIED, NOT_RETRIED };

View File

@@ -224,11 +224,11 @@ class ImpalaBeeswaxClient(object):
if not fetch_profile_after_close:
result.runtime_profile = self.get_runtime_profile(handle)
self.close_query(handle)
profile_after_close = self.close_query(handle, fetch_profile_after_close)
if fetch_profile_after_close:
# Fetch the profile again after the query has closed and the profile is complete.
result.runtime_profile = self.get_runtime_profile(handle)
# Attach profile that is obtained after query closed.
result.runtime_profile = profile_after_close
return result
@@ -292,8 +292,11 @@ class ImpalaBeeswaxClient(object):
def cancel_query(self, query_id):
return self.__do_rpc(lambda: self.imp_service.Cancel(query_id))
def close_query(self, handle):
def close_query(self, handle, fetch_profile_after_close=False):
self.__do_rpc(lambda: self.imp_service.close(handle))
if fetch_profile_after_close:
return self.get_runtime_profile(handle)
return None
def wait_for_finished(self, query_handle):
"""Given a query handle, polls the coordinator waiting for the query to transition to
@@ -371,7 +374,8 @@ class ImpalaBeeswaxClient(object):
def get_log(self, query_handle):
return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
def fetch_results(self, query_string, query_handle, max_rows=-1):
def fetch_results(self, query_string, query_handle, max_rows=-1,
discard_results=False):
"""Fetches query results given a handle and query type (insert, use, other)"""
query_type = self.__get_query_type(query_string)
if query_type == 'use':
@@ -382,6 +386,8 @@ class ImpalaBeeswaxClient(object):
# Result fetching for insert is different from other queries.
exec_result = None
if discard_results:
return exec_result
if query_type == 'insert':
exec_result = self.__fetch_insert_results(query_handle)
else:

View File

@@ -21,15 +21,24 @@
from __future__ import absolute_import, division, print_function
import abc
import codecs
from future.utils import with_metaclass
import logging
import re
import time
from beeswaxd.BeeswaxService import QueryState
import impala.dbapi as impyla
import impala.error as impyla_error
import tests.common
from RuntimeProfile.ttypes import TRuntimeProfileFormat
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
from tests.beeswax.impala_beeswax import (
DEFAULT_SLEEP_INTERVAL,
ImpalaBeeswaxClient,
ImpalaBeeswaxException)
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
from tests.util.thrift_util import (
op_handle_to_query_id,
session_handle_to_session_id)
LOG = logging.getLogger(__name__)
@@ -46,6 +55,36 @@ PROGRESS_LOG_RE = re.compile(
MAX_SQL_LOGGING_LENGTH = 128 * 1024
# Tuple of root exception types from different client protocol.
IMPALA_CONNECTION_EXCEPTION = (ImpalaBeeswaxException, impyla_error.Error)
# String representation of ClientRequestState::ExecState
INITIALIZED = 'INITIALIZED'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FINISHED = 'FINISHED'
ERROR = 'ERROR'
# ExecState that is final.
EXEC_STATES_FINAL = set([FINISHED, ERROR])
# Possible ExecState after query passed admission controller.
EXEC_STATES_ADMITTED = set([RUNNING, FINISHED, ERROR])
# Mapping of a ExecState to a set of possible future ExecState.
LEGAL_FUTURE_STATES = {
INITIALIZED: set([PENDING, RUNNING, FINISHED, ERROR]),
PENDING: set([RUNNING, FINISHED, ERROR]),
RUNNING: set([FINISHED, ERROR]),
FINISHED: set([ERROR]),
ERROR: set()
}
def has_legal_future_state(impala_state, future_states):
"""Return True if 'impala_state' can transition to one of state listed in
'future_states'."""
assert impala_state in LEGAL_FUTURE_STATES
expected_impala_states = set(future_states)
return len(LEGAL_FUTURE_STATES[impala_state] & expected_impala_states) > 0
# test_exprs.py's TestExprLimits executes extremely large SQLs (multiple MBs). It is the
# only test that runs SQL larger than 128KB. Logging these SQLs in execute() increases
@@ -149,20 +188,65 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
pass
@abc.abstractmethod
def close_query(self, handle):
def close_query(self, handle, fetch_profile_after_close=False):
"""Closes the query."""
pass
@abc.abstractmethod
def get_state(self, operation_handle):
"""Returns the state of a query"""
"""Returns the state of a query.
May raise en error, depending on connection type."""
pass
@abc.abstractmethod
def state_is_finished(self, operation_handle):
"""Returns whether the state of a query is finished"""
def get_impala_exec_state(self, operation_handle):
"""Returns a string translation from client specific state of operation_handle
to Impala's ClientRequestState::ExecState."""
pass
def __is_at_exec_state(self, operation_handle, impala_state):
self.log_handle(
operation_handle, 'checking ' + impala_state + ' state for operation')
return self.get_impala_exec_state(operation_handle) == impala_state
def state_is_finished(self, operation_handle):
"""Returns whether the Impala exec state of a operation_handle is FINISHED.
DEPRECATED: use is_finished() instead."""
return self.is_finished(operation_handle)
def is_initialized(self, operation_handle):
"""Returns whether the Impala exec state of a operation_handle is INITIALIZED"""
return self.__is_at_exec_state(operation_handle, INITIALIZED)
def is_pending(self, operation_handle):
"""Returns whether the Impala exec state of a operation_handle is PENDING"""
return self.__is_at_exec_state(operation_handle, PENDING)
def is_running(self, operation_handle):
"""Returns whether the Impala exec state of a operation_handle is RUNNING"""
return self.__is_at_exec_state(operation_handle, RUNNING)
def is_finished(self, operation_handle):
"""Returns whether the Impala exec state of a operation_handle is FINISHED"""
return self.__is_at_exec_state(operation_handle, FINISHED)
def is_error(self, operation_handle):
"""Returns whether the Impala exec state of a operation_handle is ERROR.
Internally, it will call get_state(), and any exception thrown by get_state() will
cause this method to return True."""
return self.__is_at_exec_state(operation_handle, ERROR)
def is_executing(self, operation_handle):
"""Returns whether the state of a operation_handle is executing or will be
executing. Return False if operation_handle has ended, either successful or
with error."""
return self.get_impala_exec_state(operation_handle) not in EXEC_STATES_FINAL
def is_admitted(self, operation_handle):
"""Returns whether the state of a operation_handle has passed Impala
admission control. Return True if handle state is error."""
return self.get_impala_exec_state(operation_handle) in EXEC_STATES_ADMITTED
@abc.abstractmethod
def get_log(self, operation_handle):
"""Returns the log of an operation as a string, with entries separated by newlines."""
@@ -185,10 +269,19 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
pass
@abc.abstractmethod
def fetch(self, sql_stmt, operation_handle, max_rows=-1):
def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False):
"""Fetches query results up to max_rows given a handle and sql statement.
If max_rows < 0, all rows are fetched. If max_rows > 0 but the number of
rows returned is less than max_rows, all the rows have been fetched."""
rows returned is less than max_rows, all the rows have been fetched.
Return None if discard_results is True.
TODO: 'sql_stmt' can be obtained from 'operation_handle'."""
pass
@abc.abstractmethod
def get_runtime_profile(self, operation_handle,
profile_format=TRuntimeProfileFormat.STRING):
"""Get runtime profile of given 'operation_handle'.
Handle must stay open."""
pass
@abc.abstractmethod
@@ -198,14 +291,85 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
Otherwise, return str(operation_handle)."""
pass
@abc.abstractmethod
def log_handle(self, operation_handle, message):
"""Log 'message' at INFO level, along with id of 'operation_handle'."""
handle_id = self.handle_id(operation_handle)
LOG.info("-- {0}: {1}".format(handle_id, message))
def log_client(self, message):
"""Log 'message' at INFO level."""
LOG.info("-- {0}".format(message))
def wait_for_impala_state(self, operation_handle, expected_impala_state, timeout):
"""Waits for the given 'operation_handle' to reach the 'expected_impala_state'.
'expected_impala_state' must be a string of either 'INITIALIZED', 'PENDING',
'RUNNING', 'FINISHED', or 'ERROR'. If it does not reach the given state within
'timeout' seconds, the method throws an AssertionError.
"""
self.wait_for_any_impala_state(operation_handle, [expected_impala_state], timeout)
def wait_for_any_impala_state(self, operation_handle, expected_impala_states,
timeout_s):
"""Waits for the given 'operation_handle' to reach one of 'expected_impala_states'.
Each string in 'expected_impala_states' must either be 'INITIALIZED', 'PENDING',
'RUNNING', 'FINISHED', or 'ERROR'. If it does not reach one of the given states
within 'timeout' seconds, the method throws an AssertionError.
Returns the final state.
"""
start_time = time.time()
timeout_msg = None
while True:
impala_state = self.get_impala_exec_state(operation_handle)
interval = time.time() - start_time
if impala_state in expected_impala_states:
# Reached one of expected_impala_states.
break
elif not has_legal_future_state(impala_state, expected_impala_states):
timeout_msg = ("query '{0}' can not transition from last known state '{1}' to "
"any of the expected states {2}. Stop waiting after {3} "
"seconds.").format(
self.handle_id(operation_handle), impala_state, expected_impala_states,
interval)
break
elif interval >= timeout_s:
timeout_msg = ("query '{0}' did not reach one of the expected states {1}, last "
"known state {2}").format(
self.handle_id(operation_handle), expected_impala_states, impala_state)
break
time.sleep(DEFAULT_SLEEP_INTERVAL)
if timeout_msg is not None:
raise tests.common.errors.Timeout(timeout_msg)
return impala_state
@abc.abstractmethod
def wait_for_admission_control(self, operation_handle, timeout_s=60):
"""Given an 'operation_handle', polls the coordinator waiting for it to complete
admission control processing of the query.
Return True if query pass admission control after given 'timeout_s'."""
pass
@abc.abstractmethod
def get_admission_result(self, operation_handle):
"""Given an 'operation_handle', returns the admission result from the query
profile"""
pass
# Represents a connection to Impala using the Beeswax API.
class BeeswaxConnection(ImpalaConnection):
# This is based on ClientRequestState::BeeswaxQueryState().
__QUERY_STATE_TO_EXEC_STATE = {
QueryState.CREATED: INITIALIZED,
QueryState.COMPILED: PENDING,
QueryState.RUNNING: RUNNING,
QueryState.FINISHED: FINISHED,
QueryState.EXCEPTION: ERROR,
# These are not official ExecState, but added to complete mapping.
QueryState.INITIALIZED: 'UNIMPLEMENTED_INITIALIZED',
}
def __init__(self, host_port, use_kerberos=False, user=None, password=None,
use_ssl=False):
self.__beeswax_client = ImpalaBeeswaxClient(host_port, use_kerberos, user=user,
@@ -217,7 +381,7 @@ class BeeswaxConnection(ImpalaConnection):
self.QUERY_STATES = self.__beeswax_client.query_states
def get_test_protocol(self):
return 'beeswax'
return BEESWAX
def get_host_port(self):
return self.__host_port
@@ -250,30 +414,35 @@ class BeeswaxConnection(ImpalaConnection):
self.set_configuration_option("client_identifier", tests.common.current_node)
def connect(self):
LOG.info("-- connecting to: %s" % self.__host_port)
self.__beeswax_client.connect()
try:
self.__beeswax_client.connect()
self.log_client("connected to %s with beeswax" % self.__host_port)
except Exception as e:
self.log_client("failed connecting to %s with beeswax" % self.__host_port)
raise e
# TODO: rename to close_connection
def close(self):
LOG.info("-- closing connection to: %s" % self.__host_port)
self.log_client("closing beeswax connection to: %s" % self.__host_port)
self.__beeswax_client.close_connection()
def close_query(self, operation_handle):
def close_query(self, operation_handle, fetch_profile_after_close=False):
self.log_handle(operation_handle, 'closing query for operation')
self.__beeswax_client.close_query(operation_handle.get_handle())
return self.__beeswax_client.close_query(operation_handle.get_handle(),
fetch_profile_after_close)
def close_dml(self, operation_handle):
self.log_handle(operation_handle, 'closing DML query')
self.__beeswax_client.close_dml(operation_handle.get_handle())
def execute(self, sql_stmt, user=None, fetch_profile_after_close=False):
LOG.info("-- executing against %s\n" % (self.__host_port))
self.log_client("executing against %s\n" % (self.__host_port))
log_sql_stmt(sql_stmt)
return self.__beeswax_client.execute(sql_stmt, user=user,
fetch_profile_after_close=fetch_profile_after_close)
def execute_async(self, sql_stmt, user=None):
LOG.info("-- executing async: %s\n" % (self.__host_port))
self.log_client("executing async: %s\n" % (self.__host_port))
log_sql_stmt(sql_stmt)
beeswax_handle = self.__beeswax_client.execute_query_async(sql_stmt, user=user)
return OperationHandle(beeswax_handle, sql_stmt)
@@ -286,15 +455,17 @@ class BeeswaxConnection(ImpalaConnection):
self.log_handle(operation_handle, 'getting state')
return self.__beeswax_client.get_state(operation_handle.get_handle())
def state_is_finished(self, operation_handle):
self.log_handle(operation_handle, 'checking finished state for operation')
return self.get_state(operation_handle) == self.QUERY_STATES["FINISHED"]
def get_impala_exec_state(self, operation_handle):
return self.__QUERY_STATE_TO_EXEC_STATE[self.get_state(operation_handle)]
def get_exec_summary(self, operation_handle):
self.log_handle(operation_handle, 'getting exec summary operation')
return self.__beeswax_client.get_exec_summary(operation_handle.get_handle())
def get_runtime_profile(self, operation_handle):
def get_runtime_profile(self, operation_handle,
profile_format=TRuntimeProfileFormat.STRING):
assert profile_format == TRuntimeProfileFormat.STRING, (
"Beeswax client only support getting runtime profile in STRING format.")
self.log_handle(operation_handle, 'getting runtime profile operation')
return self.__beeswax_client.get_runtime_profile(operation_handle.get_handle())
@@ -316,11 +487,11 @@ class BeeswaxConnection(ImpalaConnection):
self.log_handle(operation_handle, 'getting log for operation')
return self.__beeswax_client.get_log(operation_handle.get_handle().log_context)
def fetch(self, sql_stmt, operation_handle, max_rows=-1):
def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False):
self.log_handle(operation_handle, 'fetching {} rows'.format(
'all' if max_rows < 0 else max_rows))
return self.__beeswax_client.fetch_results(
sql_stmt, operation_handle.get_handle(), max_rows)
sql_stmt, operation_handle.get_handle(), max_rows, discard_results)
def handle_id(self, operation_handle):
query_id = operation_handle.get_handle().id
@@ -340,9 +511,23 @@ class ImpylaHS2Connection(ImpalaConnection):
plus Impala-specific extensions, e.g. for fetching runtime profiles.
TODO: implement support for kerberos, SSL, etc.
"""
# ClientRequestState::TOperationState()
__OPERATION_STATE_TO_EXEC_STATE = {
'INITIALIZED_STATE': INITIALIZED,
'PENDING_STATE': PENDING,
'RUNNING_STATE': RUNNING,
'FINISHED_STATE': FINISHED,
'ERROR_STATE': ERROR,
# These are not official ExecState, but added to complete mapping.
'CANCELED_STATE': 'UNIMPLEMENTED_CANCELLED',
'CLOSED_STATE': 'UNIMPLEMENTED_CLOSED',
'UKNOWN_STATE': 'UNIMPLEMENTED_UNKNOWN'
}
def __init__(self, host_port, use_kerberos=False, is_hive=False,
use_http_transport=False, http_path="", use_ssl=False,
collect_profile_and_log=True):
collect_profile_and_log=True, user=None):
self.__host_port = host_port
self.__use_http_transport = use_http_transport
self.__http_path = http_path
@@ -353,28 +538,37 @@ class ImpylaHS2Connection(ImpalaConnection):
# cursor for different operations (as opposed to creating a new cursor per operation)
# so that the session is preserved. This means that we can only execute one operation
# at a time per connection, which is a limitation also imposed by the Beeswax API.
# However, for ease of async query testing, opening multiple cursors through single
# ImpylaHS2Connection is allowed if executing query through execute_async() or
# execute() with user parameter that is different than self.__user. Do note though
# that they will not share the same session with self.__cursor.
self.__impyla_conn = None
self.__cursor = None
# Default query option obtained from initial connect.
# Query option names are in lower case for consistency.
self.__default_query_options = {}
# List of all cursors that created through execute_async.
self.__async_cursors = list()
# Query options to send along with each query.
self.__query_options = {}
self._is_hive = is_hive
# Some Hive HS2 protocol, such as custom Calcite planner, may be able to collect
# profile and log from Impala.
self._collect_profile_and_log = collect_profile_and_log
self.__user = user
def get_test_protocol(self):
if self.__http_path:
return 'hs2-http'
return HS2_HTTP
else:
return 'hs2'
return HS2
def get_host_port(self):
return self.__host_port
def set_configuration_option(self, name, value):
# Only set the option if it's not already set to the same value.
# value must be parsed to string.
name = name.lower()
value = str(value)
if self.__query_options.get(name) != value:
@@ -389,35 +583,53 @@ class ImpylaHS2Connection(ImpalaConnection):
if hasattr(tests.common, "current_node") and not self._is_hive:
self.set_configuration_option("client_identifier", tests.common.current_node)
def __open_single_cursor(self, user=None):
return self.__impyla_conn.cursor(user=user, convert_types=False,
close_finished_queries=False)
def __close_single_cursor(self, cursor):
try:
# Explicitly close the cursor so that it will close the session.
cursor.close()
except Exception:
# The session may no longer be valid if the impalad was restarted during the test.
pass
def connect(self):
LOG.info("-- connecting to {0} with impyla".format(self.__host_port))
host, port = self.__host_port.split(":")
conn_kwargs = {}
if self._is_hive:
conn_kwargs['auth_mechanism'] = 'PLAIN'
self.__impyla_conn = impyla.connect(host=host, port=int(port),
use_http_transport=self.__use_http_transport,
http_path=self.__http_path,
use_ssl=self.__use_ssl, **conn_kwargs)
# Get the default query options for the session before any modifications are made.
self.__cursor = \
self.__impyla_conn.cursor(convert_types=False, close_finished_queries=False)
self.__default_query_options = {}
if not self._is_hive:
self.__cursor.execute("set all")
for name, val, kind in self.__cursor:
collect_default_query_options(self.__default_query_options, name, val, kind)
self.__cursor.close_operation()
try:
self.__impyla_conn = impyla.connect(
host=host, port=int(port), use_http_transport=self.__use_http_transport,
http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs)
self.__cursor = self.__open_single_cursor(user=self.__user)
# Get the default query options for the session before any modifications are made.
self.__default_query_options = {}
if not self._is_hive:
self.__cursor.execute("set all")
for name, val, kind in self.__cursor:
collect_default_query_options(self.__default_query_options, name, val, kind)
self.__cursor.close_operation()
LOG.debug("Default query options: {0}".format(self.__default_query_options))
self.log_client("connected to {0} with impyla {1} session {2}".format(
self.__host_port, self.get_test_protocol(), self.__get_session_id(self.__cursor)
))
except Exception as e:
self.log_client("failed connecting to {0} with impyla {1}".format(
self.__host_port, self.get_test_protocol()
))
raise e
def close(self):
LOG.info("-- closing connection to: {0}".format(self.__host_port))
try:
# Explicitly close the cursor so that it will close the session.
self.__cursor.close()
except Exception:
# The session may no longer be valid if the impalad was restarted during the test.
pass
self.log_client("closing 1 sync and {0} async {1} connections to: {2}".format(
len(self.__async_cursors), self.get_test_protocol(), self.__host_port))
self.__close_single_cursor(self.__cursor)
for async_cursor in self.__async_cursors:
self.__close_single_cursor(async_cursor)
# Remove all async cursors.
self.__async_cursors = list()
try:
self.__impyla_conn.close()
except AttributeError as e:
@@ -430,59 +642,90 @@ class ImpylaHS2Connection(ImpalaConnection):
"""Trigger the GetTables() HS2 request on the given database (None means all dbs).
Returns a list of (catalogName, dbName, tableName, tableType, tableComment).
"""
LOG.info("-- getting tables for database: {0}".format(database))
self.log_client("getting tables for database: {0}".format(database))
self.__cursor.get_tables(database_name=database)
return self.__cursor.fetchall()
def close_query(self, operation_handle):
def close_query(self, operation_handle, fetch_profile_after_close=False):
self.log_handle(operation_handle, 'closing query for operation')
# close_operation() will wipe out _last_operation.
# Assign it to op_handle so that we can pull the profile after close_operation().
op_handle = operation_handle.get_handle()._last_operation
operation_handle.get_handle().close_operation()
if fetch_profile_after_close:
assert self._collect_profile_and_log, (
"This connection is not configured to collect profile.")
return op_handle.get_profile(TRuntimeProfileFormat.STRING)
return None
def __log_execute(self, cursor, user):
self.log_client(
"executing against {0} at {1}. session: {2} main_cursor: {3} user: {4}\n".format(
(self._is_hive and 'Hive' or 'Impala'), self.__host_port,
self.__get_session_id(cursor), (cursor == self.__cursor), user)
)
def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING,
fetch_profile_after_close=False):
LOG.info("-- executing against {0} at {1}\n".format(
self._is_hive and 'Hive' or 'Impala', self.__host_port))
log_sql_stmt(sql_stmt)
self.__cursor.execute(sql_stmt, configuration=self.__query_options)
handle = OperationHandle(self.__cursor, sql_stmt)
cursor = self.__cursor
result = None
try:
if user != self.__user:
# Must create a new cursor to supply 'user'.
cursor = self.__open_single_cursor(user=user)
self.__log_execute(cursor, user)
cursor.execute(sql_stmt, configuration=self.__query_options)
handle = OperationHandle(cursor, sql_stmt)
self.log_handle(handle, "started query in session {0}".format(
self.__get_session_id(cursor)))
result = self.__fetch_results_and_profile(
handle, profile_format=profile_format,
fetch_profile_after_close=fetch_profile_after_close)
finally:
cursor.close_operation()
if cursor != self.__cursor:
self.__close_single_cursor(cursor)
return result
def __fetch_results_and_profile(
self, operation_handle, profile_format=TRuntimeProfileFormat.STRING,
fetch_profile_after_close=False):
r = None
try:
r = self.__fetch_results(handle, profile_format=profile_format)
r = self.__fetch_results(operation_handle, profile_format=profile_format)
finally:
if r is None:
# Try to close the query handle but ignore any exceptions not to replace the
# original exception raised by '__fetch_results'.
try:
self.close_query(handle)
self.close_query(operation_handle)
except Exception:
pass
elif fetch_profile_after_close:
op_handle = handle.get_handle()._last_operation
self.close_query(handle)
# Match ImpalaBeeswaxResult by placing the full profile including end time and
# duration into the return object.
r.runtime_profile = op_handle.get_profile(profile_format)
r.runtime_profile = self.close_query(operation_handle, fetch_profile_after_close)
return r
else:
self.close_query(handle)
self.close_query(operation_handle)
return r
def execute_async(self, sql_stmt, user=None):
LOG.info("-- executing against {0} at {1}\n".format(
self._is_hive and 'Hive' or 'Impala', self.__host_port))
log_sql_stmt(sql_stmt)
if user is not None:
raise NotImplementedError("Not yet implemented for HS2 - authentication")
async_cursor = None
try:
self.__cursor.execute_async(sql_stmt, configuration=self.__query_options)
handle = OperationHandle(self.__cursor, sql_stmt)
LOG.info("Started query {0}".format(self.get_query_id(handle)))
async_cursor = self.__open_single_cursor(user=user)
handle = OperationHandle(async_cursor, sql_stmt)
self.__log_execute(async_cursor, user)
log_sql_stmt(sql_stmt)
async_cursor.execute_async(sql_stmt, configuration=self.__query_options)
self.__async_cursors.append(async_cursor)
return handle
except Exception:
self.__cursor.close_operation()
raise
except Exception as e:
if async_cursor:
async_cursor.close_operation()
self.__close_single_cursor(async_cursor)
raise e
def cancel(self, operation_handle):
self.log_handle(operation_handle, 'canceling operation')
@@ -492,34 +735,38 @@ class ImpylaHS2Connection(ImpalaConnection):
def get_query_id(self, operation_handle):
"""Return the string representation of the query id.
Return empty string if handle is already canceled or closed."""
id = None
last_op = operation_handle.get_handle()._last_operation
if last_op is None:
return ""
guid_bytes = last_op.handle.operationId.guid
# hex_codec works on bytes, so this needs to a decode() to get back to a string
hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode()
lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
return "{0}:{1}".format(hi_str, lo_str)
if last_op is not None:
id = op_handle_to_query_id(last_op.handle)
return "" if id is None else id
def __get_session_id(self, cursor):
"""Return the string representation of the session id.
Return empty string if handle is already canceled or closed."""
id = None
if cursor.session is not None:
id = session_handle_to_session_id(cursor.session.handle)
return "" if id is None else id
def handle_id(self, operation_handle):
query_id = self.get_query_id(operation_handle)
return query_id if query_id else str(operation_handle)
def log_handle(self, operation_handle, message):
handle_id = self.handle_id(operation_handle)
LOG.info("-- {0}: {1}".format(handle_id, message))
def get_state(self, operation_handle):
self.log_handle(operation_handle, 'getting state')
cursor = operation_handle.get_handle()
return cursor.status()
def state_is_finished(self, operation_handle):
self.log_handle(operation_handle, 'checking finished state for operation')
cursor = operation_handle.get_handle()
# cursor.status contains a string representation of one of
# TCLIService.TOperationState.
return cursor.status() == "FINISHED_STATE"
return cursor.status()
def get_impala_exec_state(self, operation_handle):
try:
return self.__OPERATION_STATE_TO_EXEC_STATE[self.get_state(operation_handle)]
except impyla_error.Error:
return ERROR
except Exception as e:
raise e
def get_exec_summary(self, operation_handle):
self.log_handle(operation_handle, 'getting exec summary operation')
@@ -527,22 +774,53 @@ class ImpylaHS2Connection(ImpalaConnection):
# summary returned is thrift, not string.
return cursor.get_summary()
def get_runtime_profile(self, operation_handle, profile_format):
def get_runtime_profile(self, operation_handle,
profile_format=TRuntimeProfileFormat.STRING):
self.log_handle(operation_handle, 'getting runtime profile operation')
cursor = operation_handle.get_handle()
return cursor.get_profile(profile_format=profile_format)
def wait_for_finished_timeout(self, operation_handle, timeout):
self.log_handle(operation_handle, 'waiting for query to reach FINISHED state')
raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
start_time = time.time()
while time.time() - start_time < timeout:
start_rpc_time = time.time()
impala_state = self.get_impala_exec_state(operation_handle)
rpc_time = time.time() - start_rpc_time
# if the rpc succeeded, the output is the query state
if impala_state == FINISHED:
return True
elif impala_state == ERROR:
try:
error_log = self.__do_rpc(
lambda: self.imp_service.get_log(operation_handle.log_context))
raise impyla_error.OperationalError(error_log, None)
finally:
self.close_query(operation_handle)
if rpc_time < DEFAULT_SLEEP_INTERVAL:
time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
return False
def wait_for_admission_control(self, operation_handle):
def wait_for_admission_control(self, operation_handle, timeout_s=60):
self.log_handle(operation_handle, 'waiting for completion of the admission control')
raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
start_time = time.time()
while time.time() - start_time < timeout_s:
start_rpc_time = time.time()
if self.is_admitted(operation_handle):
return True
rpc_time = time.time() - start_rpc_time
if rpc_time < DEFAULT_SLEEP_INTERVAL:
time.sleep(DEFAULT_SLEEP_INTERVAL - rpc_time)
return False
def get_admission_result(self, operation_handle):
self.log_handle(operation_handle, 'getting the admission result')
raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
if self.is_admitted(operation_handle):
query_profile = self.get_runtime_profile(operation_handle)
admit_result = re.search(r"Admission result: (.*)", query_profile)
if admit_result:
return admit_result.group(1)
return ""
def get_log(self, operation_handle):
self.log_handle(operation_handle, 'getting log for operation')
@@ -552,12 +830,13 @@ class ImpylaHS2Connection(ImpalaConnection):
if not PROGRESS_LOG_RE.match(line)]
return '\n'.join(lines)
def fetch(self, sql_stmt, operation_handle, max_rows=-1):
def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False):
self.log_handle(operation_handle, 'fetching {} rows'.format(
'all' if max_rows < 0 else max_rows))
return self.__fetch_results(operation_handle, max_rows)
return self.__fetch_results(operation_handle, max_rows, discard_results)
def __fetch_results(self, handle, max_rows=-1,
discard_results=False,
profile_format=TRuntimeProfileFormat.STRING):
"""Implementation of result fetching from handle."""
cursor = handle.get_handle()
@@ -581,10 +860,15 @@ class ImpylaHS2Connection(ImpalaConnection):
else:
log = None
profile = None
return ImpylaHS2ResultSet(success=True, result_tuples=result_tuples,
column_labels=column_labels, column_types=column_types,
query=handle.sql_stmt(), log=log, profile=profile,
query_id=self.get_query_id(handle))
result = None
if discard_results:
return result
result = ImpylaHS2ResultSet(success=True, result_tuples=result_tuples,
column_labels=column_labels, column_types=column_types,
query=handle.sql_stmt(), log=log, profile=profile,
query_id=self.get_query_id(handle))
return result
class ImpylaHS2ResultSet(object):
@@ -628,17 +912,17 @@ class ImpylaHS2ResultSet(object):
return str(val)
def create_connection(host_port, use_kerberos=False, protocol='beeswax',
def create_connection(host_port, use_kerberos=False, protocol=BEESWAX,
is_hive=False, use_ssl=False, collect_profile_and_log=True):
if protocol == 'beeswax':
if protocol == BEESWAX:
c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos,
use_ssl=use_ssl)
elif protocol == 'hs2':
elif protocol == HS2:
c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
is_hive=is_hive, use_ssl=use_ssl,
collect_profile_and_log=collect_profile_and_log)
else:
assert protocol == 'hs2-http'
assert protocol == HS2_HTTP
c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
is_hive=is_hive, use_http_transport=True, http_path='cliservice',
use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log)

View File

@@ -491,6 +491,26 @@ class ImpaladService(BaseImpalaService):
# Only check if the port is open, do not create Thrift transport.
return self.is_port_open(self.hs2_http_port)
def create_client(self, protocol):
"""Creates a new client connection for given protocol to this impalad"""
port = self.beeswax_port
if protocol == 'hs2':
port = self.hs2_port
elif protocol == 'hs2-http':
port = self.hs2_http_port
client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol)
client.connect()
return client
def create_client_from_vector(self, vector):
"""A shorthand for create_client with test vector as input.
Vector must have 'protocol' and 'exec_option' dimension.
Return a client of specified 'protocol' and with cofiguration 'exec_option' set."""
client = self.create_client(protocol=vector.get_value('protocol'))
client.set_configuration(vector.get_exec_option_dict())
return client
# Allows for interacting with the StateStore service to perform operations such as
# accessing the debug webpage.
class StateStoredService(BaseImpalaService):

View File

@@ -345,13 +345,11 @@ class ImpalaTestSuite(BaseTestSuite):
cls.hive_transport.close()
cls.close_impala_clients()
@classmethod
def setup_method(cls, test_method):
def setup_method(self, test_method):
"""Setup for all test method."""
cls.__reset_impala_clients()
self._reset_impala_clients()
@classmethod
def teardown_method(cls, test_method):
def teardown_method(self, test_method):
"""Teardown for all test method.
Currently, it is only here as a placeholder for future use and complement
setup_method() declaration."""
@@ -426,7 +424,7 @@ class ImpalaTestSuite(BaseTestSuite):
cls.client = cls.default_impala_client(cls.default_test_protocol())
@classmethod
def __reset_impala_clients(cls):
def _reset_impala_clients(cls):
if cls.beeswax_client:
cls.beeswax_client.clear_configuration()
if cls.hs2_client:
@@ -1437,6 +1435,7 @@ class ImpalaTestSuite(BaseTestSuite):
"""Waits for the given 'query_handle' to reach the 'expected_state' using 'client', or
with the default connection if 'client' is None. If it does not reach the given state
within 'timeout' seconds, the method throws an AssertionError.
DEPRECATED: Use client.wait_for_impala_state() instead.
"""
self.wait_for_any_state(handle, [expected_state], timeout, client)
@@ -1445,6 +1444,7 @@ class ImpalaTestSuite(BaseTestSuite):
or with the default connection if 'client' is None. If it does not reach one of the
given states within 'timeout' seconds, the method throws an AssertionError. Returns
the final state.
DEPRECATED: Use client.wait_for_any_impala_state() instead.
"""
if client is None: client = self.client
start_time = time.time()

View File

@@ -27,12 +27,9 @@ import re
import signal
import sys
import threading
from copy import copy
from copy import deepcopy
from time import sleep, time
from beeswaxd.BeeswaxService import QueryState
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.cluster_config import (
impalad_admission_ctrl_flags,
impalad_admission_ctrl_config_args,
@@ -43,9 +40,14 @@ from tests.common.custom_cluster_test_suite import (
START_ARGS,
CustomClusterTestSuite)
from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties
from tests.common.impala_connection import (
RUNNING, FINISHED, ERROR,
IMPALA_CONNECTION_EXCEPTION)
from tests.common.resource_pool_config import ResourcePoolConfig
from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster
from tests.common.test_dimensions import (
HS2, BEESWAX,
add_mandatory_exec_option,
create_exec_option_dimension,
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
@@ -163,11 +165,22 @@ def metric_key(pool_name, metric_name):
return "admission-controller.%s.%s" % (metric_name, pool_name)
def wait_single_statestore_heartbeat():
"""Wait for state sync across impalads."""
sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
class TestAdmissionControllerBase(CustomClusterTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def default_test_protocol(cls):
# Do not change this. Multiple test method has been hardcoded under this assumption.
return HS2
@classmethod
def add_test_dimensions(cls):
super(TestAdmissionControllerBase, cls).add_test_dimensions()
@@ -176,14 +189,32 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def enable_admission_service(self, method):
"""Inject argument to enable admission control service.
Must be called at setup_method() and before calling setup_method() of superclass."""
start_args = "--enable_admission_service"
if START_ARGS in method.__dict__:
start_args = method.__dict__[START_ARGS] + " " + start_args
method.__dict__[START_ARGS] = start_args
if IMPALAD_ARGS in method.__dict__:
method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite):
@classmethod
def default_test_protocol(cls):
# HS2TestSuite override self.hs2_client with a raw Impala hs2 thrift client.
# This will set self.client = self.beeswax_client.
# Do not change this. Multiple test method has been hardcoded under this assumption.
return BEESWAX
class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def __check_pool_rejected(self, client, pool, expected_error_re):
try:
client.set_configuration({'request_pool': pool})
client.execute("select 1")
assert False, "Query should return error"
except ImpalaBeeswaxException as e:
except IMPALA_CONNECTION_EXCEPTION as e:
assert re.search(expected_error_re, str(e))
def __check_query_options(self, profile, expected_query_options):
@@ -232,34 +263,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
HS2TestSuite.check_response(get_profile_resp)
self.__check_query_options(get_profile_resp.profile, expected_options)
def _execute_and_collect_profiles(self, queries, timeout_s, config_options={},
allow_query_failure=False):
"""Submit the query statements in 'queries' in parallel to the first impalad in
the cluster. After submission, the results are fetched from the queries in
sequence and their profiles are collected. Wait for up to timeout_s for
each query to finish. If 'allow_query_failure' is True, succeeds if the query
completes successfully or ends up in the EXCEPTION state. Otherwise expects the
queries to complete successfully.
Returns the profile strings."""
client = self.cluster.impalads[0].service.create_beeswax_client()
expected_states = [client.QUERY_STATES['FINISHED']]
if allow_query_failure:
expected_states.append(client.QUERY_STATES['EXCEPTION'])
try:
handles = []
profiles = []
client.set_configuration(config_options)
for query in queries:
handles.append(client.execute_async(query))
for query, handle in zip(queries, handles):
state = self.wait_for_any_state(handle, expected_states, timeout_s)
if state == client.QUERY_STATES['FINISHED']:
self.client.fetch(query, handle)
profiles.append(self.client.get_runtime_profile(handle))
return profiles
finally:
client.close()
def get_ac_process(self):
"""Returns the Process that is running the admission control service."""
return self.cluster.impalads[0]
@@ -314,7 +317,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Wait for query to clear admission control and get accounted for
client.wait_for_admission_control(handle)
self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
assert client.is_finished(handle)
# queueA has default query options mem_limit=128m,query_timeout_s=5
self.__check_query_options(client.get_runtime_profile(handle),
[queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
@@ -386,7 +389,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
open_session_req = TCLIService.TOpenSessionReq()
open_session_req.username = ""
open_session_resp = self.hs2_client.OpenSession(open_session_req)
TestAdmissionController.check_response(open_session_resp)
TestAdmissionControllerRawHS2.check_response(open_session_resp)
try:
execute_statement_req = TCLIService.TExecuteStatementReq()
@@ -401,7 +404,110 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
finally:
close_req = TCLIService.TCloseSessionReq()
close_req.sessionHandle = open_session_resp.sessionHandle
TestAdmissionController.check_response(self.hs2_client.CloseSession(close_req))
TestAdmissionControllerRawHS2.check_response(
self.hs2_client.CloseSession(close_req))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
pool_max_mem=1024 * 1024 * 1024))
@needs_session()
def test_queuing_status_through_query_log_and_exec_summary(self):
"""Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
the query's queuing status, that is, whether the query was queued and what was the
latest queuing reason."""
# Start a long-running query.
long_query_resp = self.execute_statement("select sleep(10000)")
# Ensure that the query has started executing.
self.wait_for_admission_control(long_query_resp.operationHandle)
# Submit another query.
queued_query_resp = self.execute_statement("select sleep(1)")
# Wait until the query is queued.
self.wait_for_operation_state(queued_query_resp.operationHandle,
TCLIService.TOperationState.PENDING_STATE)
# Check whether the query log message correctly exposes the queuing status.
log = self.wait_for_log_message(
queued_query_resp.operationHandle, "Admission result :")
assert "Admission result : Queued" in log, log
assert "Latest admission queue reason : number of running queries 1 is at or over "
"limit 1" in log, log
# Now check the same for ExecSummary.
summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
summary_req.operationHandle = queued_query_resp.operationHandle
summary_req.sessionHandle = self.session_handle
exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
assert exec_summary_resp.summary.is_queued
assert "number of running queries 1 is at or over limit 1" in \
exec_summary_resp.summary.queued_reason, \
exec_summary_resp.summary.queued_reason
# Close the running query.
self.close(long_query_resp.operationHandle)
# Close the queued query.
self.close(queued_query_resp.operationHandle)
class TestAdmissionControllerRawHS2WithACService(TestAdmissionControllerRawHS2):
"""Runs all of the tests from TestAdmissionControllerRawHS2 but with the second
impalad in the minicluster configured to perform all admission control."""
def get_ac_process(self):
return self.cluster.admissiond
def get_ac_log_name(self):
return "admissiond"
def setup_method(self, method):
if self.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
self.enable_admission_service(method)
super(TestAdmissionControllerRawHS2, self).setup_method(method)
class TestAdmissionController(TestAdmissionControllerBase):
def get_ac_process(self):
"""Returns the Process that is running the admission control service."""
return self.cluster.impalads[0]
def get_ac_log_name(self):
"""Returns the prefix of the log files for the admission control process."""
return "impalad"
def setup_method(self, method):
"""All tests in this class is non-destructive. Therefore, we can afford
resetting clients at every setup_method."""
super(TestAdmissionController, self).setup_method(method)
self._reset_impala_clients()
def _execute_and_collect_profiles(self, queries, timeout_s, config_options={},
allow_query_failure=False):
"""Submit the query statements in 'queries' in parallel to the first impalad in
the cluster. After submission, the results are fetched from the queries in
sequence and their profiles are collected. Wait for up to timeout_s for
each query to finish. If 'allow_query_failure' is True, succeeds if the query
completes successfully or ends up in the EXCEPTION state. Otherwise expects the
queries to complete successfully.
Returns the profile strings."""
client = self.cluster.impalads[0].service.create_hs2_client()
expected_states = [FINISHED]
if allow_query_failure:
expected_states.append(ERROR)
try:
handles = []
profiles = []
client.set_configuration(config_options)
for query in queries:
handles.append(client.execute_async(query))
for query, handle in zip(queries, handles):
state = client.wait_for_any_impala_state(handle, expected_states, timeout_s)
if state == FINISHED:
self.client.fetch(query, handle)
profiles.append(client.get_runtime_profile(handle))
return profiles
finally:
for handle in handles:
client.close_query(handle)
client.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -610,7 +716,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
handle = self.client.execute_async(query.format(1))
self.client.wait_for_finished_timeout(handle, 1000)
expected_mem_limits = self.__get_mem_limits_admission_debug_page()
actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
actual_mem_limits = self.__get_mem_limits_memz_debug_page(
self.client.handle_id(handle))
mem_admitted =\
get_mem_admitted_backends_debug_page(self.cluster, self.get_ac_process())
debug_string = " expected_mem_limits:" + str(
@@ -670,7 +777,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def test_dedicated_coordinator_planner_estimates(self, vector, unique_database):
"""Planner tests to add coverage for coordinator estimates when using dedicated
coordinators. Also includes coverage for verifying cluster memory admitted."""
vector_copy = copy(vector)
vector_copy = deepcopy(vector)
exec_options = vector_copy.get_value('exec_option')
# Remove num_nodes from the options to allow test case runner to set it in one of
# the test cases.
@@ -756,17 +863,17 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
query = "select * from functional.alltypesagg, (select 1) B limit 1"
# Successfully run a query with mem limit equal to the lowest process memory among
# impalads
exec_options = copy(vector.get_value('exec_option'))
exec_options = deepcopy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "2G"
self.execute_query_expect_success(self.client, query, exec_options)
# Test that a query scheduled to run on a single node and submitted to the impalad
# with higher proc mem limit succeeds.
exec_options = copy(vector.get_value('exec_option'))
exec_options = deepcopy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "3G"
exec_options['num_nodes'] = "1"
self.execute_query_expect_success(self.client, query, exec_options)
# Exercise rejection checks in admission controller.
exec_options = copy(vector.get_value('exec_option'))
exec_options = deepcopy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "3G"
ex = self.execute_query_expect_failure(self.client, query, exec_options)
assert ("Rejected query from pool default-pool: request memory needed "
@@ -777,17 +884,18 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Wait for previous queries to finish to avoid flakiness.
for impalad in self.cluster.impalads:
impalad.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0)
impalad_with_2g_mem = self.cluster.impalads[2].service.create_beeswax_client()
impalad_with_2g_mem = self.cluster.impalads[2].service.create_client_from_vector(
vector)
impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
impalad_with_2g_mem.execute_async("select sleep(1000)")
# Wait for statestore update to update the mem admitted in each node.
sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
exec_options = copy(vector.get_value('exec_option'))
wait_single_statestore_heartbeat()
exec_options = deepcopy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "2G"
# Since Queuing is synchronous, and we can't close the previous query till this
# returns, we wait for this to timeout instead.
self.execute_query(query, exec_options)
except ImpalaBeeswaxException as e:
except IMPALA_CONNECTION_EXCEPTION as e:
assert re.search(r"Queued reason: Not enough memory available on host \S+.Needed "
r"2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e)
finally:
@@ -800,11 +908,11 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
statestored_args=_STATESTORED_ARGS,
disable_log_buffering=True)
def test_cancellation(self):
def test_cancellation(self, vector):
""" Test to confirm that all Async cancellation windows are hit and are able to
successfully cancel the query"""
impalad = self.cluster.impalads[0]
client = impalad.service.create_beeswax_client()
client = impalad.service.create_client_from_vector(vector)
try:
client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000")
client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
@@ -848,15 +956,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
client.set_configuration_option('enable_trivial_query_for_admission', 'false')
queued_query_handle = client.execute_async("select 5")
sleep(1)
assert client.get_state(queued_query_handle) == QueryState.COMPILED
assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
assert client.is_pending(queued_query_handle)
assert "Admission result: Queued" in client.get_runtime_profile(
queued_query_handle)
# Only cancel the queued query, because close will wait till it unregisters, this
# gives us a chance to close the running query and allow the dequeue thread to
# dequeue the queue query
client.cancel(queued_query_handle)
client.close_query(handle)
client.close_query(queued_query_handle)
queued_profile = client.get_runtime_profile(queued_query_handle)
queued_profile = client.close_query(queued_query_handle,
fetch_profile_after_close=True)
assert "Admission result: Cancelled (queued)" in queued_profile, queued_profile
self.assert_log_contains(
self.get_ac_log_name(), 'INFO', "Dequeued cancelled query=")
@@ -866,11 +975,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
handle = client.execute_async("select sleep(10000)")
queued_query_handle = client.execute_async("select 6")
sleep(1)
assert client.get_state(queued_query_handle) == QueryState.COMPILED
assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
client.close_query(queued_query_handle)
assert client.is_pending(queued_query_handle)
assert "Admission result: Queued" in client.get_runtime_profile(
queued_query_handle)
queued_profile = client.close_query(queued_query_handle,
fetch_profile_after_close=True)
client.close_query(handle)
queued_profile = client.get_runtime_profile(queued_query_handle)
assert "Admission result: Cancelled (queued)" in queued_profile
for i in self.cluster.impalads:
i.service.wait_for_metric_value(
@@ -1000,7 +1110,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Run a bunch of queries with mem_limit set so that only one can be admitted
# immediately. The rest should be queued and dequeued (timeout) due to host memory
# pressure.
STMT = "select sleep(100)"
STMT = "select sleep(1000)"
TIMEOUT_S = 20
NUM_QUERIES = 5
# IMPALA-9856: Disable query result spooling so that we can run queries with low
@@ -1034,7 +1144,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Run a bunch of queries with mem_limit set so that only one can be admitted
# immediately. The rest should be queued and dequeued (timeout) due to pool memory
# pressure.
STMT = "select sleep(100)"
STMT = "select sleep(1000)"
TIMEOUT_S = 20
NUM_QUERIES = 5
# IMPALA-9856: Disable query result spooling so that we can run queries with low
@@ -1143,7 +1253,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.close_query(handle_running)
self.__assert_num_queries_accounted(0)
# Case 3: When a query gets rejected
exec_options = copy(vector.get_value('exec_option'))
exec_options = deepcopy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "1b"
self.execute_query_expect_failure(self.client, query, exec_options)
self.__assert_num_queries_accounted(0)
@@ -1211,6 +1321,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
pool=pool)
query2 = self.execute_async_and_wait_for_running(impalad2, SLOW_QUERY, USER_ROOT,
pool=pool)
wait_single_statestore_heartbeat()
keys = [
"admission-controller.agg-current-users.root.queueB",
"admission-controller.local-current-users.root.queueB",
@@ -1294,16 +1405,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
query_handles.append(query_handle)
# Let state sync across impalads.
sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
wait_single_statestore_heartbeat()
# Another query should be rejected
impalad = self.cluster.impalads[limit % 2]
client = impalad.service.create_beeswax_client()
client = impalad.service.create_hs2_client()
client.set_configuration({'request_pool': pool})
try:
client.execute(SLOW_QUERY, user=user)
assert False, "query should fail"
except Exception as e:
except IMPALA_CONNECTION_EXCEPTION as e:
# Construct the expected error message.
expected = ("Rejected query from pool {pool}: current per-{type} load {limit} for "
"user '{user}'{group_description} is at or above the {err_type} limit "
@@ -1328,14 +1439,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def execute_async_and_wait_for_running(self, impalad, query, user, pool):
# Execute a query asynchronously, and wait for it to be running.
# Use beeswax client as it allows specifying the user that runs the query.
client = impalad.service.create_beeswax_client()
client = impalad.service.create_hs2_client()
client.set_configuration({'request_pool': pool})
handle = client.execute_async(query, user=user)
timeout_s = 10
# Make sure the query has been admitted and is running.
self.wait_for_state(
handle, client.QUERY_STATES['RUNNING'], timeout_s, client=client)
client.wait_for_impala_state(handle, RUNNING, timeout_s)
return self.ClientAndHandle(client, handle)
@pytest.mark.execute_serially
@@ -1375,8 +1484,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.client.close_query(sleep_query_handle)
# Observe that the queued query fails.
self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
self.close_query(queued_query_handle)
self.client.wait_for_impala_state(queued_query_handle, ERROR, 20),
self.client.close_query(queued_query_handle)
# Change the config back to a valid value
config.set_config_value(pool_name, config_str, 0)
@@ -1396,8 +1505,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.client.close_query(sleep_query_handle)
# Observe that the queued query fails.
self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
self.close_query(queued_query_handle)
self.client.wait_for_impala_state(queued_query_handle, ERROR, 20),
self.client.close_query(queued_query_handle)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -1405,7 +1514,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
pool_max_mem=1024 * 1024 * 1024),
statestored_args=_STATESTORED_ARGS)
def test_trivial_query(self):
self.client.execute("set enable_trivial_query_for_admission=false")
self.client.set_configuration_option("enable_trivial_query_for_admission", "false")
# Test the second request does need to queue when trivial query is disabled.
sleep_query_handle = self.client.execute_async("select sleep(10000)")
@@ -1417,7 +1526,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.client.close_query(sleep_query_handle)
self.client.close_query(trivial_query_handle)
self.client.execute("set enable_trivial_query_for_admission=true")
self.client.set_configuration_option("enable_trivial_query_for_admission", "true")
# Test when trivial query is enabled, all trivial queries should be
# admitted immediately.
sleep_query_handle = self.client.execute_async("select sleep(10000)")
@@ -1439,7 +1548,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Test whether it will fail for a normal query.
failed_query_handle = self.client.execute_async(
"select * from functional_parquet.alltypes limit 100")
self.wait_for_state(failed_query_handle, QueryState.EXCEPTION, 20)
self.client.wait_for_impala_state(failed_query_handle, ERROR, 20)
self.client.close_query(failed_query_handle)
# Test it should pass all the trivial queries.
self._test_trivial_queries_suc()
@@ -1463,7 +1572,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def _test_multi_trivial_query_runs(self):
timeout = 10
admit_obj = self.admit_obj
client = admit_obj.cluster.impalads[0].service.create_beeswax_client()
client = admit_obj.cluster.impalads[0].service.create_hs2_client()
for i in range(100):
handle = client.execute_async(self.sql)
if not self.expect_err:
@@ -1580,44 +1689,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
assert False, "Timed out waiting for change to profile\nSearch " \
"String: {0}\nProfile:\n{1}".format(search_string, str(profile))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
pool_max_mem=1024 * 1024 * 1024))
@needs_session()
def test_queuing_status_through_query_log_and_exec_summary(self):
"""Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
the query's queuing status, that is, whether the query was queued and what was the
latest queuing reason."""
# Start a long-running query.
long_query_resp = self.execute_statement("select sleep(10000)")
# Ensure that the query has started executing.
self.wait_for_admission_control(long_query_resp.operationHandle)
# Submit another query.
queued_query_resp = self.execute_statement("select sleep(1)")
# Wait until the query is queued.
self.wait_for_operation_state(queued_query_resp.operationHandle,
TCLIService.TOperationState.PENDING_STATE)
# Check whether the query log message correctly exposes the queuing status.
log = self.wait_for_log_message(
queued_query_resp.operationHandle, "Admission result :")
assert "Admission result : Queued" in log, log
assert "Latest admission queue reason : number of running queries 1 is at or over "
"limit 1" in log, log
# Now check the same for ExecSummary.
summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
summary_req.operationHandle = queued_query_resp.operationHandle
summary_req.sessionHandle = self.session_handle
exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
assert exec_summary_resp.summary.is_queued
assert "number of running queries 1 is at or over limit 1" in \
exec_summary_resp.summary.queued_reason,\
exec_summary_resp.summary.queued_reason
# Close the running query.
self.close(long_query_resp.operationHandle)
# Close the queued query.
self.close(queued_query_resp.operationHandle)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=(impalad_admission_ctrl_flags(
@@ -1743,12 +1814,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# state because of the 'WAIT' debug action), wait for the 'lineitem' scan to
# complete, and then validate that one of the executor backends shutdowns and
# releases its admitted memory.
self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], timeout)
self.client.wait_for_impala_state(handle, RUNNING, timeout)
# Once the 'lineitem' scan completes, NumCompletedBackends should be 1.
self.assert_eventually(60, 1, lambda: "NumCompletedBackends: 1 (1)"
in self.client.get_runtime_profile(handle))
get_num_completed_backends(self.cluster.impalads[0].service,
handle.get_handle().id) == 1
self.client.handle_id(handle)) == 1
mem_admitted =\
get_mem_admitted_backends_debug_page(self.cluster, self.get_ac_process())
num_executor_zero_admitted = 0
@@ -2012,13 +2083,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
def setup_method(self, method):
if self.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
start_args = "--enable_admission_service"
if START_ARGS in method.__dict__:
start_args = method.__dict__[START_ARGS] + " " + start_args
method.__dict__[START_ARGS] = start_args
if IMPALAD_ARGS in method.__dict__:
method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
self.enable_admission_service(method)
super(TestAdmissionControllerWithACService, self).setup_method(method)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@@ -2038,8 +2103,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
timeout_s = 10
# Make sure the query is through admission control before killing the admissiond. It
# should be unaffected and finish successfully.
self.wait_for_state(
before_kill_handle, self.client.QUERY_STATES['RUNNING'], timeout_s)
self.client.wait_for_impala_state(before_kill_handle, RUNNING, timeout_s)
self.cluster.admissiond.kill()
result = self.client.fetch(query, before_kill_handle)
assert result.data == ["730"]
@@ -2059,7 +2123,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
try:
result = self.client.fetch(query, no_restart_handle)
assert False, "Query should have failed"
except ImpalaBeeswaxException as e:
except IMPALA_CONNECTION_EXCEPTION as e:
assert "Failed to admit query after waiting " in str(e)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@@ -2077,20 +2141,18 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
handle1 = self.execute_query_async(query)
timeout_s = 10
# Make sure the first query has been admitted.
self.wait_for_state(
handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
# Run another query. This query should be queued because only 1 query is allowed in
# the default pool.
handle2 = self.execute_query_async(query)
handle2 = self.client.execute_async(query)
self._wait_for_change_to_profile(handle2, "Admission result: Queued")
# Cancel the first query. It's resources should be released and the second query
# should be admitted.
self.client.cancel(handle1)
self.client.close_query(handle1)
self.wait_for_state(
handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
self.client.wait_for_impala_state(handle2, RUNNING, timeout_s)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@@ -2108,20 +2170,18 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
handle1 = self.execute_query_async(query)
timeout_s = 10
# Make sure the first query has been admitted.
self.wait_for_state(
handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
# Run another query. This query should be queued because the executor group only has 1
# slot.
handle2 = self.execute_query_async(query)
handle2 = self.client.execute_async(query)
self._wait_for_change_to_profile(handle2, "Admission result: Queued")
# Cancel the first query. It's resources should be released and the second query
# should be admitted.
self.client.cancel(handle1)
self.client.close_query(handle1)
self.wait_for_state(
handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
self.client.wait_for_impala_state(handle2, RUNNING, timeout_s)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@@ -2133,25 +2193,23 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
# Query designed to run for a few minutes.
query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
impalad1 = self.cluster.impalads[0]
client1 = impalad1.service.create_beeswax_client()
client1 = impalad1.service.create_hs2_client()
handle1 = client1.execute_async(query)
timeout_s = 10
# Make sure the first query has been admitted.
self.wait_for_state(
handle1, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client1)
client1.wait_for_impala_state(handle1, RUNNING, timeout_s)
# Run another query with a different coordinator. This query should be queued because
# only 1 query is allowed in the default pool.
impalad2 = self.cluster.impalads[1]
client2 = impalad2.service.create_beeswax_client()
client2 = impalad2.service.create_hs2_client()
handle2 = client2.execute_async(query)
self._wait_for_change_to_profile(handle2, "Admission result: Queued", client=client2)
# Kill the coordinator for the first query. The resources for the query should get
# cleaned up and the second query should be admitted.
impalad1.kill()
self.wait_for_state(
handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client2)
client2.wait_for_impala_state(handle2, RUNNING, timeout_s)
class TestAdmissionControllerStress(TestAdmissionControllerBase):
@@ -2185,15 +2243,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
submitting to a single impalad, we know exactly what the values should be,
otherwise we just check that they are within reasonable bounds.
"""
BATCH_SIZE = 100
@classmethod
def add_test_dimensions(cls):
super(TestAdmissionControllerStress, cls).add_test_dimensions()
# This test really need to run using beeswax client since hs2 client has not
# implemented some methods needed to query admission control status.
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax'))
# Slow down test query by setting low batch_size.
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[10]))
cluster_sizes=[0], disable_codegen_options=[False],
batch_sizes=[cls.BATCH_SIZE]))
# Turning off result spooling allows us to better control query execution by
# controlling the number of rows fetched. This allows us to maintain resource
# usage among backends.
add_mandatory_exec_option(cls, 'spool_query_results', 0)
# Set 100ms long poling time to get faster initial response.
add_mandatory_exec_option(cls, 'long_polling_time_ms', 100)
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('round_robin_submission', *ROUND_ROBIN_SUBMISSION))
cls.ImpalaTestMatrix.add_dimension(
@@ -2356,7 +2421,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
REQUEST_QUEUE_UPDATE_INTERVAL)['count']
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
wait_single_statestore_heartbeat()
LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats)
def wait_for_admitted_threads(self, num_threads):
@@ -2410,7 +2475,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
sleep(1)
class SubmitQueryThread(threading.Thread):
def __init__(self, impalad, additional_query_options, vector, query_num,
def __init__(self, impalad, vector, query_num,
query_end_behavior, executing_threads, exit_signal):
"""
executing_threads must be provided so that this thread can add itself when the
@@ -2418,8 +2483,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""
super(self.__class__, self).__init__()
self.executing_threads = executing_threads
self.vector = vector
self.additional_query_options = additional_query_options
# Make vector local to this thread, because run() will modify it later.
self.vector = deepcopy(vector)
self.query_num = query_num
self.query_end_behavior = query_end_behavior
self.impalad = impalad
@@ -2431,6 +2496,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# Set by the main thread when tearing down
self.exit_signal = exit_signal
self.setDaemon(True)
# Determine how many rows to fetch per interval.
self.rows_per_fetch = TestAdmissionControllerStress.BATCH_SIZE
def thread_should_run(self):
return not self.exit_signal.is_set()
@@ -2444,20 +2511,12 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
if not self.thread_should_run():
return
exec_options = dict()
exec_options.update(self.vector.get_exec_option_dict())
exec_options.update(self.additional_query_options)
# Turning off result spooling allows us to better control query execution by
# controlling the number of rows fetched. This allows us to maintain resource
# usage among backends.
exec_options['spool_query_results'] = 0
exec_options['long_polling_time_ms'] = 100
if self.query_end_behavior == 'QUERY_TIMEOUT':
exec_options['query_timeout_s'] = QUERY_END_TIMEOUT_S
self.vector.set_exec_option('query_timeout_s', QUERY_END_TIMEOUT_S)
query = QUERY.format(self.query_num)
self.query_state = 'SUBMITTING'
client = self.impalad.service.create_beeswax_client()
client.set_configuration(exec_options)
assert self.vector.get_protocol() == HS2, "Must use hs2 protocol"
client = self.impalad.service.create_client_from_vector(self.vector)
LOG.info("Submitting query %s with ending behavior %s",
self.query_num, self.query_end_behavior)
@@ -2485,7 +2544,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
msg = "Admission result for query {} : {}".format(
self.query_num, admission_result)
self.log_handle(client, query_handle, msg)
except ImpalaBeeswaxException as e:
except IMPALA_CONNECTION_EXCEPTION as e:
LOG.exception(e)
raise e
@@ -2500,10 +2559,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# query. It needs to wait until the main thread requests it to end its query.
while self.thread_should_run() and self.query_state != 'COMPLETED':
# The query needs to stay active until the main thread requests it to end.
# Otherwise, the query may get cancelled early. Fetch 1 row every
# FETCH_INTERVAL to keep the query active.
fetch_result = client.fetch(query, query_handle, 1)
assert len(fetch_result.data) == 1, str(fetch_result)
# Otherwise, the query may get cancelled early. Fetch self.rows_per_fetch row
# every FETCH_INTERVAL to keep the query active.
fetch_result = client.fetch(query, query_handle, self.rows_per_fetch)
assert len(fetch_result.data) == self.rows_per_fetch, str(fetch_result)
self.num_rows_fetched += len(fetch_result.data)
if self.query_state == 'REQUEST_QUERY_END':
self._end_query(client, query, query_handle)
@@ -2537,20 +2596,19 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# Sleep and wait for the query to be cancelled. The cancellation will
# set the state to EXCEPTION.
start_time = time()
while self.thread_should_run() and (
client.get_state(query_handle) != client.QUERY_STATES['EXCEPTION']):
while self.thread_should_run() and not client.is_error(query_handle):
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
sleep(1)
# try fetch and confirm from exception message that query was timed out.
try:
client.fetch(query, query_handle)
# try fetch and confirm from exception message that query was timed out.
client.fetch(query, query_handle, discard_results=True)
assert False
except Exception as e:
except (Exception, ImpalaHiveServer2Service) as e:
assert 'expired due to client inactivity' in str(e)
elif self.query_end_behavior == 'EOS':
# Fetch all rows so we hit eos.
client.fetch(query, query_handle)
client.fetch(query, query_handle, discard_results=True)
elif self.query_end_behavior == 'CLIENT_CANCEL':
client.cancel(query_handle)
else:
@@ -2604,8 +2662,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
LOG.info("Found %s queued queries after %s seconds", actual_queued,
round(time() - start_time, 1))
def run_admission_test(self, vector, additional_query_options,
check_user_aggregates=False):
def run_admission_test(self, vector, check_user_aggregates=False):
LOG.info("Starting test case with parameters: %s", vector)
self.impalads = self.cluster.impalads
self.ac_processes = self.get_ac_processes()
@@ -2628,8 +2685,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
sleep(submission_delay_ms / 1000.0)
impalad = self.impalads[query_num % len(self.impalads)]
query_end_behavior = QUERY_END_BEHAVIORS[query_num % len(QUERY_END_BEHAVIORS)]
thread = self.SubmitQueryThread(impalad, additional_query_options, vector,
query_num, query_end_behavior, self.executing_threads, self.exit)
thread = self.SubmitQueryThread(impalad, vector, query_num, query_end_behavior,
self.executing_threads, self.exit)
thread.start()
self.all_threads.append(thread)
@@ -2747,11 +2804,12 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
if self.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
self.pool_name = 'default-pool'
vector.set_exec_option('request_pool', self.pool_name)
vector.set_exec_option('mem_limit', sys.maxsize)
# The pool has no mem resources set, so submitting queries with huge mem_limits
# should be fine. This exercises the code that does the per-pool memory
# accounting (see MemTracker::GetPoolMemReserved()) without actually being throttled.
self.run_admission_test(vector, {'request_pool': self.pool_name,
'mem_limit': sys.maxsize})
self.run_admission_test(vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -2761,7 +2819,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
statestored_args=_STATESTORED_ARGS)
def test_admission_controller_with_configs(self, vector):
self.pool_name = 'root.queueB'
self.run_admission_test(vector, {'request_pool': self.pool_name})
vector.set_exec_option('request_pool', self.pool_name)
self.run_admission_test(vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -2778,8 +2837,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# Save time by running only 1 out of 6 vector combination.
pytest.skip('Only run with round_robin_submission=True and submission_delay_ms=0.')
self.pool_name = 'root.queueF'
self.run_admission_test(vector, {'request_pool': self.pool_name},
check_user_aggregates=True)
vector.set_exec_option('request_pool', self.pool_name)
self.run_admission_test(vector, check_user_aggregates=True)
def get_proc_limit(self):
"""Gets the process mem limit as reported by the impalad's mem-tracker metric.
@@ -2816,8 +2875,9 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# the mem limit.
num_impalads = len(self.cluster.impalads)
query_mem_limit = (proc_limit // MAX_NUM_CONCURRENT_QUERIES // num_impalads) - 1
self.run_admission_test(vector,
{'request_pool': self.pool_name, 'mem_limit': query_mem_limit})
vector.set_exec_option('request_pool', self.pool_name)
vector.set_exec_option('mem_limit', query_mem_limit)
self.run_admission_test(vector)
class TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
@@ -2833,11 +2893,5 @@ class TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
def setup_method(self, method):
if self.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
start_args = "--enable_admission_service"
if START_ARGS in method.__dict__:
start_args = method.__dict__[START_ARGS] + " " + start_args
method.__dict__[START_ARGS] = start_args
if IMPALAD_ARGS in method.__dict__:
method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
self.enable_admission_service(method)
super(TestAdmissionControllerStressWithACService, self).setup_method(method)

View File

@@ -179,9 +179,7 @@ class TestSessionExpiration(CustomClusterTestSuite):
self.close_impala_clients()
# Create 2 sessions.
client1 = impalad.service.create_hs2_client()
client1.execute_async("select sleep(5000)")
client2 = impalad.service.create_hs2_client()
client2.execute_async("select sleep(5000)")
try:
# Trying to open a third session should fail.
impalad.service.create_hs2_client()

View File

@@ -25,6 +25,7 @@ from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift_sasl import TSaslClientTransport
def create_transport(host, port, service, transport_type="buffered", user=None,
password=None, use_ssl=False, ssl_cert=None):
"""
@@ -78,4 +79,11 @@ def op_handle_to_query_id(t_op_handle):
if t_op_handle is None or t_op_handle.operationId is None:
return None
# This should use the same logic as in ImpalaServer::THandleIdentifierToTUniqueId().
return "%x:%x" % struct.unpack("QQ", t_op_handle.operationId.guid)
return "%016x:%016x" % struct.unpack("QQ", t_op_handle.operationId.guid)
def session_handle_to_session_id(t_session_op_handle):
if t_session_op_handle is None or t_session_op_handle.sessionId is None:
return None
# This should use the same logic as in ImpalaServer::THandleIdentifierToTUniqueId().
return "%016x:%016x" % struct.unpack("QQ", t_session_op_handle.sessionId.guid)