mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-13115: Add query id to error messages
This patch adds the query id to the error messages in both - the result of the `get_log()` RPC, and - the error message in an RPC response before they are returned to the client, so that the users can easily figure out the errored queries on the client side. To achieve this, the query id of the thread debug info is set in the RPC handler method, and is retrieved from the thread debug info each time the error reporting function or `get_log()` gets called. Due to the change of the error message format, some checks in the impala-shell.py are adapted to keep them valid. Testing: - Added helper function `error_msg_expected()` to check whether an error message is expected. It is stricter than only using the `in` operator. - Added helper function `error_msg_equal()` to check if two error messages are equal regardless of the query ids. - Various test cases are adapted to match the new error message format. - `ImpalaBeeswaxException`, which is used in tests only, is simplified so that it has the same error message format as the exceptions for HS2. - Added an assertion to the case of killing and restarting a worker in the custom cluster test to ensure that the query id is in the error message in the client log retrieved with `get_log()`. Change-Id: I67e659681e36162cad1d9684189106f8eedbf092 Reviewed-on: http://gerrit.cloudera.org:8080/21587 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
b1941c8f17
commit
ad868b9947
@@ -739,7 +739,10 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
|
||||
Status status = executor_->Exec(stmt, &result_types);
|
||||
status = executor_->FetchResult(&result_row);
|
||||
ASSERT_FALSE(status.ok());
|
||||
ASSERT_TRUE(EndsWith(status.msg().msg(), error_string)) << "Actual: '"
|
||||
// Ignore the tailing '\n' characters when matching
|
||||
string actual_msg = boost::trim_right_copy(status.msg().msg());
|
||||
string expected_msg = boost::trim_right_copy(error_string);
|
||||
ASSERT_TRUE(EndsWith(actual_msg, expected_msg)) << "Actual: '"
|
||||
<< status.msg().msg() << "'" << endl << "Expected: '" << error_string << "'";
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "service/impala-server.h"
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "common/thread-debug-info.h"
|
||||
#include "gen-cpp/Frontend_types.h"
|
||||
#include "rpc/thrift-util.h"
|
||||
#include "runtime/coordinator.h"
|
||||
@@ -73,6 +74,9 @@ void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& quer
|
||||
RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
|
||||
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
|
||||
|
||||
// start thread to wait for results to become available, which will allow
|
||||
// us to advance query state to FINISHED or EXCEPTION
|
||||
Status status = query_handle->WaitAsync();
|
||||
@@ -121,6 +125,9 @@ void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
|
||||
RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
|
||||
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
|
||||
|
||||
// Once the query is running do a final check for session closure and add it to the
|
||||
// set of in-flight queries.
|
||||
Status status = SetQueryInflight(session, query_handle);
|
||||
@@ -184,6 +191,9 @@ void ImpalaServer::fetch(Results& query_results,
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
|
||||
@@ -231,6 +241,9 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following RAISE_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
|
||||
@@ -278,6 +291,10 @@ void ImpalaServer::close(const beeswax::QueryHandle& beeswax_handle) {
|
||||
VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
|
||||
// TODO: do we need to raise an exception if the query state is EXCEPTION?
|
||||
// TODO: use timeout to get rid of unwanted query_handle.
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
|
||||
}
|
||||
|
||||
@@ -291,6 +308,9 @@ beeswax::QueryState::type ImpalaServer::get_state(
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
Status status = GetActiveQueryHandle(query_id, &query_handle);
|
||||
// GetActiveQueryHandle may return the query's status from being cancelled. If not an
|
||||
@@ -336,6 +356,9 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
|
||||
TUniqueId query_id;
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
|
||||
@@ -365,7 +388,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
|
||||
!query_handle->query_status().ok());
|
||||
// If the query status is !ok, include the status error message at the top of the log.
|
||||
if (!query_handle->query_status().ok()) {
|
||||
error_log_ss << query_handle->query_status().GetDetail() << "\n";
|
||||
error_log_ss << Substitute(QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()),
|
||||
query_handle->query_status().GetDetail());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,6 +432,9 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
|
||||
TUniqueId query_id;
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
// Impala-shell and administrative tools can call this from a different connection,
|
||||
// e.g. to allow an admin to force-terminate queries. We should allow the operation to
|
||||
// proceed without validating the session/query relation so that workflows don't
|
||||
@@ -427,6 +454,9 @@ void ImpalaServer::CloseInsert(TDmlResult& dml_result,
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
VLOG_QUERY << "CloseInsert(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
// CloseInsertInternal() will validates that 'session' has access to 'query_id'.
|
||||
Status status = CloseInsertInternal(session.get(), query_id, &dml_result);
|
||||
if (!status.ok()) {
|
||||
@@ -454,6 +484,9 @@ void ImpalaServer::GetRuntimeProfile(
|
||||
|
||||
VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
// If the query was retried, fetch the profile for the most recent attempt of the query
|
||||
// The original query profile should still be accessible via the web ui.
|
||||
QueryHandle query_handle;
|
||||
@@ -489,6 +522,10 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
|
||||
TUniqueId query_id;
|
||||
BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
|
||||
VLOG_RPC << "GetExecSummary(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following RaiseBeeswaxException().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
// GetExecSummary() will validate that the user has access to 'query_id'.
|
||||
Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &result);
|
||||
if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
|
||||
@@ -580,7 +617,10 @@ inline void ImpalaServer::BeeswaxHandleToTUniqueId(
|
||||
[[noreturn]] void ImpalaServer::RaiseBeeswaxException(
|
||||
const string& msg, const char* sql_state) {
|
||||
BeeswaxException exc;
|
||||
exc.__set_message(msg);
|
||||
exc.__set_message(GetThreadDebugInfo()->GetQueryId() == TUniqueId() ?
|
||||
(msg) :
|
||||
Substitute(ImpalaServer::QUERY_ERROR_FORMAT,
|
||||
PrintId(GetThreadDebugInfo()->GetQueryId()), (msg)));
|
||||
exc.__set_SQLState(sql_state);
|
||||
throw exc;
|
||||
}
|
||||
|
||||
@@ -69,10 +69,17 @@ const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION =
|
||||
TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6;
|
||||
|
||||
// HiveServer2 error returning macro
|
||||
//
|
||||
// To include query id in the error message, it is required that the query id of the
|
||||
// thread debug info is set in at least the caller's scope.
|
||||
#define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
|
||||
do { \
|
||||
return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
|
||||
return_val.status.__set_errorMessage((error_msg)); \
|
||||
return_val.status.__set_errorMessage( \
|
||||
GetThreadDebugInfo()->GetQueryId() == TUniqueId() ? \
|
||||
(error_msg) : \
|
||||
Substitute(ImpalaServer::QUERY_ERROR_FORMAT, \
|
||||
PrintId(GetThreadDebugInfo()->GetQueryId()), (error_msg))); \
|
||||
return_val.status.__set_sqlState((error_state)); \
|
||||
return; \
|
||||
} while (false)
|
||||
@@ -157,7 +164,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
|
||||
Status register_status = RegisterQuery(query_ctx.query_id, session, &query_handle);
|
||||
if (!register_status.ok()) {
|
||||
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
|
||||
status->__set_errorMessage(register_status.GetDetail());
|
||||
status->__set_errorMessage(Substitute(
|
||||
QUERY_ERROR_FORMAT, PrintId(query_ctx.query_id), register_status.GetDetail()));
|
||||
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
|
||||
return;
|
||||
}
|
||||
@@ -166,7 +174,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
|
||||
if (!exec_status.ok()) {
|
||||
discard_result(UnregisterQuery(query_handle->query_id(), false, &exec_status));
|
||||
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
|
||||
status->__set_errorMessage(exec_status.GetDetail());
|
||||
status->__set_errorMessage(Substitute(
|
||||
QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()), exec_status.GetDetail()));
|
||||
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
|
||||
return;
|
||||
}
|
||||
@@ -177,7 +186,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
|
||||
if (!inflight_status.ok()) {
|
||||
discard_result(UnregisterQuery(query_handle->query_id(), false, &inflight_status));
|
||||
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
|
||||
status->__set_errorMessage(inflight_status.GetDetail());
|
||||
status->__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
|
||||
PrintId(query_handle->query_id()), inflight_status.GetDetail()));
|
||||
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
|
||||
return;
|
||||
}
|
||||
@@ -606,6 +616,10 @@ void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
|
||||
|
||||
QueryHandle query_handle;
|
||||
status = Execute(&query_ctx, session, &query_handle, external_exec_request);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
|
||||
|
||||
HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
|
||||
|
||||
// Start thread to wait for results to become available.
|
||||
@@ -875,6 +889,9 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
|
||||
SQLSTATE_GENERAL_ERROR);
|
||||
VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
HS2_RETURN_IF_ERROR(
|
||||
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
@@ -893,7 +910,8 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
|
||||
return_val.__set_operationState(operation_state);
|
||||
if (operation_state == TOperationState::ERROR_STATE) {
|
||||
DCHECK(!query_handle->query_status().ok());
|
||||
return_val.__set_errorMessage(query_handle->query_status().GetDetail());
|
||||
return_val.__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
|
||||
PrintId(query_id), query_handle->query_status().GetDetail()));
|
||||
return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR);
|
||||
} else {
|
||||
ClientRequestState::RetryState retry_state = query_handle->retry_state();
|
||||
@@ -914,6 +932,9 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
|
||||
SQLSTATE_GENERAL_ERROR);
|
||||
VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
HS2_RETURN_IF_ERROR(
|
||||
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
@@ -945,6 +966,9 @@ void ImpalaServer::CloseImpalaOperation(TCloseImpalaOperationResp& return_val,
|
||||
SQLSTATE_GENERAL_ERROR);
|
||||
VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
HS2_RETURN_IF_ERROR(
|
||||
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
@@ -977,6 +1001,9 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
|
||||
SQLSTATE_GENERAL_ERROR);
|
||||
VLOG_QUERY << "GetResultSetMetadata(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
HS2_RETURN_IF_ERROR(
|
||||
return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
|
||||
@@ -1028,6 +1055,8 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
|
||||
VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id)
|
||||
<< " fetch_size=" << request.maxRows;
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
HS2_RETURN_IF_ERROR(
|
||||
@@ -1072,6 +1101,8 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
|
||||
request.operationHandle.operationId, &query_id, &op_secret),
|
||||
SQLSTATE_GENERAL_ERROR);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
QueryHandle query_handle;
|
||||
HS2_RETURN_IF_ERROR(
|
||||
@@ -1113,7 +1144,10 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
|
||||
DCHECK_EQ(query_handle->exec_state() == ClientRequestState::ExecState::ERROR,
|
||||
!query_status.ok());
|
||||
// If the query status is !ok, include the status error message at the top of the log.
|
||||
if (!query_status.ok()) ss << query_status.GetDetail();
|
||||
if (!query_status.ok()) {
|
||||
ss << Substitute(QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()),
|
||||
query_status.GetDetail());
|
||||
}
|
||||
}
|
||||
|
||||
// Report analysis errors
|
||||
@@ -1166,6 +1200,9 @@ void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
|
||||
request.operationHandle.operationId, &query_id, &op_secret),
|
||||
SQLSTATE_GENERAL_ERROR);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
TExecSummary summary;
|
||||
TExecSummary original_summary;
|
||||
bool was_retried = false;
|
||||
@@ -1252,6 +1289,9 @@ void ImpalaServer::GetRuntimeProfile(
|
||||
|
||||
VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
|
||||
|
||||
// Make query id available to the following HS2_RETURN_IF_ERROR().
|
||||
ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
|
||||
|
||||
// Set the RuntimeProfileOutput for the retried query (e.g. the second attempt of a
|
||||
// query). If the query has been retried this will be the retried profile. If the
|
||||
// query has not been retried then all entries will be nullptr.
|
||||
|
||||
@@ -458,6 +458,7 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
|
||||
|
||||
const char* ImpalaServer::GET_LOG_QUERY_RETRY_INFO_FORMAT =
|
||||
"Original query failed:\n$0\nQuery has been retried using query id: $1\n";
|
||||
const char* ImpalaServer::QUERY_ERROR_FORMAT = "Query $0 failed:\n$1\n";
|
||||
|
||||
// Interval between checks for query expiration.
|
||||
const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
|
||||
@@ -1255,7 +1256,6 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> sess
|
||||
QueryHandle* query_handle, const TExecRequest* external_exec_request,
|
||||
const bool include_in_query_log) {
|
||||
PrepareQueryContext(query_ctx);
|
||||
ScopedThreadContext debug_ctx(GetThreadDebugInfo(), query_ctx->query_id);
|
||||
ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
|
||||
|
||||
// Redact the SQL stmt and update the query context
|
||||
|
||||
@@ -733,6 +733,9 @@ class ImpalaServer : public ImpalaServiceIf,
|
||||
static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED;
|
||||
/// String format of retry information returned in GetLog() RPCs.
|
||||
static const char* GET_LOG_QUERY_RETRY_INFO_FORMAT;
|
||||
/// String format of errors related to a query. It contains a placeholder for
|
||||
/// the query id.
|
||||
static const char* QUERY_ERROR_FORMAT;
|
||||
|
||||
/// Used in situations where the client provides a session ID and a query ID and the
|
||||
/// caller needs to validate that the query can be accessed from the session. The two
|
||||
@@ -1010,6 +1013,9 @@ class ImpalaServer : public ImpalaServiceIf,
|
||||
const beeswax::QueryHandle& beeswax_handle, TUniqueId* query_id);
|
||||
|
||||
/// Helper function to raise BeeswaxException
|
||||
///
|
||||
/// To include query id in the error message, it is required that the query id of the
|
||||
/// thread debug info is set in at least the caller's scope.
|
||||
[[noreturn]] void RaiseBeeswaxException(const std::string& msg, const char* sql_state);
|
||||
|
||||
/// Executes the fetch logic. Doesn't clean up the exec state if an error occurs.
|
||||
|
||||
@@ -208,7 +208,7 @@ public class LdapHS2Test {
|
||||
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
|
||||
verifyMetrics(5, 0);
|
||||
assertEquals(cancelResp.getStatus().getStatusCode(), TStatusCode.ERROR_STATUS);
|
||||
assertEquals(cancelResp.getStatus().getErrorMessage(), expectedError);
|
||||
assertTrue(cancelResp.getStatus().getErrorMessage().contains(expectedError));
|
||||
|
||||
// Open another session which will get username 'Test2Ldap'.
|
||||
TOpenSessionReq openReq2 = new TOpenSessionReq();
|
||||
|
||||
@@ -712,7 +712,9 @@ class ImpalaShell(cmd.Cmd, object):
|
||||
except Exception as e:
|
||||
# Suppress harmless errors.
|
||||
err_msg = str(e).strip()
|
||||
if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle']:
|
||||
# Check twice so that it can work with both the old and the new error formats.
|
||||
if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle'] or \
|
||||
('\nCancelled' in err_msg or '\nInvalid or unknown query handle' in err_msg):
|
||||
break
|
||||
err_details = "Failed to reconnect and close (try %i/%i): %s"
|
||||
print(err_details % (cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg),
|
||||
@@ -813,8 +815,7 @@ class ImpalaShell(cmd.Cmd, object):
|
||||
try:
|
||||
summary, failed_summary = self.imp_client.get_summary(self.last_query_handle)
|
||||
except RPCException as e:
|
||||
import re
|
||||
error_pattern = re.compile("ERROR: Query id [a-f0-9]+:[a-f0-9]+ not found.")
|
||||
error_pattern = re.compile("Query id [a-f0-9]+:[a-f0-9]+ not found.")
|
||||
if error_pattern.match(e.value):
|
||||
print("Could not retrieve summary for query.", file=sys.stderr)
|
||||
else:
|
||||
|
||||
@@ -14,7 +14,7 @@ INT
|
||||
set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
|
||||
insert into insertonly_acid values (42);
|
||||
---- CATCH
|
||||
Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
|
||||
Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
|
||||
====
|
||||
---- QUERY
|
||||
select * from insertonly_acid;
|
||||
@@ -28,7 +28,7 @@ INT
|
||||
set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
|
||||
insert into insertonly_acid values (42);
|
||||
---- CATCH
|
||||
Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
|
||||
Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
|
||||
====
|
||||
---- QUERY
|
||||
select * from insertonly_acid;
|
||||
@@ -58,7 +58,7 @@ INT,INT
|
||||
set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
|
||||
insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
|
||||
---- CATCH
|
||||
Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
|
||||
Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
|
||||
====
|
||||
---- QUERY
|
||||
select p, n from part;
|
||||
@@ -73,7 +73,7 @@ INT,INT
|
||||
set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
|
||||
insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
|
||||
---- CATCH
|
||||
Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
|
||||
Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
|
||||
====
|
||||
---- QUERY
|
||||
select p, n from part;
|
||||
|
||||
@@ -94,7 +94,7 @@ STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
|
||||
# Avro table with missing two column definitions.
|
||||
compute stats avro_hive_alltypes_missing_coldef
|
||||
---- CATCH
|
||||
MESSAGE: AnalysisException: Cannot COMPUTE STATS on Avro table 'avro_hive_alltypes_missing_coldef' because its column definitions do not match those in the Avro schema.
|
||||
AnalysisException: Cannot COMPUTE STATS on Avro table 'avro_hive_alltypes_missing_coldef' because its column definitions do not match those in the Avro schema.
|
||||
Definition of column 'smallint_col' of type 'smallint' does not match the Avro-schema column 'tinyint_col' of type 'INT' at position '2'.
|
||||
Please re-create the table with column definitions, e.g., using the result of 'SHOW CREATE TABLE'
|
||||
====
|
||||
|
||||
@@ -3044,12 +3044,12 @@ timestamp
|
||||
---- QUERY
|
||||
select to_timestamp('01:01:01', 'HH:mm:ss');
|
||||
---- CATCH
|
||||
Query aborted:Bad date/time conversion format: HH:mm:ss
|
||||
Bad date/time conversion format: HH:mm:ss
|
||||
====
|
||||
---- QUERY
|
||||
select to_timestamp('01:01:01.123', 'HH:mm:ss.SSS');
|
||||
---- CATCH
|
||||
Query aborted:Bad date/time conversion format: HH:mm:ss
|
||||
Bad date/time conversion format: HH:mm:ss
|
||||
====
|
||||
---- QUERY
|
||||
select from_unixtime(1382337792,"HH:mm:ss.SSSSSSSSS");
|
||||
|
||||
@@ -39,6 +39,7 @@ from thrift.protocol import TBinaryProtocol
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.file_utils import assert_file_in_dir_contains,\
|
||||
assert_no_files_in_dir_contain
|
||||
from tests.common.test_result_verifier import error_msg_equal
|
||||
from tests.common.skip import SkipIf
|
||||
|
||||
|
||||
@@ -131,7 +132,7 @@ class TestAuthorization(CustomClusterTestSuite):
|
||||
exc1_str = str(self.execute_query_expect_failure(self.client, query, user=user))
|
||||
exc2_str = str(self.execute_query_expect_failure(self.client, query_non_existent,
|
||||
user=user))
|
||||
assert exc1_str == exc2_str
|
||||
assert error_msg_equal(exc1_str, exc2_str)
|
||||
assert "AuthorizationException" in exc1_str
|
||||
assert "does not have privileges to access"
|
||||
|
||||
|
||||
@@ -51,7 +51,6 @@ LOG = logging.getLogger('impala_beeswax')
|
||||
|
||||
# Custom exception wrapper.
|
||||
# All exceptions coming from thrift/beeswax etc. go through this wrapper.
|
||||
# __str__ preserves the exception type.
|
||||
# TODO: Add the ability to print some of the stack.
|
||||
class ImpalaBeeswaxException(Exception):
|
||||
__name__ = "ImpalaBeeswaxException"
|
||||
@@ -60,7 +59,7 @@ class ImpalaBeeswaxException(Exception):
|
||||
self.inner_exception = inner_exception
|
||||
|
||||
def __str__(self):
|
||||
return "%s:\n %s" % (self.__name__, self.__message)
|
||||
return self.__message
|
||||
|
||||
class ImpalaBeeswaxResult(object):
|
||||
def __init__(self, **kwargs):
|
||||
@@ -402,7 +401,7 @@ class ImpalaBeeswaxClient(object):
|
||||
try:
|
||||
error_log = self.__do_rpc(
|
||||
lambda: self.imp_service.get_log(query_handle.log_context))
|
||||
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
|
||||
raise ImpalaBeeswaxException(error_log, None)
|
||||
finally:
|
||||
self.close_query(query_handle)
|
||||
time.sleep(0.05)
|
||||
@@ -420,7 +419,7 @@ class ImpalaBeeswaxClient(object):
|
||||
try:
|
||||
error_log = self.__do_rpc(
|
||||
lambda: self.imp_service.get_log(query_handle.log_context))
|
||||
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
|
||||
raise ImpalaBeeswaxException(error_log, None)
|
||||
finally:
|
||||
self.close_query(query_handle)
|
||||
time.sleep(0.05)
|
||||
@@ -523,7 +522,7 @@ class ImpalaBeeswaxClient(object):
|
||||
message = str(exception)
|
||||
if isinstance(exception, BeeswaxService.BeeswaxException):
|
||||
message = exception.message
|
||||
return 'INNER EXCEPTION: %s\n MESSAGE: %s' % (exception.__class__, message)
|
||||
return message
|
||||
|
||||
def __do_rpc(self, rpc):
|
||||
"""Executes the RPC lambda provided with some error checking.
|
||||
|
||||
@@ -55,6 +55,7 @@ from tests.common.test_dimensions import (
|
||||
get_dataset_from_workload,
|
||||
load_table_info_dimension)
|
||||
from tests.common.test_result_verifier import (
|
||||
error_msg_expected,
|
||||
try_compile_regex,
|
||||
verify_lineage,
|
||||
verify_raw_results,
|
||||
@@ -738,6 +739,7 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
except Exception as e:
|
||||
if 'CATCH' in test_section:
|
||||
self.__verify_exceptions(test_section['CATCH'], str(e), use_db)
|
||||
assert error_msg_expected(str(e)) # Only checks if message contains query id
|
||||
continue
|
||||
raise
|
||||
|
||||
|
||||
@@ -797,3 +797,43 @@ def assert_codegen_cache_hit(profile_string, expect_hit):
|
||||
assert "NumCachedFunctions: 0 " not in profile_string
|
||||
else:
|
||||
assert "NumCachedFunctions: 0 " in profile_string
|
||||
|
||||
|
||||
# A query id consists of two 64-bit non-zero hex numbers connected with a ":".
|
||||
QUERY_ID_REGEX = r"(?!0{16})[0-9a-z]{16}:(?!0{16})[0-9a-z]{16}"
|
||||
|
||||
|
||||
def error_msg_expected(actual_msg, expected_msg="", query_id=None):
|
||||
"""
|
||||
Check if the actual error message is expected.
|
||||
|
||||
As defined in `ImpalaServer::QUERY_ERROR_FORMAT`, an error message is expected to
|
||||
has the following form:
|
||||
|
||||
Query <query_id> failed:\n<error_detail>\n
|
||||
|
||||
- For `query_id`,
|
||||
- If a query id is specified in the parameter, it checks if the actual error
|
||||
message contains exactly the query id.
|
||||
- Otherwise, it checks whether `query_id` match the format using a
|
||||
regular expresssion.
|
||||
- For `error_detail`, it checks whether this part starts with the `expected_msg` if it
|
||||
is specified in the parameter. This is sufficient to distinguish one kind of error
|
||||
from another.
|
||||
"""
|
||||
if query_id is None:
|
||||
ERROR_REGEX = "^Query " + QUERY_ID_REGEX + " failed:\n"
|
||||
m = re.search(ERROR_REGEX, actual_msg)
|
||||
if m is None:
|
||||
return False
|
||||
return actual_msg.find(expected_msg, m.end()) != -1
|
||||
|
||||
# The beginning of `ImpalaServer::QUERY_ERROR_FORMAT`
|
||||
ERROR_PROMPT = "Query {} failed:\n{}"
|
||||
return actual_msg.startswith(ERROR_PROMPT.format(query_id, expected_msg))
|
||||
|
||||
|
||||
def error_msg_equal(msg1, msg2):
|
||||
"""Check if two error messages are equal ignoring the query ids."""
|
||||
return re.sub(QUERY_ID_REGEX, "<query_id>", msg1) == \
|
||||
re.sub(QUERY_ID_REGEX, "<query_id>", msg2)
|
||||
|
||||
@@ -26,6 +26,7 @@ from subprocess import check_call
|
||||
from tests.util.filesystem_utils import get_fs_path
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.skip import SkipIf, SkipIfFS
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
|
||||
LOG = logging.getLogger('test_coordinators')
|
||||
LOG.setLevel(level=logging.DEBUG)
|
||||
@@ -320,10 +321,9 @@ class TestCoordinators(CustomClusterTestSuite):
|
||||
# Pick a non-trivial query that needs to be scheduled on executors.
|
||||
query = "select count(*) from functional.alltypes where month + random() < 3"
|
||||
result = self.execute_query_expect_failure(self.client, query)
|
||||
expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
|
||||
"pool default-pool. Queued reason: Waiting for executors to " \
|
||||
"start."
|
||||
assert expected_error in str(result)
|
||||
expected_error = "Admission for query exceeded timeout 2000ms in pool " \
|
||||
"default-pool. Queued reason: Waiting for executors to start."
|
||||
assert error_msg_expected(str(result), expected_error)
|
||||
# Now pick a coordinator only query.
|
||||
query = "select 1"
|
||||
self.execute_query_expect_success(self.client, query)
|
||||
|
||||
@@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function
|
||||
from builtins import range
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.parametrize import UniqueDatabase
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
from tests.util.concurrent_workload import ConcurrentWorkload
|
||||
|
||||
import json
|
||||
@@ -530,12 +531,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
|
||||
# Run query to make sure it times out
|
||||
result = self.execute_query_expect_failure(self.client, TEST_QUERY)
|
||||
expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
|
||||
expected_error = "Admission for query exceeded timeout 2000ms in " \
|
||||
"pool default-pool. Queued reason: Waiting for executors to " \
|
||||
"start. Only DDL queries and queries scheduled only on the " \
|
||||
"coordinator (either NUM_NODES set to 1 or when small query " \
|
||||
"optimization is triggered) can currently run."
|
||||
assert expected_error in str(result)
|
||||
assert error_msg_expected(str(result), expected_error)
|
||||
assert self._get_num_executor_groups(only_healthy=True) == 0
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
|
||||
@@ -28,6 +28,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.kudu_test_suite import KuduTestSuite
|
||||
from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf
|
||||
from tests.common.test_dimensions import add_mandatory_exec_option
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
from tests.util.event_processor_utils import EventProcessorUtils
|
||||
|
||||
KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
|
||||
@@ -432,19 +433,19 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
|
||||
self.execute_query(self._update_query.format(table_name))
|
||||
assert False, "query was expected to fail"
|
||||
except ImpalaBeeswaxException as e:
|
||||
assert "Query aborted:Kudu reported error: Not implemented" in str(e)
|
||||
assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
|
||||
|
||||
try:
|
||||
self.execute_query(self._upsert_query.format(table_name))
|
||||
assert False, "query was expected to fail"
|
||||
except ImpalaBeeswaxException as e:
|
||||
assert "Query aborted:Kudu reported error: Not implemented" in str(e)
|
||||
assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
|
||||
|
||||
try:
|
||||
self.execute_query(self._delete_query.format(table_name))
|
||||
assert False, "query was expected to fail"
|
||||
except ImpalaBeeswaxException as e:
|
||||
assert "Query aborted:Kudu reported error: Not implemented" in str(e)
|
||||
assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
|
||||
|
||||
# Verify that number of rows has not been changed.
|
||||
cursor.execute(self._row_num_query.format(table_name))
|
||||
|
||||
@@ -23,6 +23,7 @@ from beeswaxd.BeeswaxService import QueryState
|
||||
from tests.common.custom_cluster_test_suite import (
|
||||
DEFAULT_CLUSTER_SIZE,
|
||||
CustomClusterTestSuite)
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
|
||||
# The exact query doesn't matter much for these tests, just want a query that touches
|
||||
# data on all nodes.
|
||||
@@ -153,6 +154,7 @@ class TestProcessFailures(CustomClusterTestSuite):
|
||||
query_id = handle.get_handle().id
|
||||
error_state = "Failed due to unreachable impalad"
|
||||
assert impalad.service.wait_for_query_status(client, query_id, error_state)
|
||||
assert error_msg_expected(client.get_log(handle), error_state, query_id)
|
||||
|
||||
# Assert that the query status on the query profile web page contains the expected
|
||||
# failed hostport.
|
||||
|
||||
@@ -187,8 +187,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
|
||||
impalad.service.create_hs2_client()
|
||||
assert False, "should have failed"
|
||||
except Exception as e:
|
||||
assert re.match(r"Number of sessions for user \S+ exceeds coordinator limit 2",
|
||||
str(e)), "Unexpected exception: " + str(e)
|
||||
assert re.match(r".*Number of sessions for user \S+ exceeds coordinator limit 2",
|
||||
str(e), re.DOTALL), "Unexpected exception: " + str(e)
|
||||
|
||||
# Test webui for hs2 sessions.
|
||||
res = impalad.service.get_debug_webpage_json("/sessions")
|
||||
|
||||
@@ -20,6 +20,7 @@ from __future__ import absolute_import, division, print_function
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
|
||||
|
||||
class TestSysDb(CustomClusterTestSuite):
|
||||
@@ -57,6 +58,8 @@ class TestSysDb(CustomClusterTestSuite):
|
||||
assert False, "table '{0}' should have failed to create but was created" \
|
||||
.format(table_name)
|
||||
except ImpalaBeeswaxException as e:
|
||||
assert "Query aborted:IllegalStateException: Can't create blacklisted table: {0}" \
|
||||
.format(table_name) in str(e), "table '{0}' failed to create but for the " \
|
||||
"wrong reason".format(table_name)
|
||||
expected_error = "IllegalStateException: Can't create blacklisted table: {0}" \
|
||||
.format(table_name)
|
||||
assert error_msg_expected(str(e), expected_error), \
|
||||
"table '{0}' failed to create but for the wrong reason:\n{1}\n" \
|
||||
.format(table_name, str(e))
|
||||
|
||||
@@ -26,6 +26,7 @@ from thrift.transport.TSocket import TSocket
|
||||
from thrift.transport.TTransport import TBufferedTransport
|
||||
from thrift.protocol import TBinaryProtocol
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
from time import sleep, time
|
||||
|
||||
|
||||
@@ -136,10 +137,11 @@ class HS2TestSuite(ImpalaTestSuite):
|
||||
def check_response(response,
|
||||
expected_status_code = TCLIService.TStatusCode.SUCCESS_STATUS,
|
||||
expected_error_prefix = None):
|
||||
assert response.status.statusCode == expected_status_code
|
||||
assert response.status.statusCode == expected_status_code, str(response)
|
||||
if expected_status_code != TCLIService.TStatusCode.SUCCESS_STATUS\
|
||||
and expected_error_prefix is not None:
|
||||
assert response.status.errorMessage.startswith(expected_error_prefix)
|
||||
assert response.status.errorMessage.startswith(expected_error_prefix) or \
|
||||
error_msg_expected(response.status.errorMessage, expected_error_prefix)
|
||||
|
||||
@staticmethod
|
||||
def check_invalid_session(response):
|
||||
|
||||
@@ -84,7 +84,7 @@ class TestRewrittenFile(ImpalaTestSuite):
|
||||
# Metadata for file '...' appears stale. Try running "refresh
|
||||
# unique_database_name.new_file_shorter" to reload the file metadata.
|
||||
# IMPALA-2512: Error message could also be something like
|
||||
# Query aborted:Disk I/O error on ...:27001: Error seeking ...
|
||||
# Disk I/O error on ...:27001: Error seeking ...
|
||||
# between 0 and ... for '...'
|
||||
# TODO: find a better way to detect stale file meta and remove skip markers.
|
||||
table_name = "new_file_shorter"
|
||||
|
||||
@@ -39,6 +39,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.iceberg_test_suite import IcebergTestSuite
|
||||
from tests.common.skip import SkipIf, SkipIfFS, SkipIfDockerizedCluster
|
||||
from tests.common.test_dimensions import add_exec_option_dimension
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
from tests.common.file_utils import (
|
||||
create_iceberg_table_from_directory,
|
||||
create_table_from_parquet)
|
||||
@@ -1378,7 +1379,7 @@ class TestIcebergTable(IcebergTestSuite):
|
||||
query_options=abort_ice_transaction_options)
|
||||
# Check that the error message looks reasonable.
|
||||
result = str(err)
|
||||
assert "Query aborted:CommitFailedException: simulated commit failure" in result
|
||||
assert error_msg_expected(result, "CommitFailedException: simulated commit failure")
|
||||
# Check that no data was inserted.
|
||||
data = self.execute_query_expect_success(self.client,
|
||||
"select * from {0}".format(tbl_name))
|
||||
@@ -1393,7 +1394,8 @@ class TestIcebergTable(IcebergTestSuite):
|
||||
.format(tbl_name, "j"), query_options=abort_ice_transaction_options)
|
||||
ddl_result = str(ddl_err)
|
||||
# Check that the error message looks reasonable.
|
||||
assert "Query aborted:CommitFailedException: simulated commit failure" in ddl_result
|
||||
assert error_msg_expected(ddl_result,
|
||||
"CommitFailedException: simulated commit failure")
|
||||
# Check that no column was added.
|
||||
data = self.execute_query_expect_success(self.client,
|
||||
"select * from {0}".format(tbl_name))
|
||||
|
||||
@@ -39,10 +39,12 @@ from tests.common.skip import SkipIf
|
||||
from tests.common.test_dimensions import (
|
||||
create_client_protocol_dimension, create_client_protocol_strict_dimension,
|
||||
create_uncompressed_text_dimension, create_single_exec_option_dimension)
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
from time import sleep, time
|
||||
from tests.shell.util import (get_impalad_host_port, assert_var_substitution,
|
||||
run_impala_shell_cmd, ImpalaShell, build_shell_env, wait_for_query_state,
|
||||
create_impala_shell_executable_dimension, get_impala_shell_executable)
|
||||
create_impala_shell_executable_dimension, get_impala_shell_executable,
|
||||
stderr_get_first_error_msg)
|
||||
from contextlib import closing
|
||||
|
||||
|
||||
@@ -274,8 +276,11 @@ class TestImpalaShell(ImpalaTestSuite):
|
||||
args = ['-q', 'set abort_on_error=true;'
|
||||
'select id from functional_parquet.bad_column_metadata t']
|
||||
result = run_impala_shell_cmd(vector, args, expect_success=False)
|
||||
assert 'ERROR: Column metadata states there are 11 values, ' in result.stderr
|
||||
assert 'but read 10 values from column id.' in result.stderr
|
||||
assert error_msg_expected(
|
||||
stderr_get_first_error_msg(result.stderr),
|
||||
"Column metadata states there are 11 values, but read 10 values from column id."
|
||||
)
|
||||
|
||||
|
||||
def test_completed_query_errors_2(self, vector):
|
||||
if vector.get_value('strict_hs2_protocol'):
|
||||
@@ -284,9 +289,10 @@ class TestImpalaShell(ImpalaTestSuite):
|
||||
'select id, cnt from functional_parquet.bad_column_metadata t, '
|
||||
'(select 1 cnt) u']
|
||||
result = run_impala_shell_cmd(vector, args, expect_success=False)
|
||||
assert 'ERROR: Column metadata states there are 11 values, ' in result.stderr,\
|
||||
result.stderr
|
||||
assert 'but read 10 values from column id.' in result.stderr, result.stderr
|
||||
assert error_msg_expected(
|
||||
stderr_get_first_error_msg(result.stderr),
|
||||
"Column metadata states there are 11 values, but read 10 values from column id."
|
||||
)
|
||||
|
||||
def test_no_warnings_in_log_with_quiet_mode(self, vector):
|
||||
if vector.get_value('strict_hs2_protocol'):
|
||||
|
||||
@@ -46,9 +46,11 @@ from tests.common.skip import SkipIfLocal
|
||||
from tests.common.test_dimensions import (
|
||||
create_client_protocol_dimension, create_client_protocol_strict_dimension,
|
||||
create_uncompressed_text_dimension, create_single_exec_option_dimension)
|
||||
from tests.common.test_result_verifier import error_msg_expected
|
||||
from tests.shell.util import (assert_var_substitution, ImpalaShell, get_impalad_port,
|
||||
get_shell_cmd, get_open_sessions_metric, spawn_shell, get_unused_port,
|
||||
create_impala_shell_executable_dimension, get_impala_shell_executable)
|
||||
create_impala_shell_executable_dimension, get_impala_shell_executable,
|
||||
stderr_get_first_error_msg)
|
||||
|
||||
QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
|
||||
|
||||
@@ -1161,8 +1163,10 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
|
||||
assert "ParseException" in result.stderr,\
|
||||
result.stderr
|
||||
else:
|
||||
assert "ERROR: ParseException: Unmatched string literal" in result.stderr,\
|
||||
result.stderr
|
||||
assert error_msg_expected(
|
||||
stderr_get_first_error_msg(result.stderr),
|
||||
"ParseException: Unmatched string literal"
|
||||
)
|
||||
|
||||
def test_utf8_error_message(self, vector):
|
||||
if vector.get_value('strict_hs2_protocol'):
|
||||
@@ -1172,8 +1176,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
|
||||
query = "select cast(now() as string format 'yyyy年MM月dd日')"
|
||||
shell.send_cmd(query)
|
||||
result = shell.get_result()
|
||||
assert "ERROR: Bad date/time conversion format: yyyy年MM月dd日" in result.stderr,\
|
||||
result.stderr
|
||||
assert error_msg_expected(
|
||||
stderr_get_first_error_msg(result.stderr),
|
||||
"Bad date/time conversion format: yyyy年MM月dd日"
|
||||
)
|
||||
|
||||
|
||||
def test_timezone_validation(self, vector):
|
||||
"""Test that query option TIMEZONE is validated when executing a query.
|
||||
|
||||
@@ -400,3 +400,9 @@ def get_impala_shell_executable(vector):
|
||||
'python2': [os.path.join(IMPALA_HOME, 'shell/build/python2_venv/bin/impala-shell')],
|
||||
'python3': [os.path.join(IMPALA_HOME, 'shell/build/python3_venv/bin/impala-shell')]
|
||||
}[vector.get_value_with_default('impala_shell', 'dev')]
|
||||
|
||||
|
||||
def stderr_get_first_error_msg(stderr):
|
||||
"""Seek to the begining of the first error message in stderr of impala-shell."""
|
||||
PROMPT = "\nERROR: "
|
||||
return stderr[(stderr.index(PROMPT) + len(PROMPT)):]
|
||||
|
||||
Reference in New Issue
Block a user