Mirror of https://github.com/apache/impala.git, synced 2025-12-20 02:20:11 -05:00
IMPALA-13115: Add query id to error messages
This patch adds the query id to the error messages in both
- the result of the `get_log()` RPC, and
- the error message in an RPC response
before they are returned to the client, so that the users can easily figure out
the errored queries on the client side.

To achieve this, the query id of the thread debug info is set in the RPC handler
method, and is retrieved from the thread debug info each time the error reporting
function or `get_log()` gets called.

Due to the change of the error message format, some checks in impala-shell.py are
adapted to keep them valid.

Testing:
- Added helper function `error_msg_expected()` to check whether an error message is
  expected. It is stricter than only using the `in` operator.
- Added helper function `error_msg_equal()` to check if two error messages are equal
  regardless of the query ids.
- Various test cases are adapted to match the new error message format.
- `ImpalaBeeswaxException`, which is used in tests only, is simplified so that it has
  the same error message format as the exceptions for HS2.
- Added an assertion to the case of killing and restarting a worker in the custom
  cluster test to ensure that the query id is in the error message in the client log
  retrieved with `get_log()`.

Change-Id: I67e659681e36162cad1d9684189106f8eedbf092
Reviewed-on: http://gerrit.cloudera.org:8080/21587
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent b1941c8f17
commit ad868b9947
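For reference, the server-side format string added by this patch is
`ImpalaServer::QUERY_ERROR_FORMAT = "Query $0 failed:\n$1\n"`, where `$0` is the query
id and `$1` is the error detail. The sketch below is illustrative only (the query ids
are made up); it mirrors the `error_msg_equal()` helper that the patch adds to
tests/common/test_result_verifier.py, to show how two such messages compare once the
query ids are masked:

import re

# Illustration only, not part of the patch; the query ids below are made up.
# A query id consists of two 64-bit non-zero hex numbers joined by a ":".
QUERY_ID_REGEX = r"(?!0{16})[0-9a-z]{16}:(?!0{16})[0-9a-z]{16}"


def error_msg_equal(msg1, msg2):
  """Check if two error messages are equal ignoring the query ids."""
  return re.sub(QUERY_ID_REGEX, "<query_id>", msg1) == \
      re.sub(QUERY_ID_REGEX, "<query_id>", msg2)


old_style = "ERROR: Cancelled"
new_style = "Query 3f4e8d1a2b3c4d5e:6f708192a3b4c5d6 failed:\nCancelled\n"
other_id = "Query a1b2c3d4e5f60718:293a4b5c6d7e8f90 failed:\nCancelled\n"
assert error_msg_equal(new_style, other_id)       # same error, different query ids
assert not error_msg_equal(old_style, new_style)  # old format differs from the new one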
@@ -739,7 +739,10 @@ class ExprTest : public testing::TestWithParam<std::tuple<bool, bool>> {
     Status status = executor_->Exec(stmt, &result_types);
     status = executor_->FetchResult(&result_row);
     ASSERT_FALSE(status.ok());
-    ASSERT_TRUE(EndsWith(status.msg().msg(), error_string)) << "Actual: '"
+    // Ignore the tailing '\n' characters when matching
+    string actual_msg = boost::trim_right_copy(status.msg().msg());
+    string expected_msg = boost::trim_right_copy(error_string);
+    ASSERT_TRUE(EndsWith(actual_msg, expected_msg)) << "Actual: '"
         << status.msg().msg() << "'" << endl << "Expected: '" << error_string << "'";
   }
 
@@ -18,6 +18,7 @@
 #include "service/impala-server.h"
 
 #include "common/logging.h"
+#include "common/thread-debug-info.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-util.h"
 #include "runtime/coordinator.h"
@@ -73,6 +74,9 @@ void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& quer
   RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
+
   // start thread to wait for results to become available, which will allow
   // us to advance query state to FINISHED or EXCEPTION
   Status status = query_handle->WaitAsync();
@@ -121,6 +125,9 @@ void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
   RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
+
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
   Status status = SetQueryInflight(session, query_handle);
@@ -184,6 +191,9 @@ void ImpalaServer::fetch(Results& query_results,
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
   VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
 
@@ -231,6 +241,9 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
   VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following RAISE_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
 
@@ -278,6 +291,10 @@ void ImpalaServer::close(const beeswax::QueryHandle& beeswax_handle) {
   VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
   // TODO: do we need to raise an exception if the query state is EXCEPTION?
   // TODO: use timeout to get rid of unwanted query_handle.
+
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
 }
 
@@ -291,6 +308,9 @@ beeswax::QueryState::type ImpalaServer::get_state(
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
   VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   Status status = GetActiveQueryHandle(query_id, &query_handle);
   // GetActiveQueryHandle may return the query's status from being cancelled. If not an
@@ -336,6 +356,9 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
   TUniqueId query_id;
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   RAISE_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
 
@@ -365,7 +388,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
       !query_handle->query_status().ok());
   // If the query status is !ok, include the status error message at the top of the log.
   if (!query_handle->query_status().ok()) {
-    error_log_ss << query_handle->query_status().GetDetail() << "\n";
+    error_log_ss << Substitute(QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()),
+        query_handle->query_status().GetDetail());
   }
 }
 
@@ -408,6 +432,9 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
   TUniqueId query_id;
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   // Impala-shell and administrative tools can call this from a different connection,
   // e.g. to allow an admin to force-terminate queries. We should allow the operation to
   // proceed without validating the session/query relation so that workflows don't
@@ -427,6 +454,9 @@ void ImpalaServer::CloseInsert(TDmlResult& dml_result,
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
   VLOG_QUERY << "CloseInsert(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   // CloseInsertInternal() will validates that 'session' has access to 'query_id'.
   Status status = CloseInsertInternal(session.get(), query_id, &dml_result);
   if (!status.ok()) {
@@ -454,6 +484,9 @@ void ImpalaServer::GetRuntimeProfile(
 
   VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   // If the query was retried, fetch the profile for the most recent attempt of the query
   // The original query profile should still be accessible via the web ui.
   QueryHandle query_handle;
@@ -489,6 +522,10 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
   TUniqueId query_id;
   BeeswaxHandleToTUniqueId(beeswax_handle, &query_id);
   VLOG_RPC << "GetExecSummary(): query_id=" << PrintId(query_id);
+
+  // Make query id available to the following RaiseBeeswaxException().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   // GetExecSummary() will validate that the user has access to 'query_id'.
   Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &result);
   if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
@@ -580,7 +617,10 @@ inline void ImpalaServer::BeeswaxHandleToTUniqueId(
 [[noreturn]] void ImpalaServer::RaiseBeeswaxException(
     const string& msg, const char* sql_state) {
   BeeswaxException exc;
-  exc.__set_message(msg);
+  exc.__set_message(GetThreadDebugInfo()->GetQueryId() == TUniqueId() ?
+      (msg) :
+      Substitute(ImpalaServer::QUERY_ERROR_FORMAT,
+          PrintId(GetThreadDebugInfo()->GetQueryId()), (msg)));
   exc.__set_SQLState(sql_state);
   throw exc;
 }
@@ -69,12 +69,19 @@ const TProtocolVersion::type MAX_SUPPORTED_HS2_VERSION =
     TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 // HiveServer2 error returning macro
-#define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
-  do { \
-    return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
-    return_val.status.__set_errorMessage((error_msg)); \
-    return_val.status.__set_sqlState((error_state)); \
-    return; \
+//
+// To include query id in the error message, it is required that the query id of the
+// thread debug info is set in at least the caller's scope.
+#define HS2_RETURN_ERROR(return_val, error_msg, error_state) \
+  do { \
+    return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS); \
+    return_val.status.__set_errorMessage( \
+        GetThreadDebugInfo()->GetQueryId() == TUniqueId() ? \
+        (error_msg) : \
+        Substitute(ImpalaServer::QUERY_ERROR_FORMAT, \
+            PrintId(GetThreadDebugInfo()->GetQueryId()), (error_msg))); \
+    return_val.status.__set_sqlState((error_state)); \
+    return; \
   } while (false)
 
 #define HS2_RETURN_IF_ERROR(return_val, status, error_state) \
@@ -157,7 +164,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   Status register_status = RegisterQuery(query_ctx.query_id, session, &query_handle);
   if (!register_status.ok()) {
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
-    status->__set_errorMessage(register_status.GetDetail());
+    status->__set_errorMessage(Substitute(
+        QUERY_ERROR_FORMAT, PrintId(query_ctx.query_id), register_status.GetDetail()));
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
     return;
   }
@@ -166,7 +174,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   if (!exec_status.ok()) {
     discard_result(UnregisterQuery(query_handle->query_id(), false, &exec_status));
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
-    status->__set_errorMessage(exec_status.GetDetail());
+    status->__set_errorMessage(Substitute(
+        QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()), exec_status.GetDetail()));
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
     return;
   }
@@ -177,7 +186,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   if (!inflight_status.ok()) {
     discard_result(UnregisterQuery(query_handle->query_id(), false, &inflight_status));
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
-    status->__set_errorMessage(inflight_status.GetDetail());
+    status->__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
+        PrintId(query_handle->query_id()), inflight_status.GetDetail()));
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
     return;
   }
@@ -606,6 +616,10 @@ void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
 
   QueryHandle query_handle;
   status = Execute(&query_ctx, session, &query_handle, external_exec_request);
+
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_handle->query_id());
+
   HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
 
   // Start thread to wait for results to become available.
@@ -875,6 +889,9 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
       SQLSTATE_GENERAL_ERROR);
   VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   HS2_RETURN_IF_ERROR(
       return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -893,7 +910,8 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
   return_val.__set_operationState(operation_state);
   if (operation_state == TOperationState::ERROR_STATE) {
     DCHECK(!query_handle->query_status().ok());
-    return_val.__set_errorMessage(query_handle->query_status().GetDetail());
+    return_val.__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
+        PrintId(query_id), query_handle->query_status().GetDetail()));
     return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR);
   } else {
     ClientRequestState::RetryState retry_state = query_handle->retry_state();
@@ -914,6 +932,9 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
       SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   HS2_RETURN_IF_ERROR(
       return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -945,6 +966,9 @@ void ImpalaServer::CloseImpalaOperation(TCloseImpalaOperationResp& return_val,
       SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   HS2_RETURN_IF_ERROR(
       return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -977,6 +1001,9 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
       SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "GetResultSetMetadata(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   QueryHandle query_handle;
   HS2_RETURN_IF_ERROR(
       return_val, GetActiveQueryHandle(query_id, &query_handle), SQLSTATE_GENERAL_ERROR);
@@ -1028,6 +1055,8 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
   VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id)
       << " fetch_size=" << request.maxRows;
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
 
   QueryHandle query_handle;
   HS2_RETURN_IF_ERROR(
@@ -1072,6 +1101,8 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
       request.operationHandle.operationId, &query_id, &op_secret),
       SQLSTATE_GENERAL_ERROR);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
 
   QueryHandle query_handle;
   HS2_RETURN_IF_ERROR(
@@ -1113,7 +1144,10 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
   DCHECK_EQ(query_handle->exec_state() == ClientRequestState::ExecState::ERROR,
       !query_status.ok());
   // If the query status is !ok, include the status error message at the top of the log.
-  if (!query_status.ok()) ss << query_status.GetDetail();
+  if (!query_status.ok()) {
+    ss << Substitute(QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()),
+        query_status.GetDetail());
+  }
 }
 
 // Report analysis errors
@@ -1166,6 +1200,9 @@ void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
       request.operationHandle.operationId, &query_id, &op_secret),
       SQLSTATE_GENERAL_ERROR);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   TExecSummary summary;
   TExecSummary original_summary;
   bool was_retried = false;
@@ -1252,6 +1289,9 @@ void ImpalaServer::GetRuntimeProfile(
 
   VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
 
+  // Make query id available to the following HS2_RETURN_IF_ERROR().
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
+
   // Set the RuntimeProfileOutput for the retried query (e.g. the second attempt of a
   // query). If the query has been retried this will be the retried profile. If the
   // query has not been retried then all entries will be nullptr.
@@ -458,6 +458,7 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
 
 const char* ImpalaServer::GET_LOG_QUERY_RETRY_INFO_FORMAT =
     "Original query failed:\n$0\nQuery has been retried using query id: $1\n";
+const char* ImpalaServer::QUERY_ERROR_FORMAT = "Query $0 failed:\n$1\n";
 
 // Interval between checks for query expiration.
 const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
@@ -1255,7 +1256,6 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> sess
     QueryHandle* query_handle, const TExecRequest* external_exec_request,
     const bool include_in_query_log) {
   PrepareQueryContext(query_ctx);
-  ScopedThreadContext debug_ctx(GetThreadDebugInfo(), query_ctx->query_id);
   ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
 
   // Redact the SQL stmt and update the query context
@@ -733,6 +733,9 @@ class ImpalaServer : public ImpalaServiceIf,
   static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED;
   /// String format of retry information returned in GetLog() RPCs.
   static const char* GET_LOG_QUERY_RETRY_INFO_FORMAT;
+  /// String format of errors related to a query. It contains a placeholder for
+  /// the query id.
+  static const char* QUERY_ERROR_FORMAT;
 
   /// Used in situations where the client provides a session ID and a query ID and the
   /// caller needs to validate that the query can be accessed from the session. The two
@@ -1010,6 +1013,9 @@ class ImpalaServer : public ImpalaServiceIf,
       const beeswax::QueryHandle& beeswax_handle, TUniqueId* query_id);
 
   /// Helper function to raise BeeswaxException
+  ///
+  /// To include query id in the error message, it is required that the query id of the
+  /// thread debug info is set in at least the caller's scope.
   [[noreturn]] void RaiseBeeswaxException(const std::string& msg, const char* sql_state);
 
   /// Executes the fetch logic. Doesn't clean up the exec state if an error occurs.
@@ -208,7 +208,7 @@ public class LdapHS2Test {
     TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
     verifyMetrics(5, 0);
     assertEquals(cancelResp.getStatus().getStatusCode(), TStatusCode.ERROR_STATUS);
-    assertEquals(cancelResp.getStatus().getErrorMessage(), expectedError);
+    assertTrue(cancelResp.getStatus().getErrorMessage().contains(expectedError));
 
     // Open another session which will get username 'Test2Ldap'.
     TOpenSessionReq openReq2 = new TOpenSessionReq();
@@ -712,7 +712,9 @@ class ImpalaShell(cmd.Cmd, object):
       except Exception as e:
         # Suppress harmless errors.
         err_msg = str(e).strip()
-        if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle']:
+        # Check twice so that it can work with both the old and the new error formats.
+        if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle'] or \
+            ('\nCancelled' in err_msg or '\nInvalid or unknown query handle' in err_msg):
           break
       err_details = "Failed to reconnect and close (try %i/%i): %s"
       print(err_details % (cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg),
@@ -813,8 +815,7 @@ class ImpalaShell(cmd.Cmd, object):
     try:
       summary, failed_summary = self.imp_client.get_summary(self.last_query_handle)
     except RPCException as e:
-      import re
-      error_pattern = re.compile("ERROR: Query id [a-f0-9]+:[a-f0-9]+ not found.")
+      error_pattern = re.compile("Query id [a-f0-9]+:[a-f0-9]+ not found.")
      if error_pattern.match(e.value):
        print("Could not retrieve summary for query.", file=sys.stderr)
      else:
@@ -14,7 +14,7 @@ INT
 set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
 insert into insertonly_acid values (42);
 ---- CATCH
-Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
+Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
 ====
 ---- QUERY
 select * from insertonly_acid;
@@ -28,7 +28,7 @@ INT
 set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
 insert into insertonly_acid values (42);
 ---- CATCH
-Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
+Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
 ====
 ---- QUERY
 select * from insertonly_acid;
@@ -58,7 +58,7 @@ INT,INT
 set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
 insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
 ---- CATCH
-Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
+Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
 ====
 ---- QUERY
 select p, n from part;
@@ -73,7 +73,7 @@ INT,INT
 set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
 insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
 ---- CATCH
-Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
+Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
 ====
 ---- QUERY
 select p, n from part;
@@ -94,7 +94,7 @@ STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
 # Avro table with missing two column definitions.
 compute stats avro_hive_alltypes_missing_coldef
 ---- CATCH
-MESSAGE: AnalysisException: Cannot COMPUTE STATS on Avro table 'avro_hive_alltypes_missing_coldef' because its column definitions do not match those in the Avro schema.
+AnalysisException: Cannot COMPUTE STATS on Avro table 'avro_hive_alltypes_missing_coldef' because its column definitions do not match those in the Avro schema.
 Definition of column 'smallint_col' of type 'smallint' does not match the Avro-schema column 'tinyint_col' of type 'INT' at position '2'.
 Please re-create the table with column definitions, e.g., using the result of 'SHOW CREATE TABLE'
 ====
@@ -3044,12 +3044,12 @@ timestamp
 ---- QUERY
 select to_timestamp('01:01:01', 'HH:mm:ss');
 ---- CATCH
-Query aborted:Bad date/time conversion format: HH:mm:ss
+Bad date/time conversion format: HH:mm:ss
 ====
 ---- QUERY
 select to_timestamp('01:01:01.123', 'HH:mm:ss.SSS');
 ---- CATCH
-Query aborted:Bad date/time conversion format: HH:mm:ss
+Bad date/time conversion format: HH:mm:ss
 ====
 ---- QUERY
 select from_unixtime(1382337792,"HH:mm:ss.SSSSSSSSS");
@@ -39,6 +39,7 @@ from thrift.protocol import TBinaryProtocol
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.file_utils import assert_file_in_dir_contains,\
     assert_no_files_in_dir_contain
+from tests.common.test_result_verifier import error_msg_equal
 from tests.common.skip import SkipIf
 
 
@@ -131,7 +132,7 @@ class TestAuthorization(CustomClusterTestSuite):
     exc1_str = str(self.execute_query_expect_failure(self.client, query, user=user))
     exc2_str = str(self.execute_query_expect_failure(self.client, query_non_existent,
                                                      user=user))
-    assert exc1_str == exc2_str
+    assert error_msg_equal(exc1_str, exc2_str)
     assert "AuthorizationException" in exc1_str
     assert "does not have privileges to access"
@@ -51,7 +51,6 @@ LOG = logging.getLogger('impala_beeswax')
 
 # Custom exception wrapper.
 # All exceptions coming from thrift/beeswax etc. go through this wrapper.
-# __str__ preserves the exception type.
 # TODO: Add the ability to print some of the stack.
 class ImpalaBeeswaxException(Exception):
   __name__ = "ImpalaBeeswaxException"
@@ -60,7 +59,7 @@ class ImpalaBeeswaxException(Exception):
     self.inner_exception = inner_exception
 
   def __str__(self):
-    return "%s:\n %s" % (self.__name__, self.__message)
+    return self.__message
 
 class ImpalaBeeswaxResult(object):
   def __init__(self, **kwargs):
@@ -402,7 +401,7 @@ class ImpalaBeeswaxClient(object):
       try:
         error_log = self.__do_rpc(
           lambda: self.imp_service.get_log(query_handle.log_context))
-        raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
+        raise ImpalaBeeswaxException(error_log, None)
       finally:
         self.close_query(query_handle)
     time.sleep(0.05)
@@ -420,7 +419,7 @@ class ImpalaBeeswaxClient(object):
      try:
        error_log = self.__do_rpc(
          lambda: self.imp_service.get_log(query_handle.log_context))
-        raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
+        raise ImpalaBeeswaxException(error_log, None)
      finally:
        self.close_query(query_handle)
    time.sleep(0.05)
@@ -523,7 +522,7 @@ class ImpalaBeeswaxClient(object):
     message = str(exception)
     if isinstance(exception, BeeswaxService.BeeswaxException):
       message = exception.message
-    return 'INNER EXCEPTION: %s\n MESSAGE: %s' % (exception.__class__, message)
+    return message
 
   def __do_rpc(self, rpc):
     """Executes the RPC lambda provided with some error checking.
@@ -55,6 +55,7 @@ from tests.common.test_dimensions import (
     get_dataset_from_workload,
     load_table_info_dimension)
 from tests.common.test_result_verifier import (
+    error_msg_expected,
     try_compile_regex,
     verify_lineage,
     verify_raw_results,
@@ -738,6 +739,7 @@ class ImpalaTestSuite(BaseTestSuite):
       except Exception as e:
         if 'CATCH' in test_section:
           self.__verify_exceptions(test_section['CATCH'], str(e), use_db)
+          assert error_msg_expected(str(e))  # Only checks if message contains query id
           continue
         raise
 
@@ -797,3 +797,43 @@ def assert_codegen_cache_hit(profile_string, expect_hit):
     assert "NumCachedFunctions: 0 " not in profile_string
   else:
     assert "NumCachedFunctions: 0 " in profile_string
+
+
+# A query id consists of two 64-bit non-zero hex numbers connected with a ":".
+QUERY_ID_REGEX = r"(?!0{16})[0-9a-z]{16}:(?!0{16})[0-9a-z]{16}"
+
+
+def error_msg_expected(actual_msg, expected_msg="", query_id=None):
+  """
+  Check if the actual error message is expected.
+
+  As defined in `ImpalaServer::QUERY_ERROR_FORMAT`, an error message is expected to
+  has the following form:
+
+    Query <query_id> failed:\n<error_detail>\n
+
+  - For `query_id`,
+    - If a query id is specified in the parameter, it checks if the actual error
+      message contains exactly the query id.
+    - Otherwise, it checks whether `query_id` match the format using a
+      regular expresssion.
+  - For `error_detail`, it checks whether this part starts with the `expected_msg` if it
+    is specified in the parameter. This is sufficient to distinguish one kind of error
+    from another.
+  """
+  if query_id is None:
+    ERROR_REGEX = "^Query " + QUERY_ID_REGEX + " failed:\n"
+    m = re.search(ERROR_REGEX, actual_msg)
+    if m is None:
+      return False
+    return actual_msg.find(expected_msg, m.end()) != -1
+
+  # The beginning of `ImpalaServer::QUERY_ERROR_FORMAT`
+  ERROR_PROMPT = "Query {} failed:\n{}"
+  return actual_msg.startswith(ERROR_PROMPT.format(query_id, expected_msg))
+
+
+def error_msg_equal(msg1, msg2):
+  """Check if two error messages are equal ignoring the query ids."""
+  return re.sub(QUERY_ID_REGEX, "<query_id>", msg1) == \
+      re.sub(QUERY_ID_REGEX, "<query_id>", msg2)
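A brief usage sketch for the helpers added in the hunk above (illustrative only; the
query id and error text are made up, and the import mirrors what the test files below
use):

from tests.common.test_result_verifier import error_msg_expected

qid = "dc4d57b87e4a5f33:b0b12b4500000001"
msg = "Query {} failed:\nAnalysisException: Could not resolve table reference: 't'\n".format(qid)

# Without an explicit query id, any well-formed id is accepted and the error detail
# only needs to contain the expected substring after the "Query ... failed:" prompt.
assert error_msg_expected(msg, "AnalysisException: Could not resolve")
# With an explicit query id, the message must start with exactly that prompt.
assert error_msg_expected(msg, "AnalysisException", query_id=qid)
# Old-style messages without the prompt never match.
assert not error_msg_expected("ERROR: AnalysisException: ...", "AnalysisException")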
@@ -26,6 +26,7 @@ from subprocess import check_call
 from tests.util.filesystem_utils import get_fs_path
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIf, SkipIfFS
+from tests.common.test_result_verifier import error_msg_expected
 
 LOG = logging.getLogger('test_coordinators')
 LOG.setLevel(level=logging.DEBUG)
@@ -320,10 +321,9 @@ class TestCoordinators(CustomClusterTestSuite):
     # Pick a non-trivial query that needs to be scheduled on executors.
     query = "select count(*) from functional.alltypes where month + random() < 3"
     result = self.execute_query_expect_failure(self.client, query)
-    expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
-        "pool default-pool. Queued reason: Waiting for executors to " \
-        "start."
-    assert expected_error in str(result)
+    expected_error = "Admission for query exceeded timeout 2000ms in pool " \
+        "default-pool. Queued reason: Waiting for executors to start."
+    assert error_msg_expected(str(result), expected_error)
     # Now pick a coordinator only query.
     query = "select 1"
     self.execute_query_expect_success(self.client, query)
@@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function
 from builtins import range
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.parametrize import UniqueDatabase
+from tests.common.test_result_verifier import error_msg_expected
 from tests.util.concurrent_workload import ConcurrentWorkload
 
 import json
@@ -530,12 +531,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
     # Run query to make sure it times out
     result = self.execute_query_expect_failure(self.client, TEST_QUERY)
-    expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
+    expected_error = "Admission for query exceeded timeout 2000ms in " \
         "pool default-pool. Queued reason: Waiting for executors to " \
         "start. Only DDL queries and queries scheduled only on the " \
        "coordinator (either NUM_NODES set to 1 or when small query " \
        "optimization is triggered) can currently run."
-    assert expected_error in str(result)
+    assert error_msg_expected(str(result), expected_error)
     assert self._get_num_executor_groups(only_healthy=True) == 0
 
   @pytest.mark.execute_serially
@@ -28,6 +28,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.skip import SkipIfKudu, SkipIfBuildType, SkipIf
 from tests.common.test_dimensions import add_mandatory_exec_option
+from tests.common.test_result_verifier import error_msg_expected
 from tests.util.event_processor_utils import EventProcessorUtils
 
 KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
@@ -432,19 +433,19 @@ class TestKuduTransactionBase(CustomClusterTestSuite):
       self.execute_query(self._update_query.format(table_name))
       assert False, "query was expected to fail"
     except ImpalaBeeswaxException as e:
-      assert "Query aborted:Kudu reported error: Not implemented" in str(e)
+      assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
 
     try:
       self.execute_query(self._upsert_query.format(table_name))
       assert False, "query was expected to fail"
     except ImpalaBeeswaxException as e:
-      assert "Query aborted:Kudu reported error: Not implemented" in str(e)
+      assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
 
     try:
       self.execute_query(self._delete_query.format(table_name))
       assert False, "query was expected to fail"
     except ImpalaBeeswaxException as e:
-      assert "Query aborted:Kudu reported error: Not implemented" in str(e)
+      assert error_msg_expected(str(e), "Kudu reported error: Not implemented")
 
     # Verify that number of rows has not been changed.
     cursor.execute(self._row_num_query.format(table_name))
@@ -23,6 +23,7 @@ from beeswaxd.BeeswaxService import QueryState
 from tests.common.custom_cluster_test_suite import (
     DEFAULT_CLUSTER_SIZE,
     CustomClusterTestSuite)
+from tests.common.test_result_verifier import error_msg_expected
 
 # The exact query doesn't matter much for these tests, just want a query that touches
 # data on all nodes.
@@ -153,6 +154,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     query_id = handle.get_handle().id
     error_state = "Failed due to unreachable impalad"
     assert impalad.service.wait_for_query_status(client, query_id, error_state)
+    assert error_msg_expected(client.get_log(handle), error_state, query_id)
 
     # Assert that the query status on the query profile web page contains the expected
     # failed hostport.
@@ -187,8 +187,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
       impalad.service.create_hs2_client()
       assert False, "should have failed"
     except Exception as e:
-      assert re.match(r"Number of sessions for user \S+ exceeds coordinator limit 2",
-        str(e)), "Unexpected exception: " + str(e)
+      assert re.match(r".*Number of sessions for user \S+ exceeds coordinator limit 2",
+        str(e), re.DOTALL), "Unexpected exception: " + str(e)
 
     # Test webui for hs2 sessions.
     res = impalad.service.get_debug_webpage_json("/sessions")
@@ -20,6 +20,7 @@ from __future__ import absolute_import, division, print_function
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.common.test_result_verifier import error_msg_expected
 
 
 class TestSysDb(CustomClusterTestSuite):
@@ -57,6 +58,8 @@ class TestSysDb(CustomClusterTestSuite):
       assert False, "table '{0}' should have failed to create but was created" \
          .format(table_name)
     except ImpalaBeeswaxException as e:
-      assert "Query aborted:IllegalStateException: Can't create blacklisted table: {0}" \
-          .format(table_name) in str(e), "table '{0}' failed to create but for the " \
-          "wrong reason".format(table_name)
+      expected_error = "IllegalStateException: Can't create blacklisted table: {0}" \
+          .format(table_name)
+      assert error_msg_expected(str(e), expected_error), \
+          "table '{0}' failed to create but for the wrong reason:\n{1}\n" \
+          .format(table_name, str(e))
@@ -26,6 +26,7 @@ from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 from thrift.protocol import TBinaryProtocol
 from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
+from tests.common.test_result_verifier import error_msg_expected
 from time import sleep, time
 
 
@@ -136,10 +137,11 @@ class HS2TestSuite(ImpalaTestSuite):
   def check_response(response,
                      expected_status_code = TCLIService.TStatusCode.SUCCESS_STATUS,
                      expected_error_prefix = None):
-    assert response.status.statusCode == expected_status_code
+    assert response.status.statusCode == expected_status_code, str(response)
     if expected_status_code != TCLIService.TStatusCode.SUCCESS_STATUS\
        and expected_error_prefix is not None:
-      assert response.status.errorMessage.startswith(expected_error_prefix)
+      assert response.status.errorMessage.startswith(expected_error_prefix) or \
+          error_msg_expected(response.status.errorMessage, expected_error_prefix)
 
   @staticmethod
   def check_invalid_session(response):
@@ -84,7 +84,7 @@ class TestRewrittenFile(ImpalaTestSuite):
     # Metadata for file '...' appears stale. Try running "refresh
     # unique_database_name.new_file_shorter" to reload the file metadata.
     # IMPALA-2512: Error message could also be something like
-    # Query aborted:Disk I/O error on ...:27001: Error seeking ...
+    # Disk I/O error on ...:27001: Error seeking ...
     # between 0 and ... for '...'
     # TODO: find a better way to detect stale file meta and remove skip markers.
     table_name = "new_file_shorter"
@@ -39,6 +39,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
 from tests.common.skip import SkipIf, SkipIfFS, SkipIfDockerizedCluster
 from tests.common.test_dimensions import add_exec_option_dimension
+from tests.common.test_result_verifier import error_msg_expected
 from tests.common.file_utils import (
     create_iceberg_table_from_directory,
     create_table_from_parquet)
@@ -1378,7 +1379,7 @@ class TestIcebergTable(IcebergTestSuite):
         query_options=abort_ice_transaction_options)
     # Check that the error message looks reasonable.
     result = str(err)
-    assert "Query aborted:CommitFailedException: simulated commit failure" in result
+    assert error_msg_expected(result, "CommitFailedException: simulated commit failure")
     # Check that no data was inserted.
     data = self.execute_query_expect_success(self.client,
         "select * from {0}".format(tbl_name))
@@ -1393,7 +1394,8 @@
         .format(tbl_name, "j"), query_options=abort_ice_transaction_options)
     ddl_result = str(ddl_err)
     # Check that the error message looks reasonable.
-    assert "Query aborted:CommitFailedException: simulated commit failure" in ddl_result
+    assert error_msg_expected(ddl_result,
+        "CommitFailedException: simulated commit failure")
     # Check that no column was added.
     data = self.execute_query_expect_success(self.client,
         "select * from {0}".format(tbl_name))
@@ -39,10 +39,12 @@ from tests.common.skip import SkipIf
 from tests.common.test_dimensions import (
     create_client_protocol_dimension, create_client_protocol_strict_dimension,
     create_uncompressed_text_dimension, create_single_exec_option_dimension)
+from tests.common.test_result_verifier import error_msg_expected
 from time import sleep, time
 from tests.shell.util import (get_impalad_host_port, assert_var_substitution,
     run_impala_shell_cmd, ImpalaShell, build_shell_env, wait_for_query_state,
-    create_impala_shell_executable_dimension, get_impala_shell_executable)
+    create_impala_shell_executable_dimension, get_impala_shell_executable,
+    stderr_get_first_error_msg)
 from contextlib import closing
 
 
@@ -274,8 +276,11 @@ class TestImpalaShell(ImpalaTestSuite):
     args = ['-q', 'set abort_on_error=true;'
             'select id from functional_parquet.bad_column_metadata t']
     result = run_impala_shell_cmd(vector, args, expect_success=False)
-    assert 'ERROR: Column metadata states there are 11 values, ' in result.stderr
-    assert 'but read 10 values from column id.' in result.stderr
+    assert error_msg_expected(
+      stderr_get_first_error_msg(result.stderr),
+      "Column metadata states there are 11 values, but read 10 values from column id."
+    )
+
 
   def test_completed_query_errors_2(self, vector):
     if vector.get_value('strict_hs2_protocol'):
@@ -284,9 +289,10 @@ class TestImpalaShell(ImpalaTestSuite):
         'select id, cnt from functional_parquet.bad_column_metadata t, '
         '(select 1 cnt) u']
     result = run_impala_shell_cmd(vector, args, expect_success=False)
-    assert 'ERROR: Column metadata states there are 11 values, ' in result.stderr,\
-        result.stderr
-    assert 'but read 10 values from column id.' in result.stderr, result.stderr
+    assert error_msg_expected(
+      stderr_get_first_error_msg(result.stderr),
+      "Column metadata states there are 11 values, but read 10 values from column id."
+    )
 
   def test_no_warnings_in_log_with_quiet_mode(self, vector):
     if vector.get_value('strict_hs2_protocol'):
@@ -46,9 +46,11 @@ from tests.common.skip import SkipIfLocal
 from tests.common.test_dimensions import (
     create_client_protocol_dimension, create_client_protocol_strict_dimension,
     create_uncompressed_text_dimension, create_single_exec_option_dimension)
+from tests.common.test_result_verifier import error_msg_expected
 from tests.shell.util import (assert_var_substitution, ImpalaShell, get_impalad_port,
     get_shell_cmd, get_open_sessions_metric, spawn_shell, get_unused_port,
-    create_impala_shell_executable_dimension, get_impala_shell_executable)
+    create_impala_shell_executable_dimension, get_impala_shell_executable,
+    stderr_get_first_error_msg)
 
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
@@ -1161,8 +1163,10 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       assert "ParseException" in result.stderr,\
         result.stderr
     else:
-      assert "ERROR: ParseException: Unmatched string literal" in result.stderr,\
-        result.stderr
+      assert error_msg_expected(
+        stderr_get_first_error_msg(result.stderr),
+        "ParseException: Unmatched string literal"
+      )
 
   def test_utf8_error_message(self, vector):
     if vector.get_value('strict_hs2_protocol'):
@@ -1172,8 +1176,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     query = "select cast(now() as string format 'yyyy年MM月dd日')"
     shell.send_cmd(query)
     result = shell.get_result()
-    assert "ERROR: Bad date/time conversion format: yyyy年MM月dd日" in result.stderr,\
-      result.stderr
+    assert error_msg_expected(
+      stderr_get_first_error_msg(result.stderr),
+      "Bad date/time conversion format: yyyy年MM月dd日"
+    )
+
 
   def test_timezone_validation(self, vector):
     """Test that query option TIMEZONE is validated when executing a query.
@@ -400,3 +400,9 @@ def get_impala_shell_executable(vector):
     'python2': [os.path.join(IMPALA_HOME, 'shell/build/python2_venv/bin/impala-shell')],
     'python3': [os.path.join(IMPALA_HOME, 'shell/build/python3_venv/bin/impala-shell')]
   }[vector.get_value_with_default('impala_shell', 'dev')]
+
+
+def stderr_get_first_error_msg(stderr):
+  """Seek to the begining of the first error message in stderr of impala-shell."""
+  PROMPT = "\nERROR: "
+  return stderr[(stderr.index(PROMPT) + len(PROMPT)):]
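The new stderr_get_first_error_msg() helper slices off everything up to and including the first "\nERROR: " marker in the shell's stderr, so the assertions above can match against the error text alone. A quick self-contained illustration; the captured output below is made up:

def stderr_get_first_error_msg(stderr):
  # Same logic as the helper added above: drop everything up to and including
  # the first "\nERROR: " marker and return the rest of stderr.
  PROMPT = "\nERROR: "
  return stderr[(stderr.index(PROMPT) + len(PROMPT)):]


# Hypothetical stderr from a failed impala-shell run.
captured = ("Opened TCP connection to localhost:21050\n"
            "ERROR: ParseException: Unmatched string literal\n"
            "Could not execute command: select 'x\n")
print(stderr_get_first_error_msg(captured))
# Prints everything from the first error onward:
#   ParseException: Unmatched string literal
#   Could not execute command: select 'x

Note that str.index() raises ValueError when no "\nERROR: " marker is present, so the helper is only meant for stderr that is already known to contain an error.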