diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 0e35999fa..a0f27c78f 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -104,6 +104,14 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_PublishFilter(_return);
   }
 
+  void RemoteShutdown(TRemoteShutdownResult& _return, const TRemoteShutdownParams& params,
+      bool* send_done) {
+    DCHECK(!*send_done);
+    ImpalaInternalServiceClient::send_RemoteShutdown(params);
+    *send_done = true;
+    ImpalaInternalServiceClient::recv_RemoteShutdown(_return);
+  }
+
 #pragma clang diagnostic pop
 
  private:
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 500ad5905..21b323d2f 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -27,10 +27,11 @@
 #include
 
 #include "catalog/catalog-service-client-wrapper.h"
-#include "runtime/client-cache-types.h"
-#include "util/metrics.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-util.h"
+#include "runtime/client-cache-types.h"
+#include "util/debug-util.h"
+#include "util/metrics.h"
 
 #include "common/status.h"
@@ -261,6 +262,43 @@ class ClientConnection {
     return Status::OK();
   }
 
+  /// Return struct for DoRpcWithRetry() that allows callers to distinguish between
+  /// failures in getting a client and failures sending the RPC.
+  struct RpcStatus {
+    Status status;
+
+    // Set to true if 'status' is not OK and the error occurred while getting the client.
+    bool client_error;
+
+    static RpcStatus OK() { return {Status::OK(), false}; }
+  };
+
+  /// Helper that retries constructing a client and calling DoRpc() up to three times
+  /// and handles both RPC failures and failures to get a client from 'client_cache'.
+  /// 'debug_fn' is a Status-returning function that can be used to inject errors into
+  /// the RPC.
+  template <class F, class Request, class DebugF, class Response>
+  static RpcStatus DoRpcWithRetry(ClientCache<T>* client_cache, TNetworkAddress address,
+      const F& f, const Request& request, const DebugF& debug_fn, Response* response) {
+    Status rpc_status;
+    Status client_status;
+
+    // Try to send the RPC 3 times before failing.
+    for (int i = 0; i < 3; ++i) {
+      ImpalaBackendConnection client(client_cache, address, &client_status);
+      if (!client_status.ok()) continue;
+
+      rpc_status = debug_fn();
+      if (!rpc_status.ok()) continue;
+
+      rpc_status = client.DoRpc(f, request, response);
+      if (rpc_status.ok()) break;
+    }
+    if (!client_status.ok()) return {client_status, true};
+    if (!rpc_status.ok()) return {rpc_status, false};
+    return RpcStatus::OK();
+  }
+
   /// In certain cases, the server may take longer to provide an RPC response than
   /// the configured socket timeout. Callers may wish to retry receiving the response.
   /// This is safe if and only if DoRpc() returned RPC_RECV_TIMEOUT.
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 2d97b546c..95484a490 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -377,38 +377,29 @@ bool Coordinator::BackendState::Cancel() {
   VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id())
       << " backend=" << TNetworkAddressToString(impalad_address());
 
-  Status rpc_status;
-  Status client_status;
-  // Try to send the RPC 3 times before failing.
-  for (int i = 0; i < 3; ++i) {
-    ImpalaBackendConnection backend_client(ExecEnv::GetInstance()->impalad_client_cache(),
-        impalad_address(), &client_status);
-    if (!client_status.ok()) continue;
-    rpc_status = DebugAction(query_ctx().client_request.query_options,
-        "COORD_CANCEL_QUERY_FINSTANCES_RPC");
-    if (!rpc_status.ok()) continue;
-
-    // The return value 'dummy' is ignored as it's only set if the fragment
-    // instance cannot be found in the backend. The fragment instances of a query
-    // can all be cancelled locally in a backend due to RPC failure to
-    // coordinator. In which case, the query state can be gone already.
-    rpc_status = backend_client.DoRpc(
-        &ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
-    if (rpc_status.ok()) break;
-  }
-  if (!client_status.ok()) {
-    status_.MergeStatus(client_status);
-    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
-               << " failed to connect to " << TNetworkAddressToString(impalad_address())
-               << " :" << client_status.msg().msg();
-    return true;
-  }
-  if (!rpc_status.ok()) {
-    status_.MergeStatus(rpc_status);
-    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
-               << " rpc to " << TNetworkAddressToString(impalad_address())
-               << " failed: " << rpc_status.msg().msg();
+  // The return value 'dummy' is ignored as it's only set if the fragment
+  // instance cannot be found in the backend. The fragment instances of a query
+  // can all be cancelled locally in a backend due to RPC failure to
+  // coordinator. In which case, the query state can be gone already.
+  ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
+      ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(),
+      &ImpalaBackendClient::CancelQueryFInstances, params,
+      [this] () {
+        return DebugAction(query_ctx().client_request.query_options,
+            "COORD_CANCEL_QUERY_FINSTANCES_RPC");
+      }, &dummy);
+  if (!rpc_status.status.ok()) {
+    status_.MergeStatus(rpc_status.status);
+    if (rpc_status.client_error) {
+      VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
+                 << " failed to connect to " << TNetworkAddressToString(impalad_address())
+                 << " :" << rpc_status.status.msg().msg();
+    } else {
+      VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
+                 << " rpc to " << TNetworkAddressToString(impalad_address())
+                 << " failed: " << rpc_status.status.msg().msg();
+    }
     return true;
   }
   return true;
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index ae4f049a0..43bf199b3 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -190,7 +190,14 @@ void Scheduler::UpdateMembership(
           << TNetworkAddressToString(local_backend_descriptor_.address) << ")";
       continue;
     }
-    if (be_desc.is_executor) {
+    if (be_desc.is_quiescing) {
+      // Make sure backends that are shutting down are not scheduled on.
+      auto it = current_executors_.find(item.key);
+      if (it != current_executors_.end()) {
+        new_executors_config->RemoveBackend(it->second);
+        current_executors_.erase(it);
+      }
+    } else if (be_desc.is_executor) {
       new_executors_config->AddBackend(be_desc);
       current_executors_.insert(make_pair(item.key, be_desc));
     }
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 0db511d28..ca871ab3e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -21,6 +21,7 @@
 #include
 #include
 
+#include "runtime/backend-client.h"
 #include "runtime/coordinator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -51,6 +52,7 @@ using namespace apache::thrift;
 using namespace beeswax;
 using namespace strings;
 
+DECLARE_int32(be_port);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 DECLARE_int64(max_result_cache_size);
@@ -220,6 +222,9 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
       }
       break;
     }
+    case TStmtType::ADMIN_FN:
+      DCHECK(exec_request_.admin_request.type == TAdminRequestType::SHUTDOWN);
+      return ExecShutdownRequest();
     default:
       stringstream errmsg;
       errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
@@ -603,6 +608,44 @@ Status ClientRequestState::ExecDdlRequest() {
   return Status::OK();
 }
 
+Status ClientRequestState::ExecShutdownRequest() {
+  const TShutdownParams& request = exec_request_.admin_request.shutdown_params;
+  int port = request.__isset.backend && request.backend.port != 0 ? request.backend.port :
+      FLAGS_be_port;
+  // Use the local shutdown code path if the host is unspecified or if it exactly matches
+  // the configured host/port. This avoids the possibility of RPC errors preventing
+  // shutdown.
+  if (!request.__isset.backend
+      || (request.backend.hostname == FLAGS_hostname && port == FLAGS_be_port)) {
+    TShutdownStatus shutdown_status;
+    int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1;
+    RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status));
+    SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)});
+    return Status::OK();
+  }
+  TNetworkAddress addr = MakeNetworkAddress(request.backend.hostname, port);
+
+  TRemoteShutdownParams params;
+  if (request.__isset.deadline_s) params.__set_deadline_s(request.deadline_s);
+  TRemoteShutdownResult resp;
+  VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(addr);
+  ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
+      ExecEnv::GetInstance()->impalad_client_cache(), addr,
+      &ImpalaBackendClient::RemoteShutdown, params,
+      [this]() { return DebugAction(query_options(), "CRS_SHUTDOWN_RPC"); }, &resp);
+  if (!rpc_status.status.ok()) {
+    VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id())
+               << " failed to send RPC to " << TNetworkAddressToString(addr) << " :"
+               << rpc_status.status.msg().msg();
+    return rpc_status.status;
+  }
+
+  Status shutdown_status(resp.status);
+  RETURN_IF_ERROR(shutdown_status);
+  SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status)});
+  return Status::OK();
+}
+
 void ClientRequestState::Done() {
   MarkActive();
   // Make sure we join on wait_thread_ before we finish (and especially before this object
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index b524409ae..9442a0d9b 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -398,8 +398,6 @@ class ClientRequestState {
   bool user_has_profile_access_ = true;
 
   TResultSetMetadata result_metadata_; // metadata for select query
-  RowBatch* current_batch_ = nullptr; // the current row batch; only applicable if coord is set
-  int current_batch_row_ = 0; // number of rows fetched within the current batch
   int num_rows_fetched_ = 0; // number of rows fetched by client for the entire query
 
   /// True if a fetch was attempted by a client, regardless of whether a result set
@@ -448,8 +446,8 @@ class ClientRequestState {
   /// queries (e.g., compute stats) or dml (e.g., create table as select)
   Status ExecDdlRequest() WARN_UNUSED_RESULT;
 
-  /// Executes a LOAD DATA
-  Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
+  /// Executes a shutdown request.
+  Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
   /// Core logic of Wait(). Does not update operation_state_/query_status_.
   Status WaitInternal() WARN_UNUSED_RESULT;
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 3ecbe803a..c3570b294 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -52,6 +52,8 @@ namespace impala {
 
 void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   VLOG_QUERY << "query(): query=" << query.query;
+  RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
+
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
@@ -87,6 +89,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
 void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
     const LogContextId& client_ctx) {
   VLOG_QUERY << "executeAndWait(): query=" << query.query;
+  RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
   RAISE_IF_ERROR(
@@ -140,6 +143,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
   // Translate Beeswax Query to Impala's QueryRequest and then set the explain plan bool
   // before shipping to FE
   VLOG_QUERY << "explain(): query=" << query.query;
+  RAISE_IF_ERROR(CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 353c19035..881b19aff 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -266,6 +266,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
 // HiveServer2 API
 void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
     const TOpenSessionReq& request) {
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   // Generate session ID and the secret
   TUniqueId session_id;
@@ -384,6 +385,7 @@ void ImpalaServer::CloseSession(TCloseSessionResp& return_val,
 
 void ImpalaServer::GetInfo(TGetInfoResp& return_val, const TGetInfoReq& request) {
   VLOG_QUERY << "GetInfo(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TUniqueId session_id;
   TUniqueId secret;
@@ -412,6 +414,7 @@ void ImpalaServer::GetInfo(TGetInfoResp& return_val,
 void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
     const TExecuteStatementReq& request) {
   VLOG_QUERY << "ExecuteStatement(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   // We ignore the runAsync flag here: Impala's queries will always run asynchronously,
   // and will block on fetch. To the client, this looks like Hive's synchronous mode; the
   // difference is that rows are not available when ExecuteStatement() returns.
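
Note: every Beeswax and HS2 entry point above that would register new work is gated the
same way, while fetch/close calls on existing operations stay ungated so clients can
drain in-flight work during the grace period. A minimal sketch of the shape of each
gated handler (the RPC name and message types here are placeholders, not real API):

    // Hypothetical handler, for illustration only.
    void ImpalaServer::SomeNewWorkRpc(TSomeResp& return_val, const TSomeReq& request) {
      // Reject the call up front if a shutdown is in progress; the error carries the
      // current shutdown status so the client can see why it was rejected.
      HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
      // ... normal request handling follows ...
    }
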
@@ -490,6 +493,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
     const TGetTypeInfoReq& request) {
   VLOG_QUERY << "GetTypeInfo(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_TYPE_INFO);
@@ -508,6 +512,7 @@ void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
 void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val,
     const TGetCatalogsReq& request) {
   VLOG_QUERY << "GetCatalogs(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_CATALOGS);
@@ -526,6 +531,7 @@ void ImpalaServer::GetCatalogs(TGetCatalogsResp& return_val,
 void ImpalaServer::GetSchemas(TGetSchemasResp& return_val,
     const TGetSchemasReq& request) {
   VLOG_QUERY << "GetSchemas(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_SCHEMAS);
@@ -544,6 +550,7 @@ void ImpalaServer::GetSchemas(TGetSchemasResp& return_val,
 void ImpalaServer::GetTables(TGetTablesResp& return_val,
     const TGetTablesReq& request) {
   VLOG_QUERY << "GetTables(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_TABLES);
@@ -562,6 +569,7 @@ void ImpalaServer::GetTables(TGetTablesResp& return_val,
 void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val,
     const TGetTableTypesReq& request) {
   VLOG_QUERY << "GetTableTypes(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_TABLE_TYPES);
@@ -581,6 +589,7 @@ void ImpalaServer::GetTableTypes(TGetTableTypesResp& return_val,
 void ImpalaServer::GetColumns(TGetColumnsResp& return_val,
     const TGetColumnsReq& request) {
   VLOG_QUERY << "GetColumns(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_COLUMNS);
@@ -599,6 +608,7 @@ void ImpalaServer::GetColumns(TGetColumnsResp& return_val,
 void ImpalaServer::GetFunctions(TGetFunctionsResp& return_val,
     const TGetFunctionsReq& request) {
   VLOG_QUERY << "GetFunctions(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   TMetadataOpRequest req;
   req.__set_opcode(TMetadataOpcode::GET_FUNCTIONS);
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 05d3c58e2..5435e7a06 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -844,6 +844,7 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
     backend_obj.AddMember("is_coordinator", backend.is_coordinator,
         document->GetAllocator());
     backend_obj.AddMember("is_executor", backend.is_executor, document->GetAllocator());
+    backend_obj.AddMember("is_quiescing", backend.is_quiescing, document->GetAllocator());
     backends_list.PushBack(backend_obj, document->GetAllocator());
   }
   document->AddMember("backends", backends_list, document->GetAllocator());
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 864a1da50..3e7b22250 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -107,3 +107,11 @@ void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
   if (qs.get() == nullptr) return;
   qs->PublishFilter(params);
 }
+
+void ImpalaInternalService::RemoteShutdown(TRemoteShutdownResult& return_val,
+    const TRemoteShutdownParams& params) {
+  FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
+  Status status = impala_server_->StartShutdown(
+      params.__isset.deadline_s ? params.deadline_s : -1, &return_val.shutdown_status);
+  status.ToThrift(&return_val.status);
+}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 8d5ddd5d9..971670acd 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -41,6 +41,8 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,
       const TPublishFilterParams& params);
+  virtual void RemoteShutdown(TRemoteShutdownResult& return_val,
+      const TRemoteShutdownParams& params);
 
  private:
   ImpalaServer* impala_server_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 0999b5e8f..07be1774b 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -210,6 +210,19 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
 DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
     "fragments.");
 
+// TODO: can we automatically choose a startup grace period based on the max admission
+// control queue timeout + some margin for error?
+DEFINE_int64(shutdown_grace_period_s, 120, "Shutdown startup grace period in seconds. "
+    "When the shutdown process is started for this daemon, it will wait for at least the "
+    "startup grace period before shutting down. This gives time for updated cluster "
+    "membership information to propagate to all coordinators and for fragment instances "
+    "that were scheduled based on old cluster membership to start executing (and "
+    "therefore be reflected in the metrics used to detect quiescence).");
+
+DEFINE_int64(shutdown_deadline_s, 60 * 60, "Default time limit in seconds for the "
+    "shutdown process. If this duration elapses after the shutdown process is started, "
+    "the daemon shuts down regardless of any running queries.");
+
 #ifndef NDEBUG
 DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading"
     "for a given query by injecting a sleep equivalent to this configuration in "
@@ -637,6 +650,7 @@ void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) {
     case TStmtType::EXPLAIN:
     case TStmtType::LOAD:
     case TStmtType::SET:
+    case TStmtType::ADMIN_FN:
     default:
       break;
   }
@@ -1662,8 +1676,13 @@ void ImpalaServer::MembershipCallback(
         VLOG(2) << "Error deserializing topic item with key: " << item.key;
         continue;
       }
-      // This is a new item - add it to the map of known backends.
-      known_backends_.insert(make_pair(item.key, backend_descriptor));
+      // This is a new or modified item - add it to the map of known backends.
+      auto it = known_backends_.find(item.key);
+      if (it != known_backends_.end()) {
+        it->second = backend_descriptor;
+      } else {
+        known_backends_.emplace_hint(it, item.key, backend_descriptor);
+      }
     }
 
     // Register the local backend in the statestore and update the list of known backends.
@@ -1753,7 +1772,14 @@ void ImpalaServer::MembershipCallback(
 void ImpalaServer::AddLocalBackendToStatestore(
     vector<TTopicDelta>* subscriber_topic_updates) {
   const string& local_backend_id = exec_env_->subscriber()->id();
-  if (known_backends_.find(local_backend_id) != known_backends_.end()) return;
+  bool is_quiescing = shutting_down_.Load() != 0;
+  auto it = known_backends_.find(local_backend_id);
+  // 'is_quiescing' can change during the lifetime of the Impalad - make sure that the
+  // membership reflects the current value.
+  if (it != known_backends_.end()
+      && is_quiescing == it->second.is_quiescing) {
+    return;
+  }
 
   TBackendDescriptor local_backend_descriptor;
   local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
@@ -1762,6 +1788,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.ip_address = exec_env_->ip_address();
   local_backend_descriptor.__set_proc_mem_limit(
       exec_env_->process_mem_tracker()->limit());
+  local_backend_descriptor.__set_is_quiescing(is_quiescing);
   const TNetworkAddress& krpc_address = exec_env_->krpc_address();
   DCHECK(IsResolvedAddress(krpc_address));
   local_backend_descriptor.__set_krpc_address(krpc_address);
@@ -1778,6 +1805,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
     LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
                  << " " << status.GetDetail();
     subscriber_topic_updates->pop_back();
+  } else if (it != known_backends_.end()) {
+    it->second.is_quiescing = is_quiescing;
   } else {
     known_backends_.insert(make_pair(item.key, local_backend_descriptor));
   }
@@ -2272,6 +2301,8 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
 }
 
 void ImpalaServer::Join() {
+  // The server shuts down by exiting the process, so just block here until the process
+  // exits.
   thrift_be_server_->Join();
   thrift_be_server_.reset();
 
@@ -2281,7 +2312,6 @@ void ImpalaServer::Join() {
     beeswax_server_.reset();
     hs2_server_.reset();
   }
-  shutdown_promise_.Get();
 }
 
 shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
@@ -2310,4 +2340,94 @@ void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
   }
   client_request_state->UpdateFilter(params);
 }
+
+Status ImpalaServer::CheckNotShuttingDown() const {
+  if (!IsShuttingDown()) return Status::OK();
+  return Status::Expected(ErrorMsg(
+      TErrorCode::SERVER_SHUTTING_DOWN, ShutdownStatusToString(GetShutdownStatus())));
+}
+
+TShutdownStatus ImpalaServer::GetShutdownStatus() const {
+  TShutdownStatus result;
+  int64_t shutdown_time = shutting_down_.Load();
+  DCHECK_GT(shutdown_time, 0);
+  int64_t shutdown_deadline = shutdown_deadline_.Load();
+  DCHECK_GT(shutdown_deadline, 0);
+  int64_t now = MonotonicMillis();
+  int64_t elapsed_ms = now - shutdown_time;
+  result.grace_remaining_ms =
+      max<int64_t>(0, FLAGS_shutdown_grace_period_s * 1000 - elapsed_ms);
+  result.deadline_remaining_ms =
+      max<int64_t>(0, shutdown_deadline - now);
+  result.finstances_executing =
+      ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
+  result.client_requests_registered = ImpaladMetrics::NUM_QUERIES_REGISTERED->GetValue();
+  return result;
+}
+
+string ImpalaServer::ShutdownStatusToString(const TShutdownStatus& shutdown_status) {
+  return Substitute("startup grace period left: $0, deadline left: $1, "
+      "fragment instances: $2, queries registered: $3",
+      PrettyPrinter::Print(shutdown_status.grace_remaining_ms, TUnit::TIME_MS),
+      PrettyPrinter::Print(shutdown_status.deadline_remaining_ms, TUnit::TIME_MS),
+      shutdown_status.finstances_executing, shutdown_status.client_requests_registered);
+}
+
+Status ImpalaServer::StartShutdown(
+    int64_t relative_deadline_s, TShutdownStatus* shutdown_status) {
+  DCHECK_GE(relative_deadline_s, -1);
+  if (relative_deadline_s == -1) relative_deadline_s = FLAGS_shutdown_deadline_s;
+  int64_t now = MonotonicMillis();
+  int64_t new_deadline = now + relative_deadline_s * 1000L;
+
+  bool set_deadline = false;
+  bool set_grace = false;
+  int64_t curr_deadline = shutdown_deadline_.Load();
+  while (curr_deadline == 0 || curr_deadline > new_deadline) {
+    // Set the deadline - it was either unset or later than the new one.
+    if (shutdown_deadline_.CompareAndSwap(curr_deadline, new_deadline)) {
+      set_deadline = true;
+      break;
+    }
+    curr_deadline = shutdown_deadline_.Load();
+  }
+
+  while (shutting_down_.Load() == 0) {
+    if (!shutting_down_.CompareAndSwap(0, now)) continue;
+    unique_ptr<Thread> t;
+    Status status =
+        Thread::Create("shutdown", "shutdown", [this] { ShutdownThread(); }, &t, false);
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to create shutdown thread: " << status.GetDetail();
+      return status;
+    }
+    set_grace = true;
+    break;
+  }
+  *shutdown_status = GetShutdownStatus();
+  // Show the full grace/limit times to avoid showing confusing intermediate values
+  // to the person running the statement.
+  if (set_grace) {
+    shutdown_status->grace_remaining_ms = FLAGS_shutdown_grace_period_s * 1000L;
+  }
+  if (set_deadline) shutdown_status->deadline_remaining_ms = relative_deadline_s * 1000L;
+  return Status::OK();
+}
+
+[[noreturn]] void ImpalaServer::ShutdownThread() {
+  while (true) {
+    SleepForMs(1000);
+    TShutdownStatus shutdown_status = GetShutdownStatus();
+    LOG(INFO) << "Shutdown status: " << ShutdownStatusToString(shutdown_status);
+    if (shutdown_status.grace_remaining_ms <= 0
+        && shutdown_status.finstances_executing == 0
+        && shutdown_status.client_requests_registered == 0) {
+      break;
+    } else if (shutdown_status.deadline_remaining_ms <= 0) {
+      break;
+    }
+  }
+  LOG(INFO) << "Shutdown complete, going down.";
+  exit(0);
+}
 }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 14c33d19a..7db5ce469 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -97,6 +97,40 @@ class QuerySchedule;
 /// Internally, the Membership callback thread also participates in startup:
 /// - If services_started_, then register to the statestore as an executor.
 ///
+/// Shutdown
+/// --------
+/// Impala Server shutdown can be initiated by a remote shutdown command from another
+/// Impala daemon or by a local shutdown command from a user session. The shutdown
+/// sequence aims to quiesce the Impalad (i.e. drain it of any running finstances or
+/// client requests) then exit the process cleanly. The shutdown sequence is as follows:
+///
+/// 1. StartShutdown() is called to initiate the process.
+/// 2. The startup grace period starts, during which:
+///    - no new client requests are accepted. Clients can still interact with registered
+///      requests and sessions as normal.
+///    - the Impala daemon is marked in the statestore as quiescing, so coordinators
+///      will not schedule new fragments on it (once the statestore update propagates).
+///    - the Impala daemon continues to start executing any new fragments sent to it by
+///      coordinators. This is required because the query may have been submitted before
+///      the coordinator learned that the executor was quiescing. Delays occur for
+///      several reasons:
+///      -> Latency of membership propagation through the statestore
+///      -> Latency of query startup work including scheduling, admission control and
+///         fragment startup.
+///      -> Queuing delay in the admission controller (which may be unbounded).
+/// 3. The startup grace period elapses.
+/// 4. The background shutdown thread periodically checks to see if the Impala daemon is
+///    quiesced (i.e. no client requests are registered and no fragment instances are
+///    executing). If it is quiesced then it cleanly shuts down by exiting the process.
+/// 5. The shutdown deadline elapses. The Impala daemon exits regardless of whether
+///    it was successfully quiesced or not.
+///
+/// If shutdown is initiated again during this process, it does not cancel the existing
+/// shutdown but can decrease the deadline, e.g. if an administrator starts shutdown
+/// with a deadline of 1 hour, but then wants to shut down the cluster sooner, they can
+/// run the shutdown function again to set a shorter deadline. The deadline can't be
+/// increased after shutdown is started.
+///
 /// Locking
 /// -------
 /// This class is partially thread-safe. To ensure freedom from deadlock, if multiple
@@ -118,10 +152,6 @@ class QuerySchedule;
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
 ///
-/// TODO: The state of a running query is currently not cleaned up if the
-/// query doesn't experience any errors at runtime and close() doesn't get called.
-/// The solution is to have a separate thread that cleans up orphaned
-/// query execution states after a timeout period.
 /// TODO: The same doesn't apply to the execution state of an individual plan
 /// fragment: the originating coordinator might die, but we can get notified of
 /// that via the statestore. This still needs to be implemented.
@@ -145,12 +175,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
   Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port);
 
-  /// Blocks until the server shuts down (by calling Shutdown()).
+  /// Blocks until the server shuts down.
   void Join();
 
-  /// Triggers service shutdown, by unblocking Join().
-  void Shutdown() { shutdown_promise_.Set(true); }
-
   /// ImpalaService rpcs: Beeswax API (implemented in impala-beeswax-server.cc)
   virtual void query(beeswax::QueryHandle& query_handle, const beeswax::Query& query);
   virtual void executeAndWait(beeswax::QueryHandle& query_handle,
@@ -372,6 +399,28 @@ class ImpalaServer : public ImpalaServiceIf,
   typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap;
   const BackendDescriptorMap& GetKnownBackends();
 
+  /// Start the shutdown process. Return an error if it could not be started. Otherwise,
+  /// if it was successfully started by this or a previous call, return OK along with
+  /// information about the pending shutdown in 'shutdown_status'. 'relative_deadline_s'
+  /// is the deadline value in seconds to use, or -1 if we should use the default
+  /// deadline. See the Shutdown section of the class comment for an explanation of the
+  /// shutdown sequence.
+  Status StartShutdown(int64_t relative_deadline_s, TShutdownStatus* shutdown_status);
+
+  /// Returns true if a shutdown is in progress.
+  bool IsShuttingDown() const { return shutting_down_.Load() != 0; }
+
+  /// Returns an informational error about why a new operation could not be started
+  /// if the server is shutting down. Must be called before starting execution of a
+  /// new operation (e.g. a query).
+  Status CheckNotShuttingDown() const;
+
+  /// Return information about the status of a shutdown. Only valid to call if a shutdown
+  /// is in progress (i.e. IsShuttingDown() is true).
+  TShutdownStatus GetShutdownStatus() const;
+
+  /// Convert the shutdown status to a human-readable string.
+  static std::string ShutdownStatusToString(const TShutdownStatus& shutdown_status);
+
   // Mapping between query option names and levels
   QueryOptionLevels query_option_levels_;
 
@@ -852,6 +901,9 @@ class ImpalaServer : public ImpalaServiceIf,
       const std::string& authorized_proxy_config_delimiter,
       AuthorizedProxyMap* authorized_proxy_map);
 
+  /// Background thread that does the shutdown.
+  [[noreturn]] void ShutdownThread();
+
   /// Guards query_log_ and query_log_index_
   boost::mutex query_log_lock_;
 
@@ -1143,11 +1195,15 @@ class ImpalaServer : public ImpalaServiceIf,
   /// set after all services required for the server have been started.
   std::atomic_bool services_started_;
 
-  /// Set to true when this ImpalaServer should shut down.
-  Promise<bool> shutdown_promise_;
+  /// Whether the Impala server shutdown process started. If 0, shutdown was not started.
+  /// Otherwise, this is the MonotonicMillis() value when shutdown was started.
+  AtomicInt64 shutting_down_{0};
+
+  /// The MonotonicMillis() value after we should shut down regardless of registered
+  /// client requests and running finstances. Set before 'shutting_down_' and updated
+  /// atomically if a new shutdown command with a shorter deadline comes in.
+  AtomicInt64 shutdown_deadline_{0};
 };
-
-
 }
 
 #endif
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
index f17327fc0..f545e1f96 100644
--- a/be/src/testutil/fault-injection-util.h
+++ b/be/src/testutil/fault-injection-util.h
@@ -34,6 +34,7 @@ class FaultInjectionUtil {
     RPC_UPDATEFILTER,
     RPC_TRANSMITDATA,
     RPC_REPORTEXECSTATUS,
+    RPC_REMOTESHUTDOWN,
     RPC_RANDOM // This must be last.
   };
 
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 0b64702ea..c492d0695 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -251,11 +251,20 @@ void RootHandler(const Webserver::ArgumentMap& args, Document* document) {
 
   ExecEnv* env = ExecEnv::GetInstance();
   if (env == nullptr || env->impala_server() == nullptr) return;
+  ImpalaServer* impala_server = env->impala_server();
   document->AddMember("impala_server_mode", true, document->GetAllocator());
-  document->AddMember("is_coordinator", env->impala_server()->IsCoordinator(),
+  document->AddMember("is_coordinator", impala_server->IsCoordinator(),
       document->GetAllocator());
-  document->AddMember("is_executor", env->impala_server()->IsExecutor(),
+  document->AddMember("is_executor", impala_server->IsExecutor(),
       document->GetAllocator());
+  bool is_quiescing = impala_server->IsShuttingDown();
+  document->AddMember("is_quiescing", is_quiescing, document->GetAllocator());
+  if (is_quiescing) {
+    Value shutdown_status(
+        impala_server->ShutdownStatusToString(impala_server->GetShutdownStatus()).c_str(),
+        document->GetAllocator());
+    document->AddMember("shutdown_status", shutdown_status, document->GetAllocator());
+  }
 }
 
 void AddDefaultUrlCallbacks(Webserver* webserver, MemTracker* process_mem_tracker,
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index a7d56d793..bd4286f7a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -513,6 +513,31 @@ struct TSetQueryOptionRequest {
   3: optional bool is_set_all
 }
 
+struct TShutdownParams {
+  // Set if a backend was specified as an argument to the shutdown function. If not set,
+  // the current impala daemon will be shut down. If the port was specified, it is set
+  // in 'backend'. If it was not specified, it is 0 and the port configured for this
+  // Impala daemon is assumed.
+  1: optional Types.TNetworkAddress backend
+
+  // Deadline in seconds for shutting down.
+  2: optional i64 deadline_s
+}
+
+// The type of administrative function to be executed.
+enum TAdminRequestType {
+  SHUTDOWN
+}
+
+// Parameters for administrative function statement. This is essentially a tagged union
+// that contains parameters for the type of administrative statement to be executed.
+struct TAdminRequest {
+  1: required TAdminRequestType type
+
+  // The below member corresponding to 'type' should be set.
+  2: optional TShutdownParams shutdown_params
+}
+
 // HiveServer2 Metadata operations (JniFrontend.hiveServer2MetadataOperation)
 enum TMetadataOpcode {
   GET_TYPE_INFO,
@@ -601,6 +626,9 @@ struct TExecRequest {
   // profile. For example, a user can't access the runtime profile of a query
   // that has a view for which the user doesn't have access to the underlying tables.
   12: optional bool user_has_profile_access
+
+  // Set iff stmt_type is ADMIN_FN.
+  13: optional TAdminRequest admin_request
 }
 
 // Parameters to FeSupport.cacheJar().
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 5367a6e42..cf8233d14 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -851,6 +851,38 @@ struct TPublishFilterParams {
 struct TPublishFilterResult {
 }
 
+// RemoteShutdown
+
+struct TRemoteShutdownParams {
+  // Deadline for the shutdown. After this deadline expires (starting at the time when
+  // this remote shutdown command is received), the Impala daemon exits immediately
+  // regardless of whether queries are still executing.
+  1: optional i64 deadline_s
+}
+
+// The current status of a shutdown operation.
+struct TShutdownStatus {
+  // Milliseconds remaining in startup grace period. 0 if the period has expired.
+  1: required i64 grace_remaining_ms
+
+  // Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
+  2: required i64 deadline_remaining_ms
+
+  // Number of fragment instances still executing.
+  3: required i64 finstances_executing
+
+  // Number of client requests still registered with the Impala server that is being shut
+  // down.
+  4: required i64 client_requests_registered
+}
+
+struct TRemoteShutdownResult {
+  // Success or failure of the operation.
+  1: required Status.TStatus status
+
+  // If status is OK, additional info about the shutdown status.
+  2: required TShutdownStatus shutdown_status
+}
+
 service ImpalaInternalService {
   // Called by coord to start asynchronous execution of a query's fragment instances in
@@ -875,4 +907,7 @@ service ImpalaInternalService {
   // Called by the coordinator to deliver global runtime filters to fragments for
   // application at plan nodes.
   TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
+
+  // Called to initiate shutdown of this backend.
+  TRemoteShutdownResult RemoteShutdown(1:TRemoteShutdownParams params);
 }
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 1c821705d..bce0f65fa 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -74,6 +74,10 @@ struct TBackendDescriptor {
 
   // The process memory limit of this backend (in bytes).
   8: required i64 proc_mem_limit;
+
+  // True if fragment instances should not be scheduled on this daemon because the
+  // daemon is quiescing, e.g. if it is shutting down.
+  9: required bool is_quiescing;
 }
 
 // Description of a single entry in a topic
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index d1dd5a3bf..6236f105b 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -100,7 +100,8 @@ enum TStmtType {
   DML, // Data modification e.g. INSERT
   EXPLAIN,
   LOAD, // Statement type for LOAD commands
-  SET
+  SET,
+  ADMIN_FN // Admin function, e.g. ": shutdown()".
 }
 
 // Level of verboseness for "explain" output.
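
To make the wire format concrete: a statement such as ": shutdown('host1:22000', 3600)"
is analyzed on a coordinator into a TAdminRequest with the shape below (a sketch built
from the Thrift definitions above; the host, port and deadline values are illustrative):

    TAdminRequest req;
    req.type = TAdminRequestType::SHUTDOWN;
    TShutdownParams params;
    params.__set_backend(MakeNetworkAddress("host1", 22000));  // omitted => local daemon
    params.__set_deadline_s(3600);  // omitted => FLAGS_shutdown_deadline_s default
    req.__set_shutdown_params(params);

If the target is not the local daemon, only the deadline travels further, as
TRemoteShutdownParams, in the new RemoteShutdown RPC; the response's TShutdownStatus is
what ultimately gets formatted into the statement's result row.
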
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 7775ef688..c64f6684c 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -363,6 +363,7 @@ error_codes = (
   ("SCRATCH_READ_VERIFY_FAILED", 118, "Error reading $0 bytes from scratch file '$1' "
    "on backend $2 at offset $3: verification of read data failed."),
 
+  ("SERVER_SHUTTING_DOWN", 119, "Server is being shut down: $0."),
 )
 
 import sys
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index e33459190..4b6006a67 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -537,6 +537,9 @@ nonterminal ShowFunctionsStmt show_functions_stmt;
 nonterminal DropFunctionStmt drop_function_stmt;
 nonterminal TFunctionCategory opt_function_category;
 
+// Admin statements.
+nonterminal AdminFnStmt admin_fn_stmt;
+
 precedence left KW_OR;
 precedence left KW_AND;
 precedence right KW_NOT, NOT;
@@ -671,6 +674,8 @@ stmt ::=
   {: RESULT = revoke_privilege; :}
   | comment_on_stmt:comment_on
   {: RESULT = comment_on; :}
+  | admin_fn_stmt:shutdown
+  {: RESULT = shutdown; :}
   | stmt:s SEMICOLON
   {: RESULT = s; :}
   ;
@@ -2513,6 +2518,14 @@ set_stmt ::=
   {: RESULT = new SetStmt(null, null, false); :}
   ;
 
+// Top-level function call, e.g. ": shutdown()", used for admin commands, etc.
+admin_fn_stmt ::=
+  COLON ident_or_default:fn_name LPAREN RPAREN
+  {: RESULT = new AdminFnStmt(fn_name, Collections.emptyList()); :}
+  | COLON ident_or_default:fn_name LPAREN expr_list:params RPAREN
+  {: RESULT = new AdminFnStmt(fn_name, params); :}
+  ;
+
 select_list ::=
   select_list_item:item
   {:
diff --git a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
new file mode 100644
index 000000000..2f2eb2e75
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+
+import org.apache.impala.authorization.PrivilegeRequestBuilder;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TAdminRequest;
+import org.apache.impala.thrift.TAdminRequestType;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TShutdownParams;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents an administrative function call, e.g. ": shutdown('hostname:123')".
+ *
+ * This "admin statement" framework provides a way to expand the set of supported admin
+ * statements without modifying the SQL grammar. For now, the only supported function is
+ * shutdown(), so the logic in here is not generic.
+ */
+public class AdminFnStmt extends StatementBase {
+  // Name of the function. Validated during analysis.
+  private final String fnName_;
+
+  // Arguments to the function. Always non-null.
+  private final List<Expr> params_;
+
+  // Parameters for the shutdown() command.
+  // Address of the backend to shut down. If 'backend_' is null, that means the current
+  // server. If 'backend_.port' is 0, we assume the backend has the same port as this
+  // impalad.
+  private TNetworkAddress backend_;
+  // Deadline in seconds. -1 if no deadline specified.
+  private long deadlineSecs_;
+
+  public AdminFnStmt(String fnName, List<Expr> params) {
+    this.fnName_ = fnName;
+    this.params_ = params;
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(":").append(fnName_).append("(");
+    List<String> paramsSql = Lists.newArrayList();
+    for (Expr param: params_) paramsSql.add(param.toSql());
+    sb.append(Joiner.on(", ").join(paramsSql));
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public TAdminRequest toThrift() throws InternalException {
+    TAdminRequest result = new TAdminRequest();
+    result.type = TAdminRequestType.SHUTDOWN;
+    result.shutdown_params = new TShutdownParams();
+    if (backend_ != null) result.shutdown_params.setBackend(backend_);
+    if (deadlineSecs_ != -1) result.shutdown_params.setDeadline_s(deadlineSecs_);
+    return result;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    for (Expr param : params_) param.analyze(analyzer);
+    // Only shutdown is supported.
+    if (fnName_.toLowerCase().equals("shutdown")) {
+      analyzeShutdown(analyzer);
+    } else {
+      throw new AnalysisException("Unknown admin function: " + fnName_);
+    }
+  }
+
+  /**
+   * Supports optionally specifying the backend and the deadline: either shutdown(),
+   * shutdown('host:port'), shutdown(deadline), shutdown('host:port', deadline).
+   */
+  private void analyzeShutdown(Analyzer analyzer) throws AnalysisException {
+    if (analyzer.getAuthzConfig().isEnabled()) {
+      // Only admins (i.e. user with ALL privilege on server) can execute admin functions.
+      String authzServer = analyzer.getAuthzConfig().getServerName();
+      Preconditions.checkNotNull(authzServer);
+      analyzer.registerPrivReq(
+          new PrivilegeRequestBuilder().onServer(authzServer).all().toRequest());
+    }
+
+    // TODO: this parsing and type checking logic is specific to the command, similar to
+    // handling of other top-level commands. If we add a lot more of these functions we
+    // could consider making it generic, similar to handling of normal function calls.
+    Pair<Expr, Expr> args = getShutdownArgs();
+    Expr backendExpr = args.first;
+    Expr deadlineExpr = args.second;
+    backend_ = null;
+    deadlineSecs_ = -1;
+    if (backendExpr != null) {
+      if (!(backendExpr instanceof StringLiteral)) {
+        throw new AnalysisException(
+            "Invalid backend, must be a string literal: " + backendExpr.toSql());
+      }
+      backend_ = parseBackendAddress(((StringLiteral) backendExpr).getValue());
+    }
+    if (deadlineExpr != null) {
+      deadlineSecs_ = deadlineExpr.evalToNonNegativeInteger(analyzer, "deadline");
+    }
+  }
+
+  // Return a pair of the backend and deadline arguments, null if not present.
+  private Pair<Expr, Expr> getShutdownArgs() throws AnalysisException {
+    if (params_.size() == 0) {
+      return Pair.create(null, null);
+    } else if (params_.size() == 1) {
+      if (params_.get(0).getType().isStringType()) {
+        return Pair.create(params_.get(0), null);
+      } else {
+        return Pair.create(null, params_.get(0));
+      }
+    } else if (params_.size() == 2) {
+      return Pair.create(params_.get(0), params_.get(1));
+    } else {
+      throw new AnalysisException("Shutdown takes 0, 1 or 2 arguments: " + toSql());
+    }
+  }
+
+  // Parse the backend and optional port from 'backend'. Port is set to 0 if not set in
+  // the string.
+  private TNetworkAddress parseBackendAddress(String backend) throws AnalysisException {
+    TNetworkAddress result = new TNetworkAddress();
+    // Extract host and port from backend string.
+    String[] toks = backend.trim().split(":");
+    if (toks.length == 0 || toks.length > 2) {
+      throw new AnalysisException("Invalid backend address: " + backend);
+    }
+    result.hostname = toks[0];
+    result.port = 0;
+    if (toks.length == 2) {
+      try {
+        result.port = Integer.parseInt(toks[1]);
+      } catch (NumberFormatException nfe) {
+        throw new AnalysisException(
+            "Invalid port number in backend address: " + backend);
+      }
+    }
+    return result;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 2f51c71f5..a10f5f0bb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -132,6 +132,7 @@ public class AnalysisContext {
     return stmt_ instanceof ShowCreateFunctionStmt;
   }
   public boolean isShowFilesStmt() { return stmt_ instanceof ShowFilesStmt; }
+  public boolean isAdminFnStmt() { return stmt_ instanceof AdminFnStmt; }
   public boolean isDescribeDbStmt() { return stmt_ instanceof DescribeDbStmt; }
   public boolean isDescribeTableStmt() { return stmt_ instanceof DescribeTableStmt; }
   public boolean isResetMetadataStmt() { return stmt_ instanceof ResetMetadataStmt; }
@@ -366,6 +367,11 @@ public class AnalysisContext {
     return (AlterDbStmt) stmt_;
   }
 
+  public AdminFnStmt getAdminFnStmt() {
+    Preconditions.checkState(isAdminFnStmt());
+    return (AdminFnStmt) stmt_;
+  }
+
   public StatementBase getStmt() { return stmt_; }
   public Analyzer getAnalyzer() { return analyzer_; }
   public Set<TAccessEvent> getAccessEvents() { return analyzer_.getAccessEvents(); }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 77797bcce..55453b544 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -33,8 +33,11 @@ import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.TFunction;
@@ -1504,4 +1507,63 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     };
     return Joiner.on(",").join(Iterables.transform(exprs, toSql));
   }
+
+  /**
+   * Analyzes and evaluates expression to an integral value, returned as a long.
+   * Throws if the expression cannot be evaluated or if the value evaluates to null.
+   * The 'name' parameter is used in exception messages, e.g. "LIMIT expression
+   * evaluates to NULL".
+   */
+  public long evalToInteger(Analyzer analyzer, String name) throws AnalysisException {
+    // Check for slotrefs and subqueries before analysis so we can provide a more
+    // helpful error message.
+    if (contains(SlotRef.class) || contains(Subquery.class)) {
+      throw new AnalysisException(name + " expression must be a constant expression: " +
+          toSql());
+    }
+    analyze(analyzer);
+    if (!isConstant()) {
+      throw new AnalysisException(name + " expression must be a constant expression: " +
+          toSql());
+    }
+    if (!getType().isIntegerType()) {
+      throw new AnalysisException(name + " expression must be an integer type but is '" +
+          getType() + "': " + toSql());
+    }
+    TColumnValue val = null;
+    try {
+      val = FeSupport.EvalExprWithoutRow(this, analyzer.getQueryCtx());
+    } catch (InternalException e) {
+      throw new AnalysisException("Failed to evaluate expr: " + toSql(), e);
+    }
+    long value;
+    if (val.isSetLong_val()) {
+      value = val.getLong_val();
+    } else if (val.isSetInt_val()) {
+      value = val.getInt_val();
+    } else if (val.isSetShort_val()) {
+      value = val.getShort_val();
+    } else if (val.isSetByte_val()) {
+      value = val.getByte_val();
+    } else {
+      throw new AnalysisException(name + " expression evaluates to NULL: " + toSql());
+    }
+    return value;
+  }
+
+  /**
+   * Analyzes and evaluates expression to a non-negative integral value, returned as a
+   * long. Throws if the expression cannot be evaluated, if the value evaluates to null,
+   * or if the result is negative. The 'name' parameter is used in exception messages,
+   * e.g. "LIMIT expression evaluates to NULL".
+   */
+  public long evalToNonNegativeInteger(Analyzer analyzer, String name)
+      throws AnalysisException {
+    long value = evalToInteger(analyzer, name);
+    if (value < 0) {
+      throw new AnalysisException(name + " must be a non-negative integer: " +
+          toSql() + " = " + value);
+    }
+    return value;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/LimitElement.java b/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
index ab5dac119..73758a2e9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LimitElement.java
@@ -18,9 +18,6 @@
 package org.apache.impala.analysis;
 
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.InternalException;
-import org.apache.impala.service.FeSupport;
-import org.apache.impala.thrift.TColumnValue;
 
 import com.google.common.base.Preconditions;
 
@@ -103,63 +100,14 @@ class LimitElement {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     isAnalyzed_ = true;
     if (limitExpr_ != null) {
-      limit_ = evalIntegerExpr(analyzer, limitExpr_, "LIMIT");
+      limit_ = limitExpr_.evalToNonNegativeInteger(analyzer, "LIMIT");
     }
     if (limit_ == 0) analyzer.setHasEmptyResultSet();
     if (offsetExpr_ != null) {
-      offset_ = evalIntegerExpr(analyzer, offsetExpr_, "OFFSET");
+      offset_ = offsetExpr_.evalToNonNegativeInteger(analyzer, "OFFSET");
     }
   }
 
-  /**
-   * Analyzes and evaluates expression to a non-zero integral value, returned as a long.
-   * Throws if the expression cannot be evaluated, if the value evaluates to null, or if
-   * the result is negative. The 'name' parameter is used in exception messages, e.g.
-   * "LIMIT expression evaluates to NULL".
- */ - private static long evalIntegerExpr(Analyzer analyzer, Expr expr, String name) - throws AnalysisException { - // Check for slotrefs and subqueries before analysis so we can provide a more - // helpful error message. - if (expr.contains(SlotRef.class) || expr.contains(Subquery.class)) { - throw new AnalysisException(name + " expression must be a constant expression: " + - expr.toSql()); - } - expr.analyze(analyzer); - if (!expr.isConstant()) { - throw new AnalysisException(name + " expression must be a constant expression: " + - expr.toSql()); - } - if (!expr.getType().isIntegerType()) { - throw new AnalysisException(name + " expression must be an integer type but is '" + - expr.getType() + "': " + expr.toSql()); - } - TColumnValue val = null; - try { - val = FeSupport.EvalExprWithoutRow(expr, analyzer.getQueryCtx()); - } catch (InternalException e) { - throw new AnalysisException("Failed to evaluate expr: " + expr.toSql(), e); - } - long value; - if (val.isSetLong_val()) { - value = val.getLong_val(); - } else if (val.isSetInt_val()) { - value = val.getInt_val(); - } else if (val.isSetShort_val()) { - value = val.getShort_val(); - } else if (val.isSetByte_val()) { - value = val.getByte_val(); - } else { - throw new AnalysisException(name + " expression evaluates to NULL: " + - expr.toSql()); - } - if (value < 0) { - throw new AnalysisException(name + " must be a non-negative integer: " + - expr.toSql() + " = " + value); - } - return value; - } - @Override public LimitElement clone() { return new LimitElement(this); } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 0ae504437..8b9d34de1 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -98,6 +98,7 @@ import org.apache.impala.planner.HdfsScanNode; import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.Planner; import org.apache.impala.planner.ScanNode; +import org.apache.impala.thrift.TAdminRequest; import org.apache.impala.thrift.TAlterDbParams; import org.apache.impala.thrift.TCatalogOpRequest; import org.apache.impala.thrift.TCatalogOpType; @@ -132,6 +133,7 @@ import org.apache.impala.thrift.TResultSet; import org.apache.impala.thrift.TResultSetMetadata; import org.apache.impala.thrift.TShowFilesParams; import org.apache.impala.thrift.TShowStatsOp; +import org.apache.impala.thrift.TShutdownParams; import org.apache.impala.thrift.TStmtType; import org.apache.impala.thrift.TTableName; import org.apache.impala.thrift.TUpdateCatalogCacheRequest; @@ -1085,6 +1087,12 @@ public class Frontend { new TColumn("level", Type.STRING.toThrift())))); result.setSet_query_option_request(analysisResult.getSetStmt().toThrift()); return result; + } else if (analysisResult.isAdminFnStmt()) { + result.stmt_type = TStmtType.ADMIN_FN; + result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList( + new TColumn("summary", Type.STRING.toThrift())))); + result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift()); + return result; } // If unset, set MT_DOP to 0 to simplify the rest of the code. 
if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 520b88f18..fe7e352c5 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -3861,4 +3861,54 @@ public class AnalyzeStmtsTest extends AnalyzerTest { fnName.analyze(dummyAnalyzer, false); assertFalse(fnName.isBuiltin()); } + + @Test + public void TestAdminFns() throws ImpalaException { + AnalyzesOk(": shutdown()"); + AnalyzesOk(":sHuTdoWn()"); + AnalyzesOk(": SHUTDOWN()"); + AnalyzesOk(": sHuTdoWn('hostname')"); + AnalyzesOk(": sHuTdoWn(\"hostname\")"); + AnalyzesOk(": sHuTdoWn(\"hostname:1234\")"); + AnalyzesOk(": shutdown(10)"); + AnalyzesOk(": shutdown('hostname', 10)"); + AnalyzesOk(": shutdown('hostname:11', 10)"); + AnalyzesOk(": shutdown('hostname:11', 10 * 60)"); + AnalyzesOk(": shutdown(10 * 60)"); + AnalyzesOk(": shutdown(0)"); + + // Unknown admin functions. + AnalysisError(": foobar()", "Unknown admin function: foobar"); + AnalysisError(": 1a()", "Unknown admin function: 1a"); + AnalysisError(": foobar(1,2,3)", "Unknown admin function: foobar"); + + // Invalid number of shutdown params. + AnalysisError(": shutdown('a', 'b', 'c', 'd')", + "Shutdown takes 0, 1 or 2 arguments: :shutdown('a', 'b', 'c', 'd')"); + AnalysisError(": shutdown(1, 2, 3)", + "Shutdown takes 0, 1 or 2 arguments: :shutdown(1, 2, 3)"); + + // Invalid type of shutdown params. + AnalysisError(": shutdown(a)", + "Could not resolve column/field reference: 'a'"); + AnalysisError(": shutdown(1, 2)", + "Invalid backend, must be a string literal: 1"); + AnalysisError(": shutdown(concat('host:', '1234'), 2)", + "Invalid backend, must be a string literal: concat('host:', '1234')"); + AnalysisError(": shutdown('backend:1234', '...')", + "deadline expression must be an integer type but is 'STRING': '...'"); + AnalysisError(": shutdown(true)", + "deadline expression must be an integer type but is 'BOOLEAN': TRUE"); + + // Invalid host/port. + AnalysisError(": shutdown('foo:bar')", + "Invalid port number in backend address: foo:bar"); + AnalysisError(": shutdown('foo:bar:1234')", + "Invalid backend address: foo:bar:1234"); + + // Invalid deadline value. + AnalysisError(": shutdown(-1)", "deadline must be a non-negative integer: -1 = -1"); + AnalysisError(": shutdown(1.234)", + "deadline expression must be an integer type but is 'DECIMAL(4,3)': 1.234"); + } } diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java index 6d301d9cf..b8d2d1edc 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java @@ -2583,6 +2583,19 @@ public class AuthorizationStmtTest extends FrontendTestBase { } } + @Test + public void testShutdown() throws ImpalaException { + // Requires ALL privilege on server. 
+ authorize(": shutdown()") + .ok(onServer(TPrivilegeLevel.ALL)) + .error(accessError("server")) + .error(accessError("server"), onServer(TPrivilegeLevel.REFRESH)) + .error(accessError("server"), onServer(TPrivilegeLevel.SELECT)) + .error(accessError("server"), onDatabase("functional", TPrivilegeLevel.ALL)) + .error(accessError("server"), + onTable("functional", "alltypes", TPrivilegeLevel.ALL)); + } + // Convert TDescribeResult to list of strings. private static List resultToStringList(TDescribeResult result) { List list = new ArrayList<>(); diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index be97d6736..5ad2876a4 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -3838,4 +3838,39 @@ public class ParserTest extends FrontendTestBase { ParserError(String.format("ALTER %s SET OWNER", type)); } } + + public void TestAdminFns() { + // Any combination of whitespace is ok. + ParsesOk(":foobar()"); + ParsesOk(": foobar()"); + ParsesOk(":\tfoobar()"); + ParsesOk(" :\tfoobar()"); + ParsesOk("\n:foobar()"); + ParsesOk("\n:foobar(123)"); + ParsesOk("\n:foobar(123, 456)"); + ParsesOk("\n:foobar('foo', 'bar')"); + ParsesOk("\n:foobar('foo', 'bar', 1, -1, 1234, 99, false)"); + + // Any identifier is supported. + ParsesOk(": 1a()"); + + // Must be prefixed with colon. + ParserError("foobar()"); + ParserError(" foobar()"); + + // Non-identifiers not supported. + ParserError(": 1()"); + ParserError(": 'string'()"); + ParserError(": a.b()"); + + // Must be single function with parens. Cannot have multiple statements. + ParserError(": shutdown"); + ParserError(": shutdown foo"); + ParserError(": shutdown() other()"); + ParserError(": shutdown(); other()"); + ParserError(": shutdown(), other()"); + ParserError(": shutdown() :other()"); + ParserError(": shutdown() :other()"); + ParserError(": shutdown('hostA'); :shutdown('hostB');"); + } } diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java index 72093643b..dcdd81460 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.fail; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FrontendTestBase; import org.apache.impala.testutil.TestUtils; -import org.junit.Ignore; import org.junit.Test; import com.google.common.base.Preconditions; @@ -66,6 +65,14 @@ public class ToSqlTest extends FrontendTestBase { } } + /** + * Helper for the common case when the string should be identical after a roundtrip + * through the parser. 
+   */
+  private void testToSql(String query) {
+    testToSql(query, query);
+  }
+
   private void testToSql(String query, String expected) {
     testToSql(query, System.getProperty("user.name"), expected);
   }
@@ -1428,9 +1435,8 @@ public class ToSqlTest extends FrontendTestBase {
    */
   @Test
   public void testInvalidate() {
-    testToSql("INVALIDATE METADATA", "INVALIDATE METADATA");
-    testToSql("INVALIDATE METADATA functional.alltypes",
-        "INVALIDATE METADATA functional.alltypes");
+    testToSql("INVALIDATE METADATA");
+    testToSql("INVALIDATE METADATA functional.alltypes");
   }
 
   /**
@@ -1438,9 +1444,19 @@ public class ToSqlTest extends FrontendTestBase {
    */
   @Test
   public void testRefresh() {
-    testToSql("REFRESH functional.alltypes", "REFRESH functional.alltypes");
-    testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)",
-        "REFRESH functional.alltypes PARTITION (year=2009, month=1)");
-    testToSql("REFRESH FUNCTIONS functional", "REFRESH FUNCTIONS functional");
+    testToSql("REFRESH functional.alltypes");
+    testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)");
+    testToSql("REFRESH FUNCTIONS functional");
+  }
+
+  /**
+   * Test admin functions are output correctly.
+   */
+  @Test
+  public void testAdminFn() {
+    testToSql(":shutdown()");
+    testToSql(":shutdown('hostname')");
+    testToSql(":shutdown('hostname', 1000)");
+    testToSql(":shutdown(1000)");
   }
 }
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index f04b4b94a..f25c8ed1d 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -161,6 +161,9 @@ class Process(object):
   def get_pid(self):
     """Gets the PID of the process. Returns None if the PID cannot be determined"""
     LOG.info("Attempting to find PID for %s" % ' '.join(self.cmd))
+    return self.__get_pid()
+
+  def __get_pid(self):
     for pid in psutil.get_pid_list():
       try:
         process = psutil.Process(pid)
@@ -196,10 +199,16 @@ class Process(object):
   def restart(self):
     """Kills and restarts the process"""
     self.kill()
-    # Wait for a bit so the ports will be released.
-    sleep(1)
+    self.wait_for_exit()
     self.start()
 
+  def wait_for_exit(self):
+    """Wait until the process exits (or return immediately if it has already exited)."""
+    LOG.info('Waiting for exit: {0} (PID: {1})'.format(
+        ' '.join(self.cmd), self.get_pid()))
+    while self.__get_pid() is not None:
+      sleep(0.01)
+
   def __str__(self):
     return "Command: %s PID: %s" % (self.cmd, self.get_pid())
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index f23c44870..0ad4496fe 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -172,12 +172,16 @@ class ImpaladService(BaseImpalaService):
     self.be_port = be_port
     self.hs2_port = hs2_port
 
-  def get_num_known_live_backends(self, timeout=30, interval=1):
+  def get_num_known_live_backends(self, timeout=30, interval=1,
+                                  include_shutting_down=True):
     LOG.info("Getting num_known_live_backends from %s:%s" %
         (self.hostname, self.webserver_port))
     result = json.loads(self.read_debug_webpage('backends?json', timeout, interval))
-    num = len(result['backends'])
-    return None if num is None else int(num)
+    count = 0
+    for backend in result['backends']:
+      if include_shutting_down or not backend['is_quiescing']:
+        count += 1
+    return count
 
   def get_query_locations(self):
     # Returns a dictionary of the format
@@ -207,12 +211,14 @@ class ImpaladService(BaseImpalaService):
           (num_in_flight_queries, expected_val))
       return False
 
-  def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1):
+  def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1,
+                                       include_shutting_down=True):
     start_time = time()
     while (time() - start_time < timeout):
       value = None
       try:
-        value = self.get_num_known_live_backends(timeout=timeout, interval=interval)
+        value = self.get_num_known_live_backends(timeout=timeout, interval=interval,
+            include_shutting_down=include_shutting_down)
       except Exception, e:
         LOG.error(e)
       if value == expected_value:
@@ -250,7 +256,9 @@ class ImpaladService(BaseImpalaService):
       if query_state == target_state:
         return
       sleep(interval)
-    assert target_state == query_state, 'Did not reach query state in time'
+    assert target_state == query_state, \
+        'Did not reach query state in time target={0} actual={1}'.format(
+            target_state, query_state)
     return
 
   def wait_for_query_status(self, client, query_id, expected_content,
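A rough, runnable sketch of what the new counting logic above does, for illustration
only (assumes a local cluster and the default impalad debug webserver port 25000; the
test framework is Python 2, hence urllib2 and the print statement):

    # Count live, non-quiescing backends from the coordinator's debug webserver.
    import json
    import urllib2

    page = urllib2.urlopen('http://localhost:25000/backends?json').read()
    backends = json.loads(page)['backends']
    print 'live backends: %d' % len([b for b in backends if not b['is_quiescing']])
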
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index f2bb7fbd4..9869a2f9a 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -15,15 +15,24 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import logging
 import pytest
+import psutil
+import re
+import socket
+import time
 
-from impala.error import HiveServer2Error
 from tests.common.environ import specific_build_type_timeout
 from time import sleep
+from impala.error import HiveServer2Error
+from TCLIService import TCLIService
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 
-
+LOG = logging.getLogger(__name__)
 
 class TestRestart(CustomClusterTestSuite):
   @classmethod
@@ -75,3 +84,281 @@ class TestRestart(CustomClusterTestSuite):
       sleep(3)
 
     client.close()
+
+
+def parse_shutdown_result(result):
+  """Parse the shutdown result string and return the strings (grace left,
+  deadline left, fragment instances, queries registered)."""
+  assert len(result.data) == 1
+  summary = result.data[0]
+  match = re.match(r'startup grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), ' +
+      r'fragment instances: ([0-9]*), queries registered: ([0-9]*)', summary)
+  assert match is not None, summary
+  return match.groups()
+
+
+class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
+  IDLE_SHUTDOWN_GRACE_PERIOD_S = 1
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--shutdown_grace_period_s={grace_period} \
+          --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S,
+          hostname=socket.gethostname()))
+  def test_shutdown_idle(self):
+    """Test that idle impalads shut down in a timely manner after the startup grace
+    period elapses."""
+    impalad1 = psutil.Process(self.cluster.impalads[0].get_pid())
+    impalad2 = psutil.Process(self.cluster.impalads[1].get_pid())
+    impalad3 = psutil.Process(self.cluster.impalads[2].get_pid())
+
+    # Test that a failed shut down from a bogus host or port fails gracefully.
+    ex = self.execute_query_expect_failure(self.client,
+        ":shutdown('e6c00ca5cd67b567eb96c6ecfb26f05')")
+    assert "Couldn't open transport" in str(ex)
+    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:100000')")
+    assert "Couldn't open transport" in str(ex)
+    # Test that pointing to the wrong thrift service (the HS2 port) fails gracefully.
+    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:21050')")
+    assert ("RPC Error: Client for localhost:21050 hit an unexpected exception: " +
+        "Invalid method name: 'RemoteShutdown'") in str(ex)
+    # Test RPC error handling with debug action.
+    ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:22001')",
+        query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'})
+    assert 'Debug Action: CRS_SHUTDOWN_RPC:FAIL' in str(ex)
+
+    # Test remote shutdown.
+    LOG.info("Start remote shutdown {0}".format(time.time()))
+    self.execute_query_expect_success(self.client, ":shutdown('localhost:22001')",
+        query_options={})
+
+    # Remote shutdown does not require statestore.
+    self.cluster.statestored.kill()
+    self.cluster.statestored.wait_for_exit()
+    self.execute_query_expect_success(self.client, ":shutdown('localhost:22002')",
+        query_options={})
+
+    # Test local shutdown, which should succeed even with injected RPC error.
+ LOG.info("Start local shutdown {0}".format(time.time())) + self.execute_query_expect_success(self.client, + ":shutdown('{0}:22000')".format(socket.gethostname()), + query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'}) + + # Make sure that the impala daemons exit after the startup grace period plus a 10 + # second margin of error. + start_time = time.time() + LOG.info("Waiting for impalads to exit {0}".format(start_time)) + impalad1.wait() + LOG.info("First impalad exited {0}".format(time.time())) + impalad2.wait() + LOG.info("Second impalad exited {0}".format(time.time())) + impalad3.wait() + LOG.info("Third impalad exited {0}".format(time.time())) + shutdown_duration = time.time() - start_time + assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10 + + EXEC_SHUTDOWN_GRACE_PERIOD_S = 5 + EXEC_SHUTDOWN_DEADLINE_S = 10 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--shutdown_grace_period_s={grace_period} \ + --shutdown_deadline_s={deadline} \ + --hostname={hostname}".format(grace_period=EXEC_SHUTDOWN_GRACE_PERIOD_S, + deadline=EXEC_SHUTDOWN_DEADLINE_S, hostname=socket.gethostname())) + def test_shutdown_executor(self): + """Test that shuts down and then restarts an executor. This should not disrupt any + queries that start after the shutdown or complete before the shutdown time limit.""" + # Add sleeps to make sure that the query takes a couple of seconds to execute on the + # executors. + QUERY = "select count(*) from functional_parquet.alltypes where sleep(1) = bool_col" + SLOW_QUERY = "select count(*) from tpch_parquet.lineitem where sleep(1) = l_orderkey" + SHUTDOWN_EXEC2 = ": shutdown('localhost:22001')" + + # Run this query before shutdown and make sure that it executes successfully on + # all executors through the startup grace period without disruption. + before_shutdown_handle = self.__exec_and_wait_until_running(QUERY) + + # Shut down and wait for the shutdown state to propagate through statestore. + result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2) + assert parse_shutdown_result(result) == ( + "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S), + "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "1", "0") + + # Check that the status is reflected on the debug page. + web_json = self.cluster.impalads[1].service.get_debug_webpage_json("") + assert web_json.get('is_quiescing', None) is True, web_json + assert 'shutdown_status' in web_json, web_json + + self.impalad_test_service.wait_for_num_known_live_backends(2, + timeout=self.EXEC_SHUTDOWN_GRACE_PERIOD_S + 5, interval=0.2, + include_shutting_down=False) + + # Run another query, which shouldn't get scheduled on the new executor. We'll let + # this query continue running through the full shutdown and restart cycle. + after_shutdown_handle = self.__exec_and_wait_until_running(QUERY) + + # Finish executing the first query before the backend exits. + assert self.__fetch_and_get_num_backends(QUERY, before_shutdown_handle) == 3 + + # Wait for the impalad to exit, then start it back up and run another query, which + # should be scheduled on it again. + self.cluster.impalads[1].wait_for_exit() + self.cluster.impalads[1].start() + self.impalad_test_service.wait_for_num_known_live_backends( + 3, timeout=30, interval=0.2, include_shutting_down=False) + after_restart_handle = self.__exec_and_wait_until_running(QUERY) + + # The query started while the backend was shut down should not run on that backend. 
+    assert self.__fetch_and_get_num_backends(QUERY, after_shutdown_handle) == 2
+    assert self.__fetch_and_get_num_backends(QUERY, after_restart_handle) == 3
+
+    # Test that a query will fail when the executor shuts down after the limit.
+    deadline_expiry_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
+    result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
+    assert parse_shutdown_result(result) == (
+        "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S),
+        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "1", "0")
+    self.cluster.impalads[1].wait_for_exit()
+    self.__check_deadline_expired(SLOW_QUERY, deadline_expiry_handle)
+
+    # Test that we can reduce the deadline after setting it to a high value.
+    # Run a query that will fail as a result of the reduced deadline.
+    deadline_expiry_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
+    SHUTDOWN_EXEC3 = ": shutdown('localhost:22002', {0})"
+    VERY_HIGH_DEADLINE = 5000
+    HIGH_DEADLINE = 1000
+    LOW_DEADLINE = 5
+    result = self.execute_query_expect_success(
+        self.client, SHUTDOWN_EXEC3.format(HIGH_DEADLINE))
+    grace, deadline, _, _ = parse_shutdown_result(result)
+    assert grace == "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S)
+    assert deadline == "{0}m{1}s".format(HIGH_DEADLINE / 60, HIGH_DEADLINE % 60)
+
+    result = self.execute_query_expect_success(
+        self.client, SHUTDOWN_EXEC3.format(VERY_HIGH_DEADLINE))
+    _, deadline, _, _ = parse_shutdown_result(result)
+    LOG.info("Deadline is {0}".format(deadline))
+    min_string, sec_string = re.match("([0-9]*)m([0-9]*)s", deadline).groups()
+    assert int(min_string) * 60 + int(sec_string) <= HIGH_DEADLINE, \
+        "Cannot increase deadline " + deadline
+
+    result = self.execute_query_expect_success(
+        self.client, SHUTDOWN_EXEC3.format(LOW_DEADLINE))
+    _, deadline, finstances, _ = parse_shutdown_result(result)
+    assert deadline == "{0}s000ms".format(LOW_DEADLINE)
+    assert int(finstances) > 0, "Slow query should still be running."
+    self.cluster.impalads[2].wait_for_exit()
+    self.__check_deadline_expired(SLOW_QUERY, deadline_expiry_handle)
+
+  COORD_SHUTDOWN_GRACE_PERIOD_S = 5
+  COORD_SHUTDOWN_DEADLINE_S = 120
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--shutdown_grace_period_s={grace_period} \
+          --shutdown_deadline_s={deadline} \
+          --hostname={hostname} \
+          --default_query_options=NUM_SCANNER_THREADS=1".format(
+          grace_period=COORD_SHUTDOWN_GRACE_PERIOD_S,
+          deadline=COORD_SHUTDOWN_DEADLINE_S, hostname=socket.gethostname()))
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
+      close_session=False)
+  def test_shutdown_coordinator(self):
+    """Test that shuts down the coordinator. Running queries should finish but new
+    requests should be rejected."""
+    # Start a query running. This should complete successfully and keep the coordinator
+    # up until it finishes. We set NUM_SCANNER_THREADS=1 above to make the runtime more
+    # predictable.
+    SLOW_QUERY = """select * from tpch_parquet.lineitem where sleep(1) < l_orderkey"""
+    SHUTDOWN = ": shutdown()"
+    SHUTDOWN_ERROR_PREFIX = 'Server is being shut down:'
+
+    before_shutdown_handle = self.__exec_and_wait_until_running(SLOW_QUERY)
+    before_shutdown_hs2_handle = self.execute_statement(SLOW_QUERY).operationHandle
+
+    # Shut down the coordinator. Operations that start after this point should fail.
+    result = self.execute_query_expect_success(self.client, SHUTDOWN)
+    grace, deadline, _, registered = parse_shutdown_result(result)
+    assert grace == "{0}s000ms".format(self.COORD_SHUTDOWN_GRACE_PERIOD_S)
+    assert deadline == "{0}m".format(self.COORD_SHUTDOWN_DEADLINE_S / 60)
+    assert registered == "3"
+
+    # Expect that the beeswax shutdown error occurs when calling fn().
+    def expect_beeswax_shutdown_error(fn):
+      try:
+        fn()
+        assert False, "Expected operation to fail"
+      except ImpalaBeeswaxException, e:
+        assert SHUTDOWN_ERROR_PREFIX in str(e)
+    expect_beeswax_shutdown_error(lambda: self.client.execute("select 1"))
+    expect_beeswax_shutdown_error(lambda: self.client.execute_async("select 1"))
+
+    # Test that the HS2 shutdown error occurs for various HS2 operations.
+    self.execute_statement("select 1", None, TCLIService.TStatusCode.ERROR_STATUS,
+        SHUTDOWN_ERROR_PREFIX)
+
+    def check_hs2_shutdown_error(hs2_response):
+      HS2TestSuite.check_response(hs2_response, TCLIService.TStatusCode.ERROR_STATUS,
+          SHUTDOWN_ERROR_PREFIX)
+    check_hs2_shutdown_error(self.hs2_client.OpenSession(TCLIService.TOpenSessionReq()))
+    check_hs2_shutdown_error(self.hs2_client.GetInfo(TCLIService.TGetInfoReq(
+        self.session_handle, TCLIService.TGetInfoType.CLI_MAX_DRIVER_CONNECTIONS)))
+    check_hs2_shutdown_error(self.hs2_client.GetTypeInfo(
+        TCLIService.TGetTypeInfoReq(self.session_handle)))
+    check_hs2_shutdown_error(self.hs2_client.GetCatalogs(
+        TCLIService.TGetCatalogsReq(self.session_handle)))
+    check_hs2_shutdown_error(self.hs2_client.GetSchemas(
+        TCLIService.TGetSchemasReq(self.session_handle)))
+    check_hs2_shutdown_error(self.hs2_client.GetTables(
+        TCLIService.TGetTablesReq(self.session_handle)))
+    check_hs2_shutdown_error(self.hs2_client.GetTableTypes(
+        TCLIService.TGetTableTypesReq(self.session_handle)))
+    check_hs2_shutdown_error(self.hs2_client.GetColumns(
+        TCLIService.TGetColumnsReq(self.session_handle)))
+    check_hs2_shutdown_error(self.hs2_client.GetFunctions(
+        TCLIService.TGetFunctionsReq(self.session_handle, functionName="")))
+
+    # Operations on a running HS2 query still work.
+    self.fetch_until(before_shutdown_hs2_handle,
+        TCLIService.TFetchOrientation.FETCH_NEXT, 10)
+    HS2TestSuite.check_response(self.hs2_client.CancelOperation(
+        TCLIService.TCancelOperationReq(before_shutdown_hs2_handle)))
+    HS2TestSuite.check_response(self.hs2_client.CloseOperation(
+        TCLIService.TCloseOperationReq(before_shutdown_hs2_handle)))
+
+    # Make sure that the beeswax query is still executing, then close it to allow the
+    # coordinator to shut down.
+    self.impalad_test_service.wait_for_query_state(self.client, before_shutdown_handle,
+        self.client.QUERY_STATES['FINISHED'], timeout=20)
+    self.client.close_query(before_shutdown_handle)
+    self.cluster.impalads[0].wait_for_exit()
+
+  def __exec_and_wait_until_running(self, query, timeout=20):
+    """Execute 'query' with self.client and wait until it is in the RUNNING state.
+    'timeout' controls how long we will wait."""
+    # Fix number of scanner threads to make runtime more deterministic.
+    handle = self.execute_query_async(query, {'num_scanner_threads': 1})
+    self.impalad_test_service.wait_for_query_state(self.client, handle,
+        self.client.QUERY_STATES['RUNNING'], timeout=timeout)
+    return handle
+
+  def __fetch_and_get_num_backends(self, query, handle):
+    """Fetch the results of 'query' from the beeswax handle 'handle', close the
+    query and return the number of backends obtained from the profile."""
+    self.impalad_test_service.wait_for_query_state(self.client, handle,
+        self.client.QUERY_STATES['FINISHED'], timeout=20)
+    self.client.fetch(query, handle)
+    profile = self.client.get_runtime_profile(handle)
+    self.client.close_query(handle)
+    backends_match = re.search("NumBackends: ([0-9]*)", profile)
+    assert backends_match is not None, profile
+    return int(backends_match.group(1))
+
+  def __check_deadline_expired(self, query, handle):
+    """Check that the query with 'handle' fails because of a backend hitting the
+    deadline and shutting down."""
+    try:
+      self.client.fetch(query, handle)
+      assert False, "Expected query to fail"
+    except Exception, e:
+      assert 'Failed due to unreachable impalad(s)' in str(e)
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index 81c50331b..83b2ff44e 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -28,7 +28,8 @@ from time import sleep, time
 
 def needs_session(protocol_version=
                   TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
-                  conf_overlay=None):
+                  conf_overlay=None,
+                  close_session=True):
   def session_decorator(fn):
     """Decorator that establishes a session and sets self.session_handle. When the test
     is finished, the session is closed.
     """
@@ -47,9 +48,10 @@ def needs_session(protocol_version=
       try:
         fn(self)
       finally:
-        close_session_req = TCLIService.TCloseSessionReq()
-        close_session_req.sessionHandle = resp.sessionHandle
-        HS2TestSuite.check_response(self.hs2_client.CloseSession(close_session_req))
+        if close_session:
+          close_session_req = TCLIService.TCloseSessionReq()
+          close_session_req.sessionHandle = resp.sessionHandle
+          HS2TestSuite.check_response(self.hs2_client.CloseSession(close_session_req))
         self.session_handle = None
     return add_session
 
@@ -62,13 +64,10 @@ def operation_id_to_query_id(operation_id):
   return "%s:%s" % (lo, hi)
 
 class HS2TestSuite(ImpalaTestSuite):
-  TEST_DB = 'hs2_db'
-
   HS2_V6_COLUMN_TYPES = ['boolVal', 'stringVal', 'byteVal', 'i16Val', 'i32Val',
                          'i64Val', 'doubleVal', 'binaryVal']
 
   def setup(self):
-    self.cleanup_db(self.TEST_DB)
     host, port = IMPALAD_HS2_HOST_PORT.split(":")
     self.socket = TSocket(host, port)
     self.transport = TBufferedTransport(self.socket)
@@ -77,7 +76,6 @@ class HS2TestSuite(ImpalaTestSuite):
     self.hs2_client = ImpalaHiveServer2Service.Client(self.protocol)
 
   def teardown(self):
-    self.cleanup_db(self.TEST_DB)
    if self.socket:
      self.socket.close()
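The new close_session flag above lets a test keep its HS2 session open past the test
body; the coordinator shutdown test relies on this to verify that in-flight operations
keep working while new ones are rejected. A minimal usage sketch (hypothetical test
method, for illustration only):

    # Hypothetical example: the harness skips CloseSession on exit, so the
    # server, not the test, is responsible for tearing the session down.
    @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
                   close_session=False)
    def test_keeps_session_open(self):
      pass  # self.session_handle stays valid after the test returns
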
diff --git a/tests/hs2/test_fetch_first.py b/tests/hs2/test_fetch_first.py
index b2d5084fc..4466f913e 100644
--- a/tests/hs2/test_fetch_first.py
+++ b/tests/hs2/test_fetch_first.py
@@ -26,6 +26,7 @@ from TCLIService import TCLIService
 from tests.common.impala_cluster import ImpalaCluster
 
 class TestFetchFirst(HS2TestSuite):
+  TEST_DB = 'fetch_first_db'
   IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size";
 
   def __test_invalid_result_caching(self, sql_stmt):
@@ -379,6 +380,7 @@ class TestFetchFirst(HS2TestSuite):
     fragment, so the query mem tracker is initialized differently.
     (IMPALA-963)
     """
+    self.cleanup_db(self.TEST_DB)
     self.client.set_configuration({'sync_ddl': 1})
     self.client.execute("create database %s" % self.TEST_DB)
     self.client.execute("create table %s.orderclone like tpch.orders" % self.TEST_DB)
diff --git a/www/backends.tmpl b/www/backends.tmpl
index 20cfa11d3..fd62ac242 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -26,6 +26,7 @@ under the License.
       <th>Address</th>
       <th>Coordinator</th>
       <th>Executor</th>
+      <th>Quiescing</th>
 
@@ -34,6 +35,7 @@ under the License.
       <td>{{address}}</td>
       <td>{{is_coordinator}}</td>
       <td>{{is_executor}}</td>
+      <td>{{is_quiescing}}</td>
 
     {{/backends}}
diff --git a/www/root.tmpl b/www/root.tmpl
index 40d1cf39a..7d156c42d 100644
--- a/www/root.tmpl
+++ b/www/root.tmpl
@@ -18,8 +18,16 @@ under the License.
 -->
 {{! Template for / }}
 {{> www/common-header.tmpl}}
+{{?is_quiescing}}
+
+{{/is_quiescing}}
+
 {{?impala_server_mode}}
-  Impala Server Mode: {{?is_coordinator}}Coordinator{{/is_coordinator}}
+  Impala Server Mode: {{?is_coordinator}}Coordinator{{/is_coordinator}}
+  {{?is_executor}}Executor{{/is_executor}}
 {{/impala_server_mode}}
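
Taken together, these changes support a graceful rolling restart of a cluster. A
hypothetical driver sketch using the helpers added in this patch (the sequencing is
illustrative and not part of the change; the executor index, port, and backend count
are assumptions matching the default three-node test cluster):

    # Quiesce one executor, wait for it to exit, restart it, and wait until all
    # backends are live and no longer quiescing.
    from tests.common.impala_cluster import ImpalaCluster

    cluster = ImpalaCluster()
    executor = cluster.impalads[1]
    # Issue ":shutdown('localhost:22001')" through any client here, then:
    executor.wait_for_exit()   # added above: polls the PID until the process is gone
    executor.start()
    cluster.impalads[0].service.wait_for_num_known_live_backends(
        3, timeout=30, interval=0.2, include_shutting_down=False)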