IMPALA-12602: Unregister queries on idle timeout

Queries cancelled due to idle_query_timeout/QUERY_TIMEOUT_S are now also
Unregistered to free any remaining memory, as you cannot fetch results
from a cancelled query.

Adds a new structure - idle_query_statuses_ - to retain Status messages
for queries closed this way so that we can continue to return a clear
error message if the client returns and requests query status or
attempts to fetch results. This structure must be global because HS2
server can only identify a session ID from a query handle, and the query
handle no longer exists. SessionState tracks queries added to
idle_query_statuses_ so they can be cleared when the session is closed.

Also ensures MarkInactive is called in ClientRequestState when Wait()
completes. Previously WaitInternal would only MarkInactive on success,
leaving any failed requests in an active state until explicitly closed
or the session ended.

The beeswax get_log RPC will not return the preserved error message or
any warnings for these queries. It's also possible the summary and
profile are rotated out of query log as the query is no longer inflight.
This is an acceptable outcome as a client will likely not look for a
log/summary/profile after it times out.

Testing:
- updates test_query_expiration to verify number of waiting queries is
  only non-zero for queries cancelled by EXEC_TIME_LIMIT_S and not yet
  closed as an idle query
- modified test_retry_query_timeout to use exec_time_limit_s because
  queries closed by idle_timeout_s don't work with get_exec_summary

Change-Id: Iacfc285ed3587892c7ec6f7df3b5f71c9e41baf0
Reviewed-on: http://gerrit.cloudera.org:8080/21074
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Michael Smith
2024-02-23 15:37:23 -08:00
committed by Impala Public Jenkins
parent effc9df933
commit f05eac6476
7 changed files with 140 additions and 52 deletions

View File

@@ -1155,6 +1155,10 @@ bool ClientRequestState::BlockOnWait(int64_t timeout_us, int64_t* block_on_wait_
void ClientRequestState::Wait() {
// block until results are ready
Status status = WaitInternal();
// Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
// how long Impala waits for the client to fetch rows. For other statements, track the
// time until a Close() is received.
MarkInactive();
{
lock_guard<mutex> l(lock_);
if (returns_result_set()) {
@@ -1191,7 +1195,6 @@ void ClientRequestState::Wait() {
Status ClientRequestState::WaitInternal() {
// Explain requests have already populated the result set. Nothing to do here.
if (exec_request().stmt_type == TStmtType::EXPLAIN) {
MarkInactive();
return Status::OK();
}
@@ -1240,10 +1243,6 @@ Status ClientRequestState::WaitInternal() {
} else if (isCTAS) {
SetCreateTableAsSelectResultSet();
}
// Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
// how long Impala waits for the client to fetch rows. For other statements, track the
// time until a Close() is received.
MarkInactive();
return Status::OK();
}

View File

@@ -292,7 +292,14 @@ beeswax::QueryState::type ImpalaServer::get_state(
VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
QueryHandle query_handle;
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
Status status = GetActiveQueryHandle(query_id, &query_handle);
// GetActiveQueryHandle may return the query's status from being cancelled. If not an
// invalid query handle, we can assume that error statuses reflect a query in the
// EXCEPTION state.
if (!status.ok() && status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
return beeswax::QueryState::EXCEPTION;
}
RAISE_IF_ERROR(status, SQLSTATE_GENERAL_ERROR);
// Validate that query can be accessed by user.
RAISE_IF_ERROR(CheckClientRequestSession(session.get(), query_handle->effective_user(),

View File

@@ -1428,6 +1428,16 @@ Status ImpalaServer::RegisterQuery(const TUniqueId& query_id,
return Status::OK();
}
static inline int32_t GetIdleTimeout(const TQueryOptions& query_options) {
int32_t idle_timeout_s = query_options.query_timeout_s;
if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
return min(FLAGS_idle_query_timeout, idle_timeout_s);
} else {
// Use a non-zero timeout, if one exists
return max(FLAGS_idle_query_timeout, idle_timeout_s);
}
}
Status ImpalaServer::SetQueryInflight(
shared_ptr<SessionState> session_state, const QueryHandle& query_handle) {
DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT");
@@ -1459,13 +1469,7 @@ Status ImpalaServer::SetQueryInflight(
}
// If the query has a timeout or time limit, schedule checks.
int32_t idle_timeout_s = query_handle->query_options().query_timeout_s;
if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
} else {
// Use a non-zero timeout, if one exists
idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
}
int32_t idle_timeout_s = GetIdleTimeout(query_handle->query_options());
int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
@@ -1701,6 +1705,14 @@ Status ImpalaServer::GetActiveQueryHandle(
DCHECK(query_handle != nullptr);
shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id);
if (UNLIKELY(query_driver == nullptr)) {
{
lock_guard<mutex> l(idle_query_statuses_lock_);
auto it = idle_query_statuses_.find(query_id);
if (it != idle_query_statuses_.end()) {
return it->second;
}
}
Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
VLOG(1) << err.GetDetail();
return err;
@@ -1797,6 +1809,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
DecrementSessionCount(session_state->connected_user);
}
unordered_set<TUniqueId> inflight_queries;
vector<TUniqueId> idled_queries;
{
lock_guard<mutex> l(session_state->lock);
DCHECK(!session_state->closed);
@@ -1804,6 +1817,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
// Since closed is true, no more queries will be added to the inflight list.
inflight_queries.insert(session_state->inflight_queries.begin(),
session_state->inflight_queries.end());
idled_queries.swap(session_state->idled_queries);
}
// Unregister all open queries from this session.
Status status = Status::Expected("Session closed");
@@ -1811,6 +1825,12 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
// TODO: deal with an error status
UnregisterQueryDiscardResult(query_id, false, &status);
}
{
lock_guard<mutex> l(idle_query_statuses_lock_);
for (const TUniqueId& query_id: idled_queries) {
idle_query_statuses_.erase(query_id);
}
}
// Reconfigure the poll period of session_maintenance_thread_ if necessary.
UnregisterSessionTimeout(session_state->session_timeout);
VLOG_QUERY << "Closed session: " << PrintId(session_id)
@@ -2774,8 +2794,9 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
continue;
}
ClientRequestState* crs = query_driver->GetActiveClientRequestState();
if (crs->is_expired()) {
// Query was expired already from a previous expiration event.
if (crs->is_expired() && expiration_event->kind != ExpirationKind::IDLE_TIMEOUT) {
// Query was expired already from a previous expiration event. Keep idle
// timeouts as they will additionally unregister the query.
expiration_event = queries_by_timestamp_.erase(expiration_event);
continue;
}
@@ -2812,13 +2833,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
// Now check to see if the idle timeout has expired. We must check the actual
// expiration time in case the query has updated 'last_active_ms' since the last
// time we looked.
int32_t idle_timeout_s = crs->query_options().query_timeout_s;
if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
} else {
// Use a non-zero timeout, if one exists
idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
}
int32_t idle_timeout_s = GetIdleTimeout(crs->query_options());
int64_t expiration = crs->last_active_ms() + (idle_timeout_s * 1000L);
if (now < expiration) {
// If the real expiration date is in the future we may need to re-insert the
@@ -2840,10 +2855,29 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
VLOG_QUERY << "Expiring query due to client inactivity: "
<< PrintId(expiration_event->query_id) << ", last activity was at: "
<< ToStringFromUnixMillis(crs->last_active_ms());
ExpireQuery(crs,
Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
PrintId(expiration_event->query_id),
PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S)));
const Status status = Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
PrintId(expiration_event->query_id),
PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
// Save status so we can report it for unregistered queries.
Status preserved_status;
{
lock_guard<mutex> l(*crs->lock());
preserved_status = crs->query_status();
}
preserved_status.MergeStatus(status);
{
shared_ptr<SessionState> session = crs->session();
lock_guard<mutex> l(session->lock);
if (!session->closed) {
lock_guard<mutex> l(idle_query_statuses_lock_);
idle_query_statuses_.emplace(
expiration_event->query_id, move(preserved_status));
session->idled_queries.emplace_back(expiration_event->query_id);
}
}
ExpireQuery(crs, status, true);
expiration_event = queries_by_timestamp_.erase(expiration_event);
} else {
// Iterator is moved on in every other branch.
@@ -2972,12 +3006,18 @@ Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
return Status::OK();
}
void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status,
bool unregister) {
DCHECK(!status.ok());
cancellation_thread_pool_->Offer(
CancellationWork::TerminatedByServer(crs->query_id(), status, false));
ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
crs->set_expired();
CancellationWork::TerminatedByServer(crs->query_id(), status, unregister));
if (crs->is_expired()) {
// Should only be re-entrant if we're now unregistering the query.
DCHECK(unregister);
} else {
ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
crs->set_expired();
}
}
Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,

View File

@@ -185,10 +185,11 @@ class TQueryExecRequest;
/// 1. session_state_map_lock_
/// 2. SessionState::lock
/// 3. query_expiration_lock_
/// 4. ClientRequestState::fetch_rows_lock
/// 5. ClientRequestState::lock
/// 6. ClientRequestState::expiration_data_lock_
/// 7. Coordinator::exec_summary_lock
/// 4. idle_query_statuses_lock_
/// 5. ClientRequestState::fetch_rows_lock
/// 6. ClientRequestState::lock
/// 7. ClientRequestState::expiration_data_lock_
/// 8. Coordinator::exec_summary_lock
///
/// The following locks are not held in conjunction with other locks:
/// * query_log_lock_
@@ -657,6 +658,9 @@ class ImpalaServer : public ImpalaServiceIf,
/// inflight_queries. In that case we add it to prestopped_queries instead.
std::set<TUniqueId> prestopped_queries;
/// Unregistered queries we need to clear from idle_query_statuses_ on closure.
std::vector<TUniqueId> idled_queries;
/// Total number of queries run as part of this session.
int64_t total_queries;
@@ -1153,8 +1157,9 @@ class ImpalaServer : public ImpalaServiceIf,
/// check should be rescheduled for a later time.
Status CheckResourceLimits(ClientRequestState* crs);
/// Expire 'crs' and cancel it with status 'status'.
void ExpireQuery(ClientRequestState* crs, const Status& status);
/// Expire 'crs' and cancel it with status 'status'. Optionally unregisters the query.
void ExpireQuery(ClientRequestState* crs, const Status& status,
bool unregister = false);
typedef boost::unordered_map<std::string, boost::unordered_set<std::string>>
AuthorizedProxyMap;
@@ -1423,6 +1428,13 @@ class ImpalaServer : public ImpalaServiceIf,
typedef boost::unordered_map<TUniqueId, std::shared_ptr<SessionState>> SessionStateMap;
SessionStateMap session_state_map_;
/// Protects idle_query_statuses_;
std::mutex idle_query_statuses_lock_;
/// A map of queries that were stopped due to idle timeout and the status they had when
/// unregistered. Used to return a more useful error when looking up unregistered IDs.
std::map<TUniqueId, Status> idle_query_statuses_;
/// Protects connection_to_sessions_map_. See "Locking" in the class comment for lock
/// acquisition order.
std::mutex connection_to_sessions_map_lock_;

View File

@@ -126,8 +126,10 @@ Trying to re-register with state-store</codeblock>
<codeph>--idle_query_timeout</codeph> disables query timeouts.
</p>
<p>
Cancelled queries remain in the open state but use only the
minimal resources.
Cancelled queries are closed, but the client can still fetch their exception
details. If the query was in a good state when cancelled, it will present an
error like "Query 6f49e509bfa5b347:207d8ef900000000 expired due to client
inactivity", otherwise it will show the relevant error.
</p>
</li>

View File

@@ -29,18 +29,21 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestQueryExpiration(CustomClusterTestSuite):
"""Tests query expiration logic"""
def _check_num_executing(self, impalad, expected):
def _check_num_executing(self, impalad, expected, expect_waiting=0):
in_flight_queries = impalad.service.get_in_flight_queries()
# Guard against too few in-flight queries.
assert expected <= len(in_flight_queries)
actual = 0
actual = waiting = 0
for query in in_flight_queries:
if query["executing"]:
actual += 1
else:
assert query["waiting"]
assert actual == expected, '%s out of %s queries with expected (%s) status' \
waiting += 1
assert actual == expected, '%s out of %s queries executing (expected %s)' \
% (actual, len(in_flight_queries), expected)
assert waiting == expect_waiting, '%s out of %s queries waiting (expected %s)' \
% (waiting, len(in_flight_queries), expect_waiting)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=8 --logbuflevel=-1")
@@ -58,7 +61,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
# This query will hit a lower time limit.
client.execute("SET EXEC_TIME_LIMIT_S=3")
time_limit_expire_handle = client.execute_async(query1);
time_limit_expire_handle = client.execute_async(query1)
handles.append(time_limit_expire_handle)
# This query will hit a lower idle timeout instead of the default timeout or time
@@ -75,30 +78,46 @@ class TestQueryExpiration(CustomClusterTestSuite):
handles.append(default_timeout_expire_handle2)
self._check_num_executing(impalad, len(handles))
# Run a query that fails, and will timeout due to client inactivity.
client.execute("SET QUERY_TIMEOUT_S=1")
client.execute('SET MEM_LIMIT=1')
exception_handle = client.execute_async("select count(*) from functional.alltypes")
client.execute('SET MEM_LIMIT=1g')
handles.append(exception_handle)
before = time()
sleep(4)
# Queries with timeout or time limit of 1 should have expired, other queries should
# Queries with timeout or time limit < 4 should have expired, other queries should
# still be running.
assert num_expired + 2 == impalad.service.get_metric_value(
assert num_expired + 3 == impalad.service.get_metric_value(
'impala-server.num-queries-expired')
assert (client.get_state(short_timeout_expire_handle) ==
client.QUERY_STATES['EXCEPTION'])
assert (client.get_state(time_limit_expire_handle) ==
client.QUERY_STATES['EXCEPTION'])
assert (client.get_state(exception_handle) == client.QUERY_STATES['EXCEPTION'])
assert (client.get_state(default_timeout_expire_handle) ==
client.QUERY_STATES['FINISHED'])
assert (client.get_state(default_timeout_expire_handle2) ==
client.QUERY_STATES['FINISHED'])
# The query cancelled by exec_time_limit_s should be waiting to be closed.
self._check_num_executing(impalad, 2, 1)
self.__expect_expired(client, query1, short_timeout_expire_handle,
"Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 3s000ms\)")
r"Query [0-9a-f]+:[0-9a-f]+ expired due to "
+ r"client inactivity \(timeout is 3s000ms\)")
self.__expect_expired(client, query1, time_limit_expire_handle,
"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 3s000ms")
r"Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 3s000ms")
self.__expect_expired(client, query1, exception_handle,
r"minimum memory reservation is greater than memory available.*\nQuery "
+ r"[0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 1s000ms\)")
self._check_num_executing(impalad, 2)
# Both queries with query_timeout_s < 4 should generate this message.
self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
"[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d")
r"[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d",
2)
self.assert_impalad_log_contains('INFO',
"Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 3s")
r"Expiring query [0-9a-f]+:[0-9a-f]+ due to execution time limit of 3s")
# Wait until the remaining queries expire. The time limit query will have hit
# expirations but only one should be counted.
@@ -124,14 +143,23 @@ class TestQueryExpiration(CustomClusterTestSuite):
# Confirm that no extra expirations happened
assert impalad.service.get_metric_value('impala-server.num-queries-expired') \
== len(handles)
== num_expired + len(handles)
self._check_num_executing(impalad, 0)
for handle in handles:
try:
client.close_query(handle)
assert False, "Close should always throw an exception"
except Exception as e:
# We fetched from some cancelled handles above, which unregistered the queries.
assert 'Invalid or unknown query handle' in str(e)
# Expired queries always return their exception, so will not be invalid/unknown.
if handle is time_limit_expire_handle:
assert 'Invalid or unknown query handle' in str(e)
else:
if handle is exception_handle:
# Error should return original failure and timeout message
assert 'minimum memory reservation is greater than memory available' in str(e)
assert re.search(
r'Query [0-9a-f]{16}:[0-9a-f]{16} expired due to client inactivity', str(e))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--idle_query_timeout=0")

View File

@@ -886,7 +886,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.cluster.impalads[1].kill()
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
query_options={'retry_failed_queries': 'true', 'exec_time_limit_s': '1'})
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
# Validate the live exec summary.
@@ -908,7 +908,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.client.fetch(query, handle)
assert False
except Exception as e:
assert "expired due to client inactivity" in str(e)
assert "expired due to execution time limit of 1s000ms" in str(e)
# Assert that the impalad metrics show one expired query.
assert impalad_service.get_metric_value('impala-server.num-queries-expired') == 1