IMPALA-13115: Add query id to error messages

This patch adds the query id to the error messages in both

- the result of the `get_log()` RPC, and
- the error message in an RPC response

before they are returned to the client, so that the users can easily
figure out the errored queries on the client side.

To achieve this, the query id of the thread debug info is set in the
RPC handler method, and is retrieved from the thread debug info each
time the error reporting function or `get_log()` gets called.

Due to the change of the error message format, some checks in the
impala-shell.py are adapted to keep them valid.

Testing:
- Added helper function `error_msg_expected()` to check whether an
  error message is expected. It is stricter than only using the `in`
  operator.
- Added helper function `error_msg_equal()` to check if two error
  messages are equal regardless of the query ids.
- Various test cases are adapted to match the new error message format.
- `ImpalaBeeswaxException`, which is used in tests only, is simplified
  so that it has the same error message format as the exceptions for
  HS2.
- Added an assertion to the case of killing and restarting a worker
  in the custom cluster test to ensure that the query id is in
  the error message in the client log retrieved with `get_log()`.

Change-Id: I67e659681e36162cad1d9684189106f8eedbf092
Reviewed-on: http://gerrit.cloudera.org:8080/21587
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Xuebin Su
2024-07-16 15:52:41 +08:00
committed by Impala Public Jenkins
parent b1941c8f17
commit ad868b9947
26 changed files with 224 additions and 62 deletions

View File

@@ -739,7 +739,10 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
Status status = executor_->Exec(stmt, &result_types);
status = executor_->FetchResult(&result_row);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(EndsWith(status.msg().msg(), error_string)) << "Actual: '"
// Ignore the tailing '\n' characters when matching
string actual_msg = boost::trim_right_copy(status.msg().msg());
string expected_msg = boost::trim_right_copy(error_string);
ASSERT_TRUE(EndsWith(actual_msg, expected_msg)) << "Actual: '"
<< status.msg().msg() << "'" << endl << "Expected: '" << error_string << "'";
}

View File

@@ -18,6 +18,7 @@
#include "service/impala-server.h"
#include "common/logging.h"
#include "common/thread-debug-info.h"
#include "gen-cpp/Frontend_types.h"
#include "rpc/thrift-util.h"
#include "runtime/coordinator.h"
@@ -73,6 +74,9 @@ void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& quer
RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
// start thread to wait for results to become available, which will allow
// us to advance query state to FINISHED or EXCEPTION
Status status = query_handle->WaitAsync();
@@ -121,6 +125,9 @@ void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
// Once the query is running do a final check for session closure and add it to the
// set of in-flight queries.
Status status = SetQueryInflight(session, query_handle);
@@ -184,6 +191,9 @@ void ImpalaServer::fetch(Results& query_results,
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -231,6 +241,9 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
// Make query id available to the following RAISE_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -278,6 +291,10 @@ void ImpalaServer::close(const beeswax::QueryHandle& beeswax_handle) {
VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
// TODO: do we need to raise an exception if the query state is EXCEPTION?
// TODO: use timeout to get rid of unwanted query_handle.
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
}
@@ -291,6 +308,9 @@ beeswax::QueryState::type ImpalaServer::get_state(
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
Status status = GetActiveQueryHandle(query_id, &query_handle);
// GetActiveQueryHandle may return the query's status from being cancelled. If not an
@@ -336,6 +356,9 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
TUniqueId query_id;
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -365,7 +388,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
!query_handle->query_status().ok());
// If the query status is !ok, include the status error message at the top of the log.
if (!query_handle->query_status().ok()) {
error_log_ss << query_handle->query_status().GetDetail() << "\n";
error_log_ss << Substitute(QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()),
query_handle->query_status().GetDetail());
}
}
@@ -408,6 +432,9 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
TUniqueId query_id;
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
// Impala-shell and administrative tools can call this from a different connection,
// e.g. to allow an admin to force-terminate queries. We should allow the operation to
// proceed without validating the session/query relation so that workflows don't
@@ -427,6 +454,9 @@ void ImpalaServer::CloseInsert(TDmlResult& dml_result,
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
VLOG_QUERY << "CloseInsert(): query_id=" << PrintId(query_id);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
// CloseInsertInternal() will validates that 'session' has access to 'query_id'.
Status status = CloseInsertInternal(session.get(), query_id, &dml_result);
if (!status.ok()) {
@@ -454,6 +484,9 @@ void ImpalaServer::GetRuntimeProfile(
VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
// If the query was retried, fetch the profile for the most recent attempt of the query
// The original query profile should still be accessible via the web ui.
QueryHandle query_handle;
@@ -489,6 +522,10 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
TUniqueId query_id;
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
VLOG_RPC << "GetExecSummary(): query_id=" << PrintId(query_id);
// Make query id available to the following RaiseBeeswaxException().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
// GetExecSummary() will validate that the user has access to 'query_id'.
Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &result);
if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
@@ -580,7 +617,10 @@ inline void ImpalaServer::BeeswaxHandleToTUniqueId(
[[noreturn]] void ImpalaServer::RaiseBeeswaxException(
const string& msg, const char* sql_state) {
BeeswaxException exc;
exc.__set_message(msg);
exc.__set_message(GetThreadDebugInfo()->GetQueryId() == TUniqueId() ?
(msg) :
Substitute(ImpalaServer::QUERY_ERROR_FORMAT,
PrintId(GetThreadDebugInfo()->GetQueryId()), (msg)));
exc.__set_SQLState(sql_state);
throw exc;
}

View File

@@ -69,10 +69,17 @@ const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION =
TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6;
// HiveServer2 error returning macro
//
// To include query id in the error message, it is required that the query id of the
// thread debug info is set in at least the caller's scope.
#define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
do { \
return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
return_val.status.__set_errorMessage((error_msg)); \
return_val.status.__set_errorMessage( \
GetThreadDebugInfo()->GetQueryId() == TUniqueId() ? \
(error_msg) : \
Substitute(ImpalaServer::QUERY_ERROR_FORMAT, \
PrintId(GetThreadDebugInfo()->GetQueryId()), (error_msg))); \
return_val.status.__set_sqlState((error_state)); \
return; \
} while (false)
@@ -157,7 +164,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
Status register_status = RegisterQuery(query_ctx.query_id, session, &query_handle);
if (!register_status.ok()) {
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
status->__set_errorMessage(register_status.GetDetail());
status->__set_errorMessage(Substitute(
QUERY_ERROR_FORMAT, PrintId(query_ctx.query_id), register_status.GetDetail()));
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
return;
}
@@ -166,7 +174,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
if (!exec_status.ok()) {
discard_result(UnregisterQuery(query_handle->query_id(), false, &exec_status));
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
status->__set_errorMessage(exec_status.GetDetail());
status->__set_errorMessage(Substitute(
QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()), exec_status.GetDetail()));
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
return;
}
@@ -177,7 +186,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
if (!inflight_status.ok()) {
discard_result(UnregisterQuery(query_handle->query_id(), false, &inflight_status));
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
status->__set_errorMessage(inflight_status.GetDetail());
status->__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
PrintId(query_handle->query_id()), inflight_status.GetDetail()));
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
return;
}
@@ -606,6 +616,10 @@ void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
QueryHandle query_handle;
status = Execute(&query_ctx, session, &query_handle, external_exec_request);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
// Start thread to wait for results to become available.
@@ -875,6 +889,9 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
SQLSTATE_GENERAL_ERROR);
VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
HS2_RETURN_IF_ERROR(
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -893,7 +910,8 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
return_val.__set_operationState(operation_state);
if (operation_state == TOperationState::ERROR_STATE) {
DCHECK(!query_handle->query_status().ok());
return_val.__set_errorMessage(query_handle->query_status().GetDetail());
return_val.__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
PrintId(query_id), query_handle->query_status().GetDetail()));
return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR);
} else {
ClientRequestState::RetryState retry_state = query_handle->retry_state();
@@ -914,6 +932,9 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
SQLSTATE_GENERAL_ERROR);
VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
HS2_RETURN_IF_ERROR(
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -945,6 +966,9 @@ void ImpalaServer::CloseImpalaOperation(TCloseImpalaOperationResp& return_val,
SQLSTATE_GENERAL_ERROR);
VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
HS2_RETURN_IF_ERROR(
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -977,6 +1001,9 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
SQLSTATE_GENERAL_ERROR);
VLOG_QUERY << "GetResultSetMetadata(): query_id=" << PrintId(query_id);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
HS2_RETURN_IF_ERROR(
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -1028,6 +1055,8 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id)
<< " fetch_size=" << request.maxRows;
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
HS2_RETURN_IF_ERROR(
@@ -1072,6 +1101,8 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
request.operationHandle.operationId, &query_id, &op_secret),
SQLSTATE_GENERAL_ERROR);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
QueryHandle query_handle;
HS2_RETURN_IF_ERROR(
@@ -1113,7 +1144,10 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
DCHECK_EQ(query_handle->exec_state() == ClientRequestState::ExecState::ERROR,
!query_status.ok());
// If the query status is !ok, include the status error message at the top of the log.
if (!query_status.ok()) ss << query_status.GetDetail();
if (!query_status.ok()) {
ss << Substitute(QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()),
query_status.GetDetail());
}
}
// Report analysis errors
@@ -1166,6 +1200,9 @@ void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
request.operationHandle.operationId, &query_id, &op_secret),
SQLSTATE_GENERAL_ERROR);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
TExecSummary summary;
TExecSummary original_summary;
bool was_retried = false;
@@ -1252,6 +1289,9 @@ void ImpalaServer::GetRuntimeProfile(
VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
// Make query id available to the following HS2_RETURN_IF_ERROR().
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
// Set the RuntimeProfileOutput for the retried query (e.g. the second attempt of a
// query). If the query has been retried this will be the retried profile. If the
// query has not been retried then all entries will be nullptr.

View File

@@ -458,6 +458,7 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
const char* ImpalaServer::GET_LOG_QUERY_RETRY_INFO_FORMAT =
"Original query failed:\n$0\nQuery has been retried using query id: $1\n";
const char* ImpalaServer::QUERY_ERROR_FORMAT = "Query $0 failed:\n$1\n";
// Interval between checks for query expiration.
const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
@@ -1255,7 +1256,6 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> sess
QueryHandle* query_handle, const TExecRequest* external_exec_request,
const bool include_in_query_log) {
PrepareQueryContext(query_ctx);
ScopedThreadContext debug_ctx(GetThreadDebugInfo(), query_ctx->query_id);
ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
// Redact the SQL stmt and update the query context

View File

@@ -733,6 +733,9 @@ class ImpalaServer : public ImpalaServiceIf,
static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED;
/// String format of retry information returned in GetLog() RPCs.
static const char* GET_LOG_QUERY_RETRY_INFO_FORMAT;
/// String format of errors related to a query. It contains a placeholder for
/// the query id.
static const char* QUERY_ERROR_FORMAT;
/// Used in situations where the client provides a session ID and a query ID and the
/// caller needs to validate that the query can be accessed from the session. The two
@@ -1010,6 +1013,9 @@ class ImpalaServer : public ImpalaServiceIf,
const beeswax::QueryHandle& beeswax_handle, TUniqueId* query_id);
/// Helper function to raise BeeswaxException
///
/// To include query id in the error message, it is required that the query id of the
/// thread debug info is set in at least the caller's scope.
[[noreturn]] void RaiseBeeswaxException(const std::string& msg, const char* sql_state);
/// Executes the fetch logic. Doesn't clean up the exec state if an error occurs.

View File

@@ -208,7 +208,7 @@ public class LdapHS2Test {
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
verifyMetrics(5, 0);
assertEquals(cancelResp.getStatus().getStatusCode(), TStatusCode.ERROR_STATUS);
assertEquals(cancelResp.getStatus().getErrorMessage(), expectedError);
assertTrue(cancelResp.getStatus().getErrorMessage().contains(expectedError));
// Open another session which will get username 'Test2Ldap'.
TOpenSessionReq openReq2 = new TOpenSessionReq();

View File

@@ -712,7 +712,9 @@ class ImpalaShell(cmd.Cmd, object):
except Exception as e:
# Suppress harmless errors.
err_msg = str(e).strip()
if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle']:
# Check twice so that it can work with both the old and the new error formats.
if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle'] or \
('\nCancelled' in err_msg or '\nInvalid or unknown query handle' in err_msg):
break
err_details = "Failed to reconnect and close (try %i/%i): %s"
print(err_details % (cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg),
@@ -813,8 +815,7 @@ class ImpalaShell(cmd.Cmd, object):
try:
summary, failed_summary = self.imp_client.get_summary(self.last_query_handle)
except RPCException as e:
import re
error_pattern = re.compile("ERROR: Query id [a-f0-9]+:[a-f0-9]+ not found.")
error_pattern = re.compile("Query id [a-f0-9]+:[a-f0-9]+ not found.")
if error_pattern.match(e.value):
print("Could not retrieve summary for query.", file=sys.stderr)
else:

View File

@@ -14,7 +14,7 @@ INT
set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
insert into insertonly_acid values (42);
---- CATCH
Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
====
---- QUERY
select * from insertonly_acid;
@@ -28,7 +28,7 @@ INT
set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
insert into insertonly_acid values (42);
---- CATCH
Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
====
---- QUERY
select * from insertonly_acid;
@@ -58,7 +58,7 @@ INT,INT
set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
---- CATCH
Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
====
---- QUERY
select p, n from part;
@@ -73,7 +73,7 @@ INT,INT
set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
---- CATCH
Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
====
---- QUERY
select p, n from part;

View File

@@ -94,7 +94,7 @@ STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
# Avro table with missing two column definitions.
compute stats avro_hive_alltypes_missing_coldef
---- CATCH
MESSAGE: AnalysisException: Cannot COMPUTE STATS on Avro table 'avro_hive_alltypes_missing_coldef' because its column definitions do not match those in the Avro schema.
AnalysisException: Cannot COMPUTE STATS on Avro table 'avro_hive_alltypes_missing_coldef' because its column definitions do not match those in the Avro schema.
Definition of column 'smallint_col' of type 'smallint' does not match the Avro-schema column 'tinyint_col' of type 'INT' at position '2'.
Please re-create the table with column definitions, e.g., using the result of 'SHOW CREATE TABLE'
====

View File

@@ -3044,12 +3044,12 @@ timestamp
---- QUERY
select to_timestamp('01:01:01', 'HH:mm:ss');
---- CATCH
Query aborted:Bad date/time conversion format: HH:mm:ss
Bad date/time conversion format: HH:mm:ss
====
---- QUERY
select to_timestamp('01:01:01.123', 'HH:mm:ss.SSS');
---- CATCH
Query aborted:Bad date/time conversion format: HH:mm:ss
Bad date/time conversion format: HH:mm:ss
====
---- QUERY
select from_unixtime(1382337792,"HH:mm:ss.SSSSSSSSS");

View File

@@ -39,6 +39,7 @@ from thrift.protocol import TBinaryProtocol
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import assert_file_in_dir_contains,\
assert_no_files_in_dir_contain
from tests.common.test_result_verifier import error_msg_equal
from tests.common.skip import SkipIf
@@ -131,7 +132,7 @@ class TestAuthorization(CustomClusterTestSuite):
exc1_str = str(self.execute_query_expect_failure(self.client, query, user=user))
exc2_str = str(self.execute_query_expect_failure(self.client, query_non_existent,
user=user))
assert exc1_str == exc2_str
assert error_msg_equal(exc1_str, exc2_str)
assert "AuthorizationException" in exc1_str
assert "does not have privileges to access"

View File

@@ -51,7 +51,6 @@ LOG = logging.getLogger('impala_beeswax')
# Custom exception wrapper.
# All exceptions coming from thrift/beeswax etc. go through this wrapper.
# __str__ preserves the exception type.
# TODO: Add the ability to print some of the stack.
class ImpalaBeeswaxException(Exception):
__name__ = "ImpalaBeeswaxException"
@@ -60,7 +59,7 @@ class ImpalaBeeswaxException(Exception):
self.inner_exception = inner_exception
def __str__(self):
return "%s:\n %s" % (self.__name__, self.__message)
return self.__message
class ImpalaBeeswaxResult(object):
def __init__(self, **kwargs):
@@ -402,7 +401,7 @@ class ImpalaBeeswaxClient(object):
try:
error_log = self.__do_rpc(
lambda: self.imp_service.get_log(query_handle.log_context))
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
raise ImpalaBeeswaxException(error_log, None)
finally:
self.close_query(query_handle)
time.sleep(0.05)
@@ -420,7 +419,7 @@ class ImpalaBeeswaxClient(object):
try:
error_log = self.__do_rpc(
lambda: self.imp_service.get_log(query_handle.log_context))
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
raise ImpalaBeeswaxException(error_log, None)
finally:
self.close_query(query_handle)
time.sleep(0.05)
@@ -523,7 +522,7 @@ class ImpalaBeeswaxClient(object):
message = str(exception)
if isinstance(exception, BeeswaxService.BeeswaxException):
message = exception.message
return 'INNER EXCEPTION: %s\n MESSAGE: %s' % (exception.__class__, message)
return message
def __do_rpc(self, rpc):
"""Executes the RPC lambda provided with some error checking.

View File

@@ -55,6 +55,7 @@ from tests.common.test_dimensions import (
get_dataset_from_workload,
load_table_info_dimension)
from tests.common.test_result_verifier import (
error_msg_expected,
try_compile_regex,
verify_lineage,
verify_raw_results,
@@ -738,6 +739,7 @@ class ImpalaTestSuite(BaseTestSuite):
except Exception as e:
if 'CATCH' in test_section:
self.__verify_exceptions(test_section['CATCH'], str(e), use_db)
assert error_msg_expected(str(e)) # Only checks if message contains query id
continue
raise

View File

@@ -797,3 +797,43 @@ def assert_codegen_cache_hit(profile_string, expect_hit):
assert "NumCachedFunctions: 0 " not in profile_string
else:
assert "NumCachedFunctions: 0 " in profile_string
# A query id consists of two 64-bit non-zero hex numbers connected with a ":".
QUERY_ID_REGEX = r"(?!0{16})[0-9a-z]{16}:(?!0{16})[0-9a-z]{16}"
def error_msg_expected(actual_msg, expected_msg="", query_id=None):
"""
Check if the actual error message is expected.
As defined in `ImpalaServer::QUERY_ERROR_FORMAT`, an error message is expected to
has the following form:
Query <query_id> failed:\n<error_detail>\n
- For `query_id`,
- If a query id is specified in the parameter, it checks if the actual error
message contains exactly the query id.
- Otherwise, it checks whether `query_id` match the format using a
regular expresssion.
- For `error_detail`, it checks whether this part starts with the `expected_msg` if it
is specified in the parameter. This is sufficient to distinguish one kind of error
from another.
"""
if query_id is None:
ERROR_REGEX = "^Query " + QUERY_ID_REGEX + " failed:\n"
m = re.search(ERROR_REGEX, actual_msg)
if m is None:
return False
return actual_msg.find(expected_msg, m.end()) != -1
# The beginning of `ImpalaServer::QUERY_ERROR_FORMAT`
ERROR_PROMPT = "Query {} failed:\n{}"
return actual_msg.startswith(ERROR_PROMPT.format(query_id, expected_msg))
def error_msg_equal(msg1, msg2):
"""Check if two error messages are equal ignoring the query ids."""
return re.sub(QUERY_ID_REGEX, "<query_id>", msg1) == \
re.sub(QUERY_ID_REGEX, "<query_id>", msg2)

View File

@@ -26,6 +26,7 @@ from subprocess import check_call
from tests.util.filesystem_utils import get_fs_path
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIf, SkipIfFS
from tests.common.test_result_verifier import error_msg_expected
LOG = logging.getLogger('test_coordinators')
LOG.setLevel(level=logging.DEBUG)
@@ -320,10 +321,9 @@ class TestCoordinators(CustomClusterTestSuite):
# Pick a non-trivial query that needs to be scheduled on executors.
query = "select count(*) from functional.alltypes where month + random() < 3"
result = self.execute_query_expect_failure(self.client, query)
expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
"pool default-pool. Queued reason: Waiting for executors to " \
"start."
assert expected_error in str(result)
expected_error = "Admission for query exceeded timeout 2000ms in pool " \
"default-pool. Queued reason: Waiting for executors to start."
assert error_msg_expected(str(result), expected_error)
# Now pick a coordinator only query.
query = "select 1"
self.execute_query_expect_success(self.client, query)

View File

@@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function
from builtins import range
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.test_result_verifier import error_msg_expected
from tests.util.concurrent_workload import ConcurrentWorkload
import json
@@ -530,12 +531,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
# Run query to make sure it times out
result = self.execute_query_expect_failure(self.client, TEST_QUERY)
expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
expected_error = "Admission for query exceeded timeout 2000ms in " \
"pool default-pool. Queued reason: Waiting for executors to " \
"start. Only DDL queries and queries scheduled only on the " \
"coordinator (either NUM_NODES set to 1 or when small query " \
"optimization is triggered) can currently run."
assert expected_error in str(result)
assert error_msg_expected(str(result), expected_error)
assert self._get_num_executor_groups(only_healthy=True) == 0
@pytest.mark.execute_serially

View File

@@ -28,6 +28,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.kudu_test_suite import KuduTestSuite
from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf
from tests.common.test_dimensions import add_mandatory_exec_option
from tests.common.test_result_verifier import error_msg_expected
from tests.util.event_processor_utils import EventProcessorUtils
KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
@@ -432,19 +433,19 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
self.execute_query(self._update_query.format(table_name))
assert False, "query was expected to fail"
except ImpalaBeeswaxException as e:
assert "Query aborted:Kudu reported error: Not implemented" in str(e)
assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
try:
self.execute_query(self._upsert_query.format(table_name))
assert False, "query was expected to fail"
except ImpalaBeeswaxException as e:
assert "Query aborted:Kudu reported error: Not implemented" in str(e)
assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
try:
self.execute_query(self._delete_query.format(table_name))
assert False, "query was expected to fail"
except ImpalaBeeswaxException as e:
assert "Query aborted:Kudu reported error: Not implemented" in str(e)
assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
# Verify that number of rows has not been changed.
cursor.execute(self._row_num_query.format(table_name))

View File

@@ -23,6 +23,7 @@ from beeswaxd.BeeswaxService import QueryState
from tests.common.custom_cluster_test_suite import (
DEFAULT_CLUSTER_SIZE,
CustomClusterTestSuite)
from tests.common.test_result_verifier import error_msg_expected
# The exact query doesn't matter much for these tests, just want a query that touches
# data on all nodes.
@@ -153,6 +154,7 @@ class TestProcessFailures(CustomClusterTestSuite):
query_id = handle.get_handle().id
error_state = "Failed due to unreachable impalad"
assert impalad.service.wait_for_query_status(client, query_id, error_state)
assert error_msg_expected(client.get_log(handle), error_state, query_id)
# Assert that the query status on the query profile web page contains the expected
# failed hostport.

View File

@@ -187,8 +187,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
impalad.service.create_hs2_client()
assert False, "should have failed"
except Exception as e:
assert re.match(r"Number of sessions for user \S+ exceeds coordinator limit 2",
str(e)), "Unexpected exception: " + str(e)
assert re.match(r".*Number of sessions for user \S+ exceeds coordinator limit 2",
str(e), re.DOTALL), "Unexpected exception: " + str(e)
# Test webui for hs2 sessions.
res = impalad.service.get_debug_webpage_json("/sessions")

View File

@@ -20,6 +20,7 @@ from __future__ import absolute_import, division, print_function
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_result_verifier import error_msg_expected
class TestSysDb(CustomClusterTestSuite):
@@ -57,6 +58,8 @@ class TestSysDb(CustomClusterTestSuite):
assert False, "table '{0}' should have failed to create but was created" \
.format(table_name)
except ImpalaBeeswaxException as e:
assert "Query aborted:IllegalStateException: Can't create blacklisted table: {0}" \
.format(table_name) in str(e), "table '{0}' failed to create but for the " \
"wrong reason".format(table_name)
expected_error = "IllegalStateException: Can't create blacklisted table: {0}" \
.format(table_name)
assert error_msg_expected(str(e), expected_error), \
"table '{0}' failed to create but for the wrong reason:\n{1}\n" \
.format(table_name, str(e))

View File

@@ -26,6 +26,7 @@ from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
from tests.common.test_result_verifier import error_msg_expected
from time import sleep, time
@@ -136,10 +137,11 @@ class HS2TestSuite(ImpalaTestSuite):
def check_response(response,
expected_status_code = TCLIService.TStatusCode.SUCCESS_STATUS,
expected_error_prefix = None):
assert response.status.statusCode == expected_status_code
assert response.status.statusCode == expected_status_code, str(response)
if expected_status_code != TCLIService.TStatusCode.SUCCESS_STATUS\
and expected_error_prefix is not None:
assert response.status.errorMessage.startswith(expected_error_prefix)
assert response.status.errorMessage.startswith(expected_error_prefix) or \
error_msg_expected(response.status.errorMessage, expected_error_prefix)
@staticmethod
def check_invalid_session(response):

View File

@@ -84,7 +84,7 @@ class TestRewrittenFile(ImpalaTestSuite):
# Metadata for file '...' appears stale. Try running "refresh
# unique_database_name.new_file_shorter" to reload the file metadata.
# IMPALA-2512: Error message could also be something like
# Query aborted:Disk I/O error on ...:27001: Error seeking ...
# Disk I/O error on ...:27001: Error seeking ...
# between 0 and ... for '...'
# TODO: find a better way to detect stale file meta and remove skip markers.
table_name = "new_file_shorter"

View File

@@ -39,6 +39,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.iceberg_test_suite import IcebergTestSuite
from tests.common.skip import SkipIf, SkipIfFS, SkipIfDockerizedCluster
from tests.common.test_dimensions import add_exec_option_dimension
from tests.common.test_result_verifier import error_msg_expected
from tests.common.file_utils import (
create_iceberg_table_from_directory,
create_table_from_parquet)
@@ -1378,7 +1379,7 @@ class TestIcebergTable(IcebergTestSuite):
query_options=abort_ice_transaction_options)
# Check that the error message looks reasonable.
result = str(err)
assert "Query aborted:CommitFailedException: simulated commit failure" in result
assert error_msg_expected(result, "CommitFailedException: simulated commit failure")
# Check that no data was inserted.
data = self.execute_query_expect_success(self.client,
"select * from {0}".format(tbl_name))
@@ -1393,7 +1394,8 @@ class TestIcebergTable(IcebergTestSuite):
.format(tbl_name, "j"), query_options=abort_ice_transaction_options)
ddl_result = str(ddl_err)
# Check that the error message looks reasonable.
assert "Query aborted:CommitFailedException: simulated commit failure" in ddl_result
assert error_msg_expected(ddl_result,
"CommitFailedException: simulated commit failure")
# Check that no column was added.
data = self.execute_query_expect_success(self.client,
"select * from {0}".format(tbl_name))

View File

@@ -39,10 +39,12 @@ from tests.common.skip import SkipIf
from tests.common.test_dimensions import (
create_client_protocol_dimension, create_client_protocol_strict_dimension,
create_uncompressed_text_dimension, create_single_exec_option_dimension)
from tests.common.test_result_verifier import error_msg_expected
from time import sleep, time
from tests.shell.util import (get_impalad_host_port, assert_var_substitution,
run_impala_shell_cmd, ImpalaShell, build_shell_env, wait_for_query_state,
create_impala_shell_executable_dimension, get_impala_shell_executable)
create_impala_shell_executable_dimension, get_impala_shell_executable,
stderr_get_first_error_msg)
from contextlib import closing
@@ -274,8 +276,11 @@ class TestImpalaShell(ImpalaTestSuite):
args = ['-q', 'set abort_on_error=true;'
'select id from functional_parquet.bad_column_metadata t']
result = run_impala_shell_cmd(vector, args, expect_success=False)
assert 'ERROR: Column metadata states there are 11 values, ' in result.stderr
assert 'but read 10 values from column id.' in result.stderr
assert error_msg_expected(
stderr_get_first_error_msg(result.stderr),
"Column metadata states there are 11 values, but read 10 values from column id."
)
def test_completed_query_errors_2(self, vector):
if vector.get_value('strict_hs2_protocol'):
@@ -284,9 +289,10 @@ class TestImpalaShell(ImpalaTestSuite):
'select id, cnt from functional_parquet.bad_column_metadata t, '
'(select 1 cnt) u']
result = run_impala_shell_cmd(vector, args, expect_success=False)
assert 'ERROR: Column metadata states there are 11 values, ' in result.stderr,\
result.stderr
assert 'but read 10 values from column id.' in result.stderr, result.stderr
assert error_msg_expected(
stderr_get_first_error_msg(result.stderr),
"Column metadata states there are 11 values, but read 10 values from column id."
)
def test_no_warnings_in_log_with_quiet_mode(self, vector):
if vector.get_value('strict_hs2_protocol'):

View File

@@ -46,9 +46,11 @@ from tests.common.skip import SkipIfLocal
from tests.common.test_dimensions import (
create_client_protocol_dimension, create_client_protocol_strict_dimension,
create_uncompressed_text_dimension, create_single_exec_option_dimension)
from tests.common.test_result_verifier import error_msg_expected
from tests.shell.util import (assert_var_substitution, ImpalaShell, get_impalad_port,
get_shell_cmd, get_open_sessions_metric, spawn_shell, get_unused_port,
create_impala_shell_executable_dimension, get_impala_shell_executable)
create_impala_shell_executable_dimension, get_impala_shell_executable,
stderr_get_first_error_msg)
QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
@@ -1161,8 +1163,10 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
assert "ParseException" in result.stderr,\
result.stderr
else:
assert "ERROR: ParseException: Unmatched string literal" in result.stderr,\
result.stderr
assert error_msg_expected(
stderr_get_first_error_msg(result.stderr),
"ParseException: Unmatched string literal"
)
def test_utf8_error_message(self, vector):
if vector.get_value('strict_hs2_protocol'):
@@ -1172,8 +1176,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
query = "select cast(now() as string format 'yyyy年MM月dd日')"
shell.send_cmd(query)
result = shell.get_result()
assert "ERROR: Bad date/time conversion format: yyyy年MM月dd日" in result.stderr,\
result.stderr
assert error_msg_expected(
stderr_get_first_error_msg(result.stderr),
"Bad date/time conversion format: yyyy年MM月dd日"
)
def test_timezone_validation(self, vector):
"""Test that query option TIMEZONE is validated when executing a query.

View File

@@ -400,3 +400,9 @@ def get_impala_shell_executable(vector):
'python2': [os.path.join(IMPALA_HOME, 'shell/build/python2_venv/bin/impala-shell')],
'python3': [os.path.join(IMPALA_HOME, 'shell/build/python3_venv/bin/impala-shell')]
}[vector.get_value_with_default('impala_shell', 'dev')]
def stderr_get_first_error_msg(stderr):
"""Seek to the begining of the first error message in stderr of impala-shell."""
PROMPT = "\nERROR: "
return stderr[(stderr.index(PROMPT) + len(PROMPT)):]