Previously, running ALTER TABLE <table> CONVERT TO ICEBERG on an Iceberg table produced an error. This patch fixes that: the statement now does nothing when called on an Iceberg table and returns the message 'Table has already been migrated.' This is achieved by adding a new flag to StatementBase that signals when a statement ends up as a no-op; if it is set, the new TStmtType::NO_OP is used as the TExecRequest's type and 'noop_result' can be used to populate the result from the frontend side (see the TStmtType::NO_OP case in ClientRequestState::Exec() below).

Tests:
* extended fe and e2e tests

Change-Id: I41ecbfd350d38e4e3fd7b813a4fc27211d828f73
Reviewed-on: http://gerrit.cloudera.org:8080/23699
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Peter Rozsa <prozsa@cloudera.com>
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "service/client-request-state.h"

#include <optional>

#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <limits>
#include <gutil/strings/substitute.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include "catalog/catalog-service-client-wrapper.h"
#include "common/status.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/Types_types.h"
#include "kudu/rpc/rpc_controller.h"
#include "observe/otel.h"
#include "observe/span-manager.h"
#include "rpc/rpc-mgr.inline.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-driver.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "scheduling/admission-control-client.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
#include "service/impala-server.h"
#include "service/query-options.h"
#include "service/query-result-set.h"
#include "util/auth-util.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/lineage-util.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "util/runtime-profile.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "util/uid-util.h"

#include "gen-cpp/CatalogService_types.h"
#include "gen-cpp/control_service.pb.h"
#include "gen-cpp/control_service.proxy.h"

#include <thrift/Thrift.h>

#include "common/names.h"
#include "control-service.h"

using boost::algorithm::iequals;
using boost::algorithm::join;
using boost::algorithm::replace_all_copy;
using kudu::rpc::RpcController;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace beeswax;
using namespace strings;

DECLARE_bool(abort_on_failed_audit_event);
DECLARE_bool(abort_on_failed_lineage_event);
DECLARE_int32(krpc_port);
DECLARE_int64(max_result_cache_size);
DECLARE_bool(otel_trace_enabled);
DECLARE_bool(use_local_catalog);

namespace impala {

PROFILE_DEFINE_TIMER(ClientFetchLockWaitTimer, UNSTABLE,
    "Cumulative time client fetch requests waiting for locks.");
PROFILE_DEFINE_SUMMARY_STATS_TIMER(GetInFlightProfileTimeStats, UNSTABLE,
    "Summary stats of the time dumping profiles when the query is still in-flight.");

// Keys into the info string map of the runtime profile referring to specific
// items used by CM for monitoring purposes.
static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";

static const string QUERY_STATUS_KEY = "Query Status";
static const string RETRY_STATUS_KEY = "Retry Status";

const TExecRequest ClientRequestState::unknown_exec_request_;

ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* frontend,
    ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session,
    QueryDriver* query_driver)
  : query_ctx_(query_ctx),
    last_active_time_ms_(numeric_limits<int64_t>::max()),
    child_query_executor_(new ChildQueryExecutor),
    session_(move(session)),
    coord_exec_called_(false),
    // Profile is assigned name w/ id after planning
    profile_(RuntimeProfile::Create(&profile_pool_, "Query", false)),
    frontend_profile_(RuntimeProfile::Create(&profile_pool_, "Frontend", false)),
    server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer", false)),
    summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary", false)),
    frontend_(frontend),
    parent_server_(server),
    start_time_us_(UnixMicros()),
    fetch_rows_timeout_us_(MICROS_PER_MILLI * query_options().fetch_rows_timeout_ms),
    parent_driver_(query_driver) {

  if (FLAGS_otel_trace_enabled && should_otel_trace_query(sql_stmt(),
      query_ctx.session.session_type)) {
    // initialize OpenTelemetry for this query
    VLOG(2) << "Initializing OpenTelemetry for query " << PrintId(query_id());
    otel_span_manager_ = build_span_manager(this);
    otel_span_manager_->StartChildSpanInit();
  }

  bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND;
  // "Impala Backend Timeline" was specifically chosen to exploit the lexicographical
  // ordering defined by the underlying std::map holding the timelines displayed in
  // the web UI. This helps ensure that "Frontend Timeline" is displayed before
  // "Impala Backend Timeline".
  query_events_ = summary_profile_->AddEventSequence(
      is_external_fe ? "Impala Backend Timeline" : "Query Timeline");
  query_events_->Start();
  profile_->AddChild(summary_profile_);

#ifndef NDEBUG
  profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
      "DEBUG build of Impala. Use RELEASE builds to measure query performance.");
#endif
  row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer");
  num_rows_fetched_counter_ = ADD_COUNTER(server_profile_, "NumRowsFetched", TUnit::UNIT);
  row_materialization_rate_ =
      server_profile_->AddDerivedCounter("RowMaterializationRate", TUnit::UNIT_PER_SECOND,
          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, num_rows_fetched_counter_,
              row_materialization_timer_));
  num_rows_fetched_from_cache_counter_ =
      ADD_COUNTER(server_profile_, "NumRowsFetchedFromCache", TUnit::UNIT);
  client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
  client_wait_time_stats_ =
      ADD_SUMMARY_STATS_TIMER(server_profile_, "ClientFetchWaitTimeStats");
  rpc_read_timer_ = ADD_TIMER(server_profile_, "RPCReadTimer");
  rpc_write_timer_ = ADD_TIMER(server_profile_, "RPCWriteTimer");
  rpc_count_ = ADD_COUNTER(server_profile_, "RPCCount", TUnit::UNIT);
  get_inflight_profile_time_stats_ =
      PROFILE_GetInFlightProfileTimeStats.Instantiate(server_profile_);
  client_fetch_lock_wait_timer_ =
      PROFILE_ClientFetchLockWaitTimer.Instantiate(server_profile_);

  profile_->set_name("Query (id=" + PrintId(query_id()) + ")");
  summary_profile_->AddInfoString("Session ID", PrintId(session_id()));
  summary_profile_->AddInfoString("Session Type", PrintValue(session_type()));
  if (session_type() == TSessionType::HIVESERVER2 ||
      session_type() == TSessionType::EXTERNAL_FRONTEND) {
    summary_profile_->AddInfoString(
        "HiveServer2 Protocol Version", Substitute("V$0", 1 + session_->hs2_version));
  }
  // Certain API clients expect Start Time and End Time to be date-time strings
  // of nanosecond precision, so we explicitly specify the precision here.
  summary_profile_->AddInfoString("Start Time", ToStringFromUnixMicros(start_time_us(),
      TimePrecision::Nanosecond));
  summary_profile_->AddInfoString("End Time", "");
  summary_profile_->AddInfoString("Duration", "");
  summary_profile_->AddInfoString("Query Type", "N/A");
  summary_profile_->AddInfoString("Query State", PrintValue(BeeswaxQueryState()));
  summary_profile_->AddInfoString(
      "Impala Query State", ExecStateToString(exec_state()));
  summary_profile_->AddInfoString("Query Status", "OK");
  summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
  summary_profile_->AddInfoString("User", effective_user());
  summary_profile_->AddInfoString("Connected User", connected_user());
  summary_profile_->AddInfoString("Delegated User", do_as_user());
  summary_profile_->AddInfoString("Network Address",
      TNetworkAddressToString(session_->network_address));
  if (!session_->http_origin.empty()) {
    // If using hs2-http protocol, this is the origin of the session
    // as recorded in the X-Forwarded-For http message header.
    summary_profile_->AddInfoString("Http Origin", session_->http_origin);
  }
  summary_profile_->AddInfoString("Default Db", default_db());
  summary_profile_->AddInfoStringRedacted(
      "Sql Statement", query_ctx_.client_request.stmt);
  summary_profile_->AddInfoString("Coordinator",
      TNetworkAddressToString(ExecEnv::GetInstance()->configured_backend_address()));

  summary_profile_->AddChild(frontend_profile_);

  AdmissionControlClient::Create(query_ctx_, &admission_control_client_);
}

ClientRequestState::~ClientRequestState() {
  DCHECK(wait_thread_.get() == NULL) << "Finalize() needs to be called!";
  DCHECK(!track_rpcs_); // Should get set to false in Finalize()
  DCHECK(pending_rpcs_.empty()); // Should get cleared in Finalize()
  UnRegisterRemainingRPCs(); // Avoid memory leaks if Finalize() didn't get called
}
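
// Note: ownership of 'cache' is taken before the size validation below, so the
// cache is freed even when an error Status is returned.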
Status ClientRequestState::SetResultCache(QueryResultSet* cache,
    int64_t max_size) {
  lock_guard<mutex> l(lock_);
  DCHECK(result_cache_ == NULL);
  result_cache_.reset(cache);
  if (max_size > FLAGS_max_result_cache_size) {
    return Status(
        Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.",
            max_size, FLAGS_max_result_cache_size));
  }
  result_cache_max_size_ = max_size;
  return Status::OK();
}
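
// 'remote_submit_time' originates from a different host's clock, so clamp it to
// this coordinator's clock to keep the query timeline from starting in the
// future if the clocks are skewed.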
void ClientRequestState::SetRemoteSubmitTime(int64_t remote_submit_time) {
  int64_t ack_submit_time = min(MonotonicStopWatch::Now(), remote_submit_time);
  if (ack_submit_time < remote_submit_time) {
    VLOG_QUERY << "Ignoring remote_submit_time (" << remote_submit_time
               << " ns) that is more than coordinator time (" << ack_submit_time
               << " ns) for query id=" << PrintId(query_id());
  }
  query_events_->Start(ack_submit_time);
}
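
// The frontend sends the profile tree flattened into a root node plus a list of
// children; rebuild the tree shape here before merging it into frontend_profile_.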
void ClientRequestState::SetFrontendProfile(const TExecRequest& exec_request) {
  // Should we defer creating and adding the child until here? probably.
  TRuntimeProfileTree prof_tree;
  prof_tree.nodes.emplace_back(std::move(exec_request.profile));
  for (auto& child : exec_request.profile_children) {
    prof_tree.nodes.emplace_back(std::move(child));
  }
  prof_tree.nodes.at(0).num_children = prof_tree.nodes.size() - 1;
  frontend_profile_->Update(prof_tree, false);
}

void ClientRequestState::AddBlacklistedExecutorAddress(const NetworkAddressPB& addr) {
  lock_guard<mutex> l(lock_);
  if (!WasRetried()) blacklisted_executor_addresses_.emplace(addr);
}

void ClientRequestState::SetBlacklistedExecutorAddresses(
    std::unordered_set<NetworkAddressPB>& executor_addresses) {
  DCHECK(blacklisted_executor_addresses_.empty());
  if (!executor_addresses.empty()) {
    blacklisted_executor_addresses_.insert(
        executor_addresses.begin(), executor_addresses.end());
  }
}

Status ClientRequestState::Exec() {
  const TExecRequest& exec_req = exec_request();
  profile_->AddChild(server_profile_);
  summary_profile_->AddInfoString("Query Type", PrintValue(stmt_type()));
  summary_profile_->AddInfoString("Query Options (set by configuration)",
      DebugQueryOptions(query_ctx_.client_request.query_options));
  summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
      DebugQueryOptions(exec_req.query_options));
  if (!exec_req.tables.empty()) {
    summary_profile_->AddInfoString("Tables Queried", PrintTableList(exec_req.tables));
  }
  if (!exec_req.select_columns.empty()) {
    summary_profile_->AddInfoString("Select Columns", join(exec_req.select_columns, ","));
  }
  if (!exec_req.where_columns.empty()) {
    summary_profile_->AddInfoString("Where Columns", join(exec_req.where_columns, ","));
  }
  if (!exec_req.join_columns.empty()) {
    summary_profile_->AddInfoString("Join Columns", join(exec_req.join_columns, ","));
  }
  if (!exec_req.aggregate_columns.empty()) {
    summary_profile_->AddInfoString(
        "Aggregate Columns", join(exec_req.aggregate_columns, ","));
  }
  if (!exec_req.orderby_columns.empty()) {
    summary_profile_->AddInfoString(
        "OrderBy Columns", join(exec_req.orderby_columns, ","));
  }
  if (query_ctx_.__isset.overridden_mt_dop_value) {
    DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop);
    summary_profile_->AddInfoString("MT_DOP limited by admission control",
        Substitute("Requested MT_DOP=$0 reduced to MT_DOP=$1",
            query_ctx_.overridden_mt_dop_value,
            query_ctx_.client_request.query_options.mt_dop));
  }

  // Don't start executing the query if Cancel() was called between planning and Exec().
  RETURN_IF_CANCELLED(this);
  MarkActive();

  switch (exec_req.stmt_type) {
    case TStmtType::QUERY:
    case TStmtType::DML:
      DCHECK(exec_req.__isset.query_exec_request);
      RETURN_IF_ERROR(
          ExecQueryOrDmlRequest(exec_req.query_exec_request, true /*async*/));
      break;
    case TStmtType::EXPLAIN: {
      request_result_set_.reset(new vector<TResultRow>(
          exec_req.explain_result.results));
      break;
    }
    case TStmtType::TESTCASE: {
      DCHECK(exec_req.__isset.testcase_data_path);
      SetResultSet(vector<string>(1, exec_req.testcase_data_path));
      break;
    }
    case TStmtType::DDL: {
      DCHECK(exec_req.__isset.catalog_op_request);
      if (otel_trace_query()) {
        otel_span_manager_->StartChildSpanQueryExecution();
      }
      LOG_AND_RETURN_IF_ERROR(ExecDdlRequest());
      break;
    }
    case TStmtType::LOAD: {
      DCHECK(exec_req.__isset.load_data_request);
      LOG_AND_RETURN_IF_ERROR(ExecLoadDataRequest());
      break;
    }
    case TStmtType::SET: {
      DCHECK(exec_req.__isset.set_query_option_request);
      lock_guard<mutex> l(session_->lock);
      if (exec_req.set_query_option_request.__isset.key) {
        // "SET key=value" updates the session query options.
        DCHECK(exec_req.set_query_option_request.__isset.value);
        const auto& key = exec_req.set_query_option_request.key;
        const auto& value = exec_req.set_query_option_request.value;
        RETURN_IF_ERROR(SetQueryOption(key, value, &session_->set_query_options,
            &session_->set_query_options_mask));
        SetResultSet({}, {}, {});
        if (iequals(key, "idle_session_timeout")) {
          // IMPALA-2248: Session timeout is set as a query option
          session_->last_accessed_ms = UnixMillis(); // do not expire session immediately
          session_->UpdateTimeout();
          VLOG_QUERY << "ClientRequestState::Exec() SET: idle_session_timeout="
                     << PrettyPrinter::Print(session_->session_timeout, TUnit::TIME_S);
        }
      } else if (exec_req.set_query_option_request.__isset.query_option_type
          && exec_req.set_query_option_request.query_option_type
              == TQueryOptionType::UNSET_ALL) {
        // "UNSET ALL"
        RETURN_IF_ERROR(ResetAllQueryOptions(
            &session_->set_query_options, &session_->set_query_options_mask));
        SetResultSet({}, {}, {});
      } else {
        // "SET" or "SET ALL"
        bool is_set_all =
            exec_req.set_query_option_request.__isset.query_option_type
            && exec_req.set_query_option_request.query_option_type
                == TQueryOptionType::SET_ALL;
        PopulateResultForSet(is_set_all);
      }
      break;
    }
    case TStmtType::ADMIN_FN:
      if (exec_req.admin_request.type == TAdminRequestType::SHUTDOWN) {
        RETURN_IF_ERROR(ExecShutdownRequest());
      } else if (exec_req.admin_request.type == TAdminRequestType::EVENT_PROCESSOR) {
        RETURN_IF_ERROR(ExecEventProcessorCmd());
      } else {
        DCHECK(false);
      }
      break;
    case TStmtType::CONVERT:
      DCHECK(exec_req.__isset.convert_table_request);
      LOG_AND_RETURN_IF_ERROR(ExecMigrateRequest());
      break;
    case TStmtType::UNKNOWN:
      DCHECK(false);
      return Status("Exec request uninitialized during execution");
    case TStmtType::KILL:
      DCHECK(exec_req.__isset.kill_query_request);
      LOG_AND_RETURN_IF_ERROR(ExecKillQueryRequest());
      break;
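    // The frontend can mark a statement as a no-op (e.g. ALTER TABLE ... CONVERT
    // TO ICEBERG on a table that is already Iceberg); nothing is executed and the
    // result rows, if any, come directly from 'noop_result'.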
    case TStmtType::NO_OP:
      if (exec_req.__isset.noop_result) {
        SetResultSet(exec_req.noop_result);
      }
      return Status::OK();
    default:
      return Status(Substitute("Unknown exec request stmt type: $0", exec_req.stmt_type));
  }

  if (async_exec_thread_.get() == nullptr) {
    UpdateNonErrorExecState(ExecState::RUNNING);
  }
  return Status::OK();
}
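
// Builds the result set for SET statements. A plain SET hides DEVELOPMENT,
// DEPRECATED and REMOVED options; SET ALL ('is_set_all') includes them.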
void ClientRequestState::PopulateResultForSet(bool is_set_all) {
  map<string, string> config;
  TQueryOptionsToMap(query_options(), &config);
  vector<string> keys, values, levels;
  map<string, string>::const_iterator itr = config.begin();
  for (; itr != config.end(); ++itr) {
    const auto opt_level_id =
        parent_server_->query_option_levels_[itr->first];
    if (!is_set_all && (opt_level_id == TQueryOptionLevel::DEVELOPMENT ||
        opt_level_id == TQueryOptionLevel::DEPRECATED ||
        opt_level_id == TQueryOptionLevel::REMOVED)) {
      continue;
    }
    keys.push_back(itr->first);
    values.push_back(itr->second);
    const auto opt_level = _TQueryOptionLevel_VALUES_TO_NAMES.find(opt_level_id);
    DCHECK(opt_level != _TQueryOptionLevel_VALUES_TO_NAMES.end());
    levels.push_back(opt_level->second);
  }
  SetResultSet(keys, values, levels);
}

Status ClientRequestState::ExecLocalCatalogOp(
    const TCatalogOpRequest& catalog_op) {
  switch (catalog_op.op_type) {
    case TCatalogOpType::USE: {
      lock_guard<mutex> l(session_->lock);
      session_->database = exec_request().catalog_op_request.use_db_params.db;
      return Status::OK();
    }
    case TCatalogOpType::SHOW_TABLES:
    case TCatalogOpType::SHOW_VIEWS: {
      const TShowTablesParams* params = &catalog_op.show_tables_params;
      // A NULL pattern means match all tables of the specified table types. However,
      // Thrift string types can't be NULL in C++, so we have to test if it's set rather
      // than just blindly using the value.
      const string* table_name_pattern =
          params->__isset.show_pattern ? &(params->show_pattern) : nullptr;
      TGetTablesResult table_names;
      const set<TImpalaTableType::type>& table_types = params->table_types;
      RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name_pattern,
          &query_ctx_.session, table_types, &table_names));
      SetResultSet(table_names.tables);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_METADATA_TABLES: {
      const TShowTablesParams* params = &catalog_op.show_tables_params;
      // A NULL pattern means match all tables of the specified table types. However,
      // Thrift string types can't be NULL in C++, so we have to test if it's set rather
      // than just blindly using the value.
      const string* metadata_table_name_pattern =
          params->__isset.show_pattern ? &(params->show_pattern) : nullptr;
      DCHECK(params->__isset.tbl);
      const string& table_name = params->tbl;
      TGetTablesResult table_names;
      RETURN_IF_ERROR(frontend_->GetMetadataTableNames(params->db, table_name,
          metadata_table_name_pattern, &query_ctx_.session, &table_names));
      SetResultSet(table_names.tables);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_DBS: {
      const TShowDbsParams* params = &catalog_op.show_dbs_params;
      TGetDbsResult dbs;
      const string* db_pattern =
          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
      RETURN_IF_ERROR(
          frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs));
      vector<string> names, comments;
      names.reserve(dbs.dbs.size());
      comments.reserve(dbs.dbs.size());
      for (const TDatabase& db: dbs.dbs) {
        names.push_back(db.db_name);
        comments.push_back(db.metastore_db.description);
      }
      SetResultSet(names, comments);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_DATA_SRCS: {
      const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
      TGetDataSrcsResult result;
      const string* pattern =
          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
      RETURN_IF_ERROR(
          frontend_->GetDataSrcMetadata(pattern, &result));
      SetResultSet(result.data_src_names, result.locations, result.class_names,
          result.api_versions);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_STATS: {
      const TShowStatsParams& params = catalog_op.show_stats_params;
      TResultSet response;
      RETURN_IF_ERROR(frontend_->GetStats(params, &response));
      // Set the result set and its schema from the response.
      request_result_set_.reset(new vector<TResultRow>(response.rows));
      result_metadata_ = response.schema;
      return Status::OK();
    }
    case TCatalogOpType::SHOW_FUNCTIONS: {
      const TShowFunctionsParams* params = &catalog_op.show_fns_params;
      TGetFunctionsResult functions;
      const string* fn_pattern =
          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
      RETURN_IF_ERROR(frontend_->GetFunctions(
          params->category, params->db, fn_pattern, &query_ctx_.session, &functions));
      SetResultSet(functions.fn_ret_types, functions.fn_signatures,
          functions.fn_binary_types, functions.fn_persistence);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_ROLES: {
      const TShowRolesParams& params = catalog_op.show_roles_params;
      // If we have made it here, the user has privileges to execute this operation.
      // Return the results.
      TShowRolesResult result;
      RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
      SetResultSet(result.role_names);
      return Status::OK();
    }
    case TCatalogOpType::SHOW_GRANT_PRINCIPAL: {
      const TShowGrantPrincipalParams& params = catalog_op.show_grant_principal_params;
      TResultSet response;
      RETURN_IF_ERROR(frontend_->GetPrincipalPrivileges(params, &response));
      // Set the result set and its schema from the response.
      request_result_set_.reset(new vector<TResultRow>(response.rows));
      result_metadata_ = response.schema;
      return Status::OK();
    }
    case TCatalogOpType::DESCRIBE_HISTORY: {
      // This operation is supported for Iceberg tables only.
      const TDescribeHistoryParams& params = catalog_op.describe_history_params;
      TGetTableHistoryResult result;
      RETURN_IF_ERROR(frontend_->GetTableHistory(params, &result));

      request_result_set_.reset(new vector<TResultRow>);
      request_result_set_->resize(result.result.size());
      for (int i = 0; i < result.result.size(); ++i) {
        const TGetTableHistoryResultItem item = result.result[i];
        TResultRow& result_row = (*request_result_set_.get())[i];
        result_row.__isset.colVals = true;
        result_row.colVals.resize(4);
        const Timezone* local_tz = TimezoneDatabase::FindTimezone(
            query_options().timezone);
        TimestampValue tv = TimestampValue::FromUnixTimeMicros(
            item.creation_time * 1000, local_tz);
        result_row.colVals[0].__set_string_val(tv.ToString());
        result_row.colVals[1].__set_string_val(std::to_string(item.snapshot_id));
        result_row.colVals[2].__set_string_val(
            (item.__isset.parent_id) ? std::to_string(item.parent_id) : "NULL");
        result_row.colVals[3].__set_string_val(
            (item.is_current_ancestor) ? "TRUE" : "FALSE");
      }
      return Status::OK();
    }
    case TCatalogOpType::DESCRIBE_DB: {
      TDescribeResult response;
      RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params,
          &response));
      // Set the result set
      request_result_set_.reset(new vector<TResultRow>(response.results));
      return Status::OK();
    }
    case TCatalogOpType::DESCRIBE_TABLE: {
      TDescribeResult response;
      const TDescribeTableParams& params = catalog_op.describe_table_params;
      RETURN_IF_ERROR(frontend_->DescribeTable(params, query_ctx_.session, &response));
      // Set the result set
      request_result_set_.reset(new vector<TResultRow>(response.results));
      return Status::OK();
    }
    case TCatalogOpType::SHOW_CREATE_TABLE: {
      string response;
      bool with_stats = false;
      if (catalog_op.__isset.show_create_table_with_stats) {
        with_stats = catalog_op.show_create_table_with_stats;
      }
      int32_t partition_limit = query_options().show_create_table_partition_limit; // Default value
      if (catalog_op.__isset.show_create_table_partition_limit) {
        partition_limit = catalog_op.show_create_table_partition_limit;
      }
      RETURN_IF_ERROR(frontend_->ShowCreateTable(
          catalog_op.show_create_table_params, with_stats, partition_limit, &response));
      SetResultSet(vector<string>(1, response));
      return Status::OK();
    }
    case TCatalogOpType::SHOW_CREATE_FUNCTION: {
      string response;
      RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params,
          &response));
      SetResultSet(vector<string>(1, response));
      return Status::OK();
    }
    case TCatalogOpType::SHOW_FILES: {
      TResultSet response;
      RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
      // Set the result set and its schema from the response.
      request_result_set_.reset(new vector<TResultRow>(response.rows));
      result_metadata_ = response.schema;
      return Status::OK();
    }
    default: {
      stringstream ss;
      ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
      return Status(ss.str());
    }
  }
}

Status ClientRequestState::ExecQueryOrDmlRequest(
    const TQueryExecRequest& query_exec_request, bool isAsync) {
  // we always need at least one plan fragment
  DCHECK(query_exec_request.plan_exec_info.size() > 0);

  if (query_exec_request.__isset.query_plan) {
    stringstream plan_ss;
    // Add some delimiters to make it clearer where the plan
    // begins and the profile ends
    plan_ss << "\n----------------\n"
            << query_exec_request.query_plan
            << "----------------";
    summary_profile_->AddInfoStringRedacted("Plan", plan_ss.str());
  }
  // Add info strings consumed by CM: Estimated mem and tables missing stats.
  if (query_exec_request.__isset.per_host_mem_estimate) {
    stringstream ss;
    ss << query_exec_request.per_host_mem_estimate;
    summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str());
  }
  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
      query_exec_request.query_ctx.__isset.tables_missing_stats &&
      !query_exec_request.query_ctx.tables_missing_stats.empty()) {
    summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY,
        PrintTableList(query_exec_request.query_ctx.tables_missing_stats));
  }

  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
      query_exec_request.query_ctx.__isset.tables_with_corrupt_stats &&
      !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) {
    summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY,
        PrintTableList(query_exec_request.query_ctx.tables_with_corrupt_stats));
  }

  if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
      !query_exec_request.query_ctx.tables_missing_diskids.empty()) {
    summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY,
        PrintTableList(query_exec_request.query_ctx.tables_missing_diskids));
  }

  // Don't start executing the query if Cancel() was called concurrently with Exec().
  RETURN_IF_CANCELLED(this);
  if (isAsync) {
    // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
    // the query should be in the PENDING state before the Exec RPC returns.
    UpdateNonErrorExecState(ExecState::PENDING);
    RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
        &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_,
        true));
  } else {
    // Update query_status_ as necessary.
    FinishExecQueryOrDmlRequest();
    return query_status_;
  }
  return Status::OK();
}
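
// Submits the query for admission and then starts the coordinator. Runs on
// 'async_exec_thread_' when ExecQueryOrDmlRequest() was called with async=true,
// otherwise in the caller's thread.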
void ClientRequestState::FinishExecQueryOrDmlRequest() {
  const TExecRequest& exec_req = exec_request();
  DCHECK(exec_req.__isset.query_exec_request);
  UniqueIdPB query_id_pb;
  TUniqueIdToUniqueIdPB(query_id(), &query_id_pb);

  if (otel_trace_query() && !IsCTAS()) {
    otel_span_manager_->StartChildSpanAdmissionControl();
  }

  const TQueryExecRequest* query_exec_request;
  TQueryExecRequest req;
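  // With a separate admission control service, the request is copied and the
  // fields that admission does not need (plan text, lineage graph, result set
  // metadata, finalize params, redacted statement) are cleared, presumably to
  // keep the serialized RPC payload small.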
  if (ExecEnv::GetInstance()->AdmissionServiceEnabled()) {
    req = exec_req.query_exec_request;
    if (req.__isset.query_plan) {
      // Use the swap() to ensure the string's memory is deallocated.
      // Using clear() sets the size to 0 but may not release the capacity.
      std::string().swap(req.query_plan);
      req.__isset.query_plan = false;
    }
    if (req.__isset.lineage_graph) {
      req.lineage_graph = TLineageGraph();
      req.__isset.lineage_graph = false;
    }
    if (req.__isset.result_set_metadata) {
      req.result_set_metadata = TResultSetMetadata();
      req.__isset.result_set_metadata = false;
    }
    if (req.__isset.finalize_params) {
      req.finalize_params = TFinalizeParams();
      req.__isset.finalize_params = false;
    }
    TClientRequest& client_req = req.query_ctx.client_request;
    if (client_req.__isset.redacted_stmt) {
      // Use the swap() to ensure the string's memory is deallocated.
      std::string().swap(client_req.redacted_stmt);
      client_req.__isset.redacted_stmt = false;
    }
    query_exec_request = &req;
  } else {
    query_exec_request = &exec_req.query_exec_request;
  }

  Status admit_status = admission_control_client_->SubmitForAdmission(
      {query_id_pb, ExecEnv::GetInstance()->backend_id(), *query_exec_request,
          exec_req.query_options, summary_profile_, blacklisted_executor_addresses_},
      query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_,
      otel_span_manager_.get());

  if (otel_trace_query() && !IsCTAS()) {
    otel_span_manager_->EndChildSpanAdmissionControl(admit_status);
    otel_span_manager_->StartChildSpanQueryExecution();
  }

  {
    lock_guard<mutex> l(lock_);
    if (!UpdateQueryStatus(admit_status).ok()) return;
  }

  DCHECK(schedule_.get() != nullptr);
  // Note that we don't need to check for cancellation between admission and query
  // startup. The query was not cancelled right before being admitted and the window here
  // is small enough to not require special handling. Instead we start the query and then
  // cancel it through the check below if necessary.
  DebugActionNoFail(exec_req.query_options, "CRS_BEFORE_COORD_STARTS");
  // Register the query with the server to support cancellation. This happens after
  // admission because now the set of executors is fixed and an executor failure will
  // cause a query failure.
  parent_server_->RegisterQueryLocations(schedule_->backend_exec_params(), query_id());
  coord_.reset(new Coordinator(this, exec_req, *schedule_.get(), query_events_));
  Status exec_status = coord_->Exec();

  DebugActionNoFail(exec_req.query_options, "CRS_AFTER_COORD_STARTS");

  // Make coordinator profile visible, even upon failure.
  if (coord_->query_profile() != nullptr) profile_->AddChild(coord_->query_profile());

  bool cancelled = false;
  Status cancellation_status;
  {
    lock_guard<mutex> l(lock_);
    if (!UpdateQueryStatus(exec_status).ok()) return;
    // Coordinator::Exec() finished successfully - it is safe to concurrently access
    // 'coord_'. This thread needs to cancel the coordinator if cancellation occurred
    // *before* 'coord_' was accessible to other threads. Once the lock is dropped, any
    // future calls to Cancel() are responsible for calling Coordinator::Cancel(), so
    // while holding the lock we need to both perform a check for cancellation and make
    // the coord_ visible.
    coord_exec_called_.Store(true);
    cancelled = is_cancelled_;
    if (cancelled) {
      VLOG_QUERY << "Cancelled right after starting the coordinator query id="
                 << PrintId(query_id());
      discard_result(UpdateQueryStatus(Status::CANCELLED));
    }
  }

  if (cancelled) {
    coord_->Cancel();
    return;
  }
  UpdateNonErrorExecState(ExecState::RUNNING);
}

Status ClientRequestState::ExecDdlRequestImplSync() {
  if (catalog_op_type() != TCatalogOpType::DDL &&
      catalog_op_type() != TCatalogOpType::RESET_METADATA) {
    Status status = ExecLocalCatalogOp(exec_request().catalog_op_request);
    lock_guard<mutex> l(lock_);
    return UpdateQueryStatus(status);
  }

  if (ddl_type() == TDdlType::COMPUTE_STATS) {
    const TComputeStatsParams& compute_stats_params =
        exec_request().catalog_op_request.ddl_params.compute_stats_params;
    RuntimeProfile* child_profile =
        RuntimeProfile::Create(&profile_pool_, "Child Queries");
    profile_->AddChild(child_profile);
    // Add child queries for computing table and column stats.
    vector<ChildQuery> child_queries;
    if (compute_stats_params.__isset.tbl_stats_query) {
      RuntimeProfile* profile =
          RuntimeProfile::Create(&profile_pool_, "Table Stats Query");
      child_profile->AddChild(profile);
      child_queries.emplace_back(compute_stats_params.tbl_stats_query, this,
          parent_server_, profile, &profile_pool_);
    }
    if (compute_stats_params.__isset.col_stats_query) {
      RuntimeProfile* profile =
          RuntimeProfile::Create(&profile_pool_, "Column Stats Query");
      child_profile->AddChild(profile);
      child_queries.emplace_back(compute_stats_params.col_stats_query, this,
          parent_server_, profile, &profile_pool_);
    }

    if (child_queries.size() > 0) {
      RETURN_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
    } else {
      SetResultSet({"No partitions selected for incremental stats update."});
    }
    return Status::OK();
  }

  DCHECK(false) << "Not handled sync exec ddl request.";
  return Status::OK();
}
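
// Executes the catalog operation for a DDL (or the CREATE phase of a CTAS).
// For CTAS, the INSERT portion is subsequently run as a regular DML request.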
void ClientRequestState::ExecDdlRequestImpl(bool exec_in_worker_thread) {
  bool is_CTAS = (catalog_op_type() == TCatalogOpType::DDL
      && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
  const TExecRequest& exec_req = exec_request();

  catalog_op_executor_.reset(
      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));

  // Indirectly check if running in thread async_exec_thread_.
  if (exec_in_worker_thread) {
    VLOG_QUERY << "Running in worker thread";
    DCHECK(exec_state() == ExecState::PENDING);

    // 1. For any non-CTAS DDLs, transition to RUNNING
    // 2. For CTAS DDLs, transition to RUNNING during FinishExecQueryOrDmlRequest()
    //    called by ExecQueryOrDmlRequest().
    if (!is_CTAS) UpdateNonErrorExecState(ExecState::RUNNING);
  }

  // Optionally wait with a debug action before Exec() below.
  DebugActionNoFail(exec_req.query_options, "CRS_DELAY_BEFORE_CATALOG_OP_EXEC");

  Status status = catalog_op_executor_->Exec(exec_req.catalog_op_request);
  query_events_->MarkEvent("CatalogDdlRequest finished");
  if (otel_trace_query()) {
    otel_span_manager_->AddChildSpanEvent("UpdateCatalogFinished");
  }
  AddCatalogTimeline();
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }

  // If this is a CTAS request, there will usually be more work to do
  // after executing the CREATE TABLE statement (the INSERT portion of the operation).
  // The exception is if the user specified IF NOT EXISTS and the table already
  // existed, in which case we do not execute the INSERT.
  if (catalog_op_type() == TCatalogOpType::DDL &&
      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
      !catalog_op_executor_->ddl_exec_response()->new_table_created) {
    DCHECK(exec_req.catalog_op_request.
        ddl_params.create_table_params.if_not_exists);
    return;
  }

  // Add newly created table to catalog cache.
  status = parent_server_->ProcessCatalogUpdateResult(
      *catalog_op_executor_->update_catalog_result(),
      exec_req.query_options.sync_ddl, query_options(), query_events_);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }

  if (is_CTAS) {
    // At this point, the remainder of the CTAS request executes
    // like a normal DML request. As with other DML requests, it will
    // wait for another catalog update if any partitions were altered as a result
    // of the operation.
    DCHECK(exec_req.__isset.query_exec_request);
    RETURN_VOID_IF_ERROR(
        ExecQueryOrDmlRequest(exec_req.query_exec_request, !exec_in_worker_thread));
  }

  // Set the results to be reported to the client. Do this under lock to avoid races
  // with ImpalaServer::GetResultSetMetadata().
  {
    lock_guard<mutex> l(lock_);
    SetResultSet(catalog_op_executor_->ddl_exec_response());
  }
}

bool ClientRequestState::ShouldRunExecDdlAsync() {
  // Local catalog op DDL will run synchronously.
  if (catalog_op_type() != TCatalogOpType::DDL
      && catalog_op_type() != TCatalogOpType::RESET_METADATA) {
    return false;
  }

  // The exec DDL part of compute stats will run synchronously.
  if (ddl_type() == TDdlType::COMPUTE_STATS) return false;

  return true;
}

Status ClientRequestState::ExecDdlRequest() {
  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
      PrintValue(ddl_type()) : PrintValue(catalog_op_type());
  bool async_ddl = ShouldRunExecDdlAsync();
  bool async_ddl_enabled = exec_request().query_options.enable_async_ddl_execution;
  string exec_mode = (async_ddl && async_ddl_enabled) ? "asynchronous" : "synchronous";

  summary_profile_->AddInfoString("DDL Type", op_type);
  summary_profile_->AddInfoString("DDL execution mode", exec_mode);
  VLOG_QUERY << "DDL exec mode=" << exec_mode;

  if (!async_ddl) return ExecDdlRequestImplSync();

  if (async_ddl_enabled) {
    // Transition the exec state out of INITIALIZED to PENDING to make available the
    // runtime profile for the DDL. Later on in ExecDdlRequestImpl(), the state
    // further transitions to RUNNING.
    UpdateNonErrorExecState(ExecState::PENDING);
    return Thread::Create("impala-server", "async_exec_thread_",
        &ClientRequestState::ExecDdlRequestImpl, this, true /*exec in a worker thread*/,
        &async_exec_thread_);
  } else {
    ExecDdlRequestImpl(false /*exec in the same thread as the caller*/);
    return query_status_;
  }
}

void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
  const TExecRequest& exec_req = exec_request();
  if (exec_in_worker_thread) {
    VLOG_QUERY << "Running in worker thread";
    DCHECK(exec_state() == ExecState::PENDING);
    UpdateNonErrorExecState(ExecState::RUNNING);
  }
  DebugActionNoFail(
      exec_req.query_options, "CRS_DELAY_BEFORE_LOAD_DATA");

  TLoadDataResp response;
  Status status = frontend_->LoadData(exec_req.load_data_request, &response);
  if (exec_req.load_data_request.iceberg_tbl) {
    ExecLoadIcebergDataRequestImpl(response);
  }
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }

  request_result_set_.reset(new vector<TResultRow>);
  request_result_set_->push_back(response.load_summary);

  // We use TUpdateCatalogRequest to refresh the table metadata so that it will
  // fire an insert event just like an insert statement.
  TUpdatedPartition updatedPartition;
  updatedPartition.files.insert(updatedPartition.files.end(),
      response.loaded_files.begin(), response.loaded_files.end());
  TUpdateCatalogRequest catalog_update;
  // The partition_name is an empty string for unpartitioned tables.
  catalog_update.updated_partitions[response.partition_name] = updatedPartition;

  catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
  catalog_update.__set_header(GetCatalogServiceRequestHeader());
  catalog_update.target_table = exec_req.load_data_request.table_name.table_name;
  catalog_update.db_name = exec_req.load_data_request.table_name.db_name;
  catalog_update.is_overwrite = exec_req.load_data_request.overwrite;

  CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(),
      *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &status);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }

  TUpdateCatalogResponse resp;
  status = client.DoRpc(
      &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp);
  query_events_->MarkEvent("UpdateCatalog finished");
  if (resp.__isset.profile) {
    for (const TEventSequence& catalog_timeline : resp.profile.event_sequences) {
      summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline);
    }
  }
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }

  status = parent_server_->ProcessCatalogUpdateResult(
      resp.result,
      exec_req.query_options.sync_ddl, query_options(), query_events_);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status, exec_in_worker_thread));
  }
}
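
// LOAD DATA into an Iceberg table is rewritten into three child queries: CREATE
// a temporary table over the staged files, INSERT from it into the destination
// table, then DROP the temporary table. On success the staging directory is
// removed; on failure the already-moved source files are moved back.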
void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) {
  TLoadDataReq load_data_req = exec_request().load_data_request;
  RuntimeProfile* child_profile =
      RuntimeProfile::Create(&profile_pool_, "Child Queries");
  profile_->AddChild(child_profile);
  // Add child queries that create the temporary table, insert into the
  // destination table, and drop the temporary table.
  vector<ChildQuery> child_queries;
  // Prepare CREATE
  RuntimeProfile* create_profile =
      RuntimeProfile::Create(&profile_pool_, "Create table query");
  child_profile->AddChild(create_profile);
  child_queries.emplace_back(response.create_tmp_tbl_query, this, parent_server_,
      create_profile, &profile_pool_);
  // Prepare INSERT
  RuntimeProfile* insert_profile =
      RuntimeProfile::Create(&profile_pool_, "Insert query");
  child_profile->AddChild(insert_profile);
  child_queries.emplace_back(load_data_req.insert_into_dst_tbl_query, this,
      parent_server_, insert_profile, &profile_pool_);
  // Prepare DROP
  RuntimeProfile* drop_profile =
      RuntimeProfile::Create(&profile_pool_, "Drop table query");
  child_profile->AddChild(drop_profile);
  child_queries.emplace_back(load_data_req.drop_tmp_tbl_query, this,
      parent_server_, drop_profile, &profile_pool_);
  // Execute queries
  RETURN_VOID_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
  vector<ChildQuery*> completed_queries;
  Status query_status = child_query_executor_->WaitForAll(&completed_queries);
  if (query_status.ok()) {
    const char* path = response.create_location.c_str();
    string delete_err = "Load was successful, but failed to remove staging data under '"
        + response.create_location + "', HDFS error: ";
    hdfsFS hdfs_conn;
    Status hdfs_ret = HdfsFsCache::instance()->GetConnection(path, &hdfs_conn);
    if (!hdfs_ret.ok()) {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(delete_err + hdfs_ret.GetDetail())));
    }
    if (hdfsDelete(hdfs_conn, path, 1)) {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(delete_err + strerror(errno))));
    }
  } else {
    const char* dst_path = load_data_req.source_path.c_str();
    hdfsFS hdfs_dst_conn;
    string revert_err = "Failed to load data and failed to revert data movement, "
        "please check source and staging directory under '" + response.create_location
        + "', Query error: " + query_status.GetDetail() + " HDFS error: ";
    Status hdfs_ret = HdfsFsCache::instance()->GetConnection(dst_path, &hdfs_dst_conn);
    if (!hdfs_ret.ok()) {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err + hdfs_ret.GetDetail())));
    }
    for (const string& src_path : response.loaded_files) {
      hdfsFS hdfs_src_conn;
      hdfs_ret = HdfsFsCache::instance()->GetConnection(dst_path, &hdfs_src_conn);
      if (!hdfs_ret.ok()) {
        lock_guard<mutex> l(lock_);
        RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err
            + hdfs_ret.GetDetail())));
      }
      if (hdfsMove(hdfs_src_conn, src_path.c_str(), hdfs_dst_conn, dst_path)) {
        lock_guard<mutex> l(lock_);
        RETURN_VOID_IF_ERROR(UpdateQueryStatus(Status(revert_err + strerror(errno))));
      }
    }
  }
}

Status ClientRequestState::ExecLoadDataRequest() {
  if (exec_request().query_options.enable_async_load_data_execution) {
    // Transition the exec state out of INITIALIZED to PENDING to make available the
    // runtime profile for the DDL.
    UpdateNonErrorExecState(ExecState::PENDING);
    return Thread::Create("impala-server", "async_exec_thread_",
        &ClientRequestState::ExecLoadDataRequestImpl, this, true, &async_exec_thread_);
  }

  // sync execution
  ExecLoadDataRequestImpl(false /* not use a worker thread */);
  return query_status_;
}

Status ClientRequestState::ExecShutdownRequest() {
  const TShutdownParams& request = exec_request().admin_request.shutdown_params;
  bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
  int port = backend_port_specified ? request.backend.port : FLAGS_krpc_port;
  // Use the local shutdown code path if the host is unspecified or if it exactly matches
  // the configured host/port. This avoids the possibility of RPC errors preventing
  // shutdown.
  if (!request.__isset.backend
      || (request.backend.hostname == FLAGS_hostname && port == FLAGS_krpc_port)) {
    ShutdownStatusPB shutdown_status;
    int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1;
    RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status));
    SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)});
    return Status::OK();
  }

  // KRPC relies on resolved IP address, so convert hostname.
  IpAddr ip_address;
  Status ip_status = HostnameToIpAddr(request.backend.hostname, &ip_address);
  if (!ip_status.ok()) {
    VLOG(1) << "Could not convert hostname " << request.backend.hostname
            << " to ip address, error: " << ip_status.GetDetail();
    return ip_status;
  }
  // Find BackendId for the given remote ip address and port from cluster membership.
  // The searching is not efficient, but Shutdown Requests are not called frequently.
  // The BackendId is used to generate UDS address for Unix domain socket. Leave the
  // Id value as 0 if it's not found in cluster membership.
  // Note that UDS is only used when FLAGS_rpc_use_unix_domain_socket is set as true.
  UniqueIdPB backend_id;
  backend_id.set_hi(0);
  backend_id.set_lo(0);
  if (ExecEnv::GetInstance()->rpc_mgr()->IsKrpcUsingUDS()) {
    if (ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId()
        == UdsAddressUniqueIdPB::BACKEND_ID) {
      ClusterMembershipMgr::SnapshotPtr membership_snapshot =
          ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot();
      DCHECK(membership_snapshot.get() != nullptr);
      for (const auto& it : membership_snapshot->current_backends) {
        // Compare resolved IP addresses and ports.
        if (it.second.ip_address() == ip_address && it.second.address().port() == port) {
          DCHECK(it.second.has_backend_id());
          backend_id = it.second.backend_id();
          break;
        }
      }
    }
  }
  string krpc_error = "RemoteShutdown() RPC failed: Network error";
  string krpc_error2 = "RemoteShutdown() RPC failed: Timed out";
  NetworkAddressPB krpc_addr = MakeNetworkAddressPB(ip_address, port, backend_id,
      ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId());
  std::unique_ptr<ControlServiceProxy> proxy;
  Status get_proxy_status =
      ControlService::GetProxy(krpc_addr, request.backend.hostname, &proxy);
  if (!get_proxy_status.ok()) {
    return Status(
        Substitute("Could not get Proxy to ControlService at $0 with error: $1.",
            NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg()));
  }

  RemoteShutdownParamsPB params;
  if (request.__isset.deadline_s) params.set_deadline_s(request.deadline_s);
  RemoteShutdownResultPB resp;
  VLOG_QUERY << "Sending Shutdown RPC to " << NetworkAddressPBToString(krpc_addr);

  const int num_retries = 3;
  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
  const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
  Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::RemoteShutdown,
      params, &resp, query_ctx_, "RemoteShutdown() RPC failed", num_retries, timeout_ms,
      backoff_time_ms, "CRS_SHUTDOWN_RPC");

  if (!rpc_status.ok()) {
    const string& msg = rpc_status.msg().msg();
    VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id())
               << " failed to send RPC to " << NetworkAddressPBToString(krpc_addr) << " :"
               << msg;
    string err_string = Substitute(
        "Rpc to $0 failed with error '$1'", NetworkAddressPBToString(krpc_addr), msg);
    // Attempt to detect if the failure is because of not using a KRPC port.
    if (backend_port_specified &&
        (msg.find(krpc_error) != string::npos ||
            msg.find(krpc_error2) != string::npos)) {
      // Prior to IMPALA-7985 :shutdown() used the backend port.
      err_string.append(" This may be because the port specified is wrong. You may have"
          " specified the backend (thrift) port which :shutdown() can no"
          " longer use. Please make sure the correct KRPC port is being"
          " used, or don't specify any port in the :shutdown() command.");
    }
    return Status(err_string);
  }
  Status shutdown_status(resp.status());
  RETURN_IF_ERROR(shutdown_status);
  SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status())});
  return Status::OK();
}

Status ClientRequestState::ExecEventProcessorCmd() {
  catalog_op_executor_.reset(
      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));
  const TEventProcessorCmdParams& params =
      exec_request().admin_request.event_processor_cmd_params;
  TSetEventProcessorStatusRequest request;
  TSetEventProcessorStatusResponse response;
  request.__set_params(params);
  request.__set_header(GetCatalogServiceRequestHeader());
  Status rpc_status = catalog_op_executor_->SetEventProcessorStatus(request, &response);
  if (!rpc_status.ok()) {
    VLOG_QUERY << "SetEventProcessorStatus failed: " << rpc_status.msg().msg();
    return rpc_status;
  }
  SetResultSet({response.info});
  return Status::OK();
}

void ClientRequestState::Finalize(const Status* cause) {
  if (otel_trace_query()) {
    // In a non-error case, end the query execution span since it will be the active span.
    if (cause == nullptr || cause->ok()) {
      otel_span_manager_->EndChildSpanQueryExecution();
    }

    // No need to end previous child span in an error case. This function silently closes
    // the active child span if there is one.
    otel_span_manager_->StartChildSpanClose(cause);
  }

  Cancel(cause, /*wait_until_finalized=*/true);
  MarkActive();
  // Make sure we join on wait_thread_ before we finish (and especially before this object
  // is destroyed).
  int64_t block_on_wait_time_us = 0;
  BlockOnWait(0, &block_on_wait_time_us);
  DCHECK_EQ(block_on_wait_time_us, 0);

  // Update latest observed Kudu timestamp stored in the session from the coordinator.
  // Needs to take the session_ lock which must not be taken while holding lock_, so this
  // must happen before taking lock_ below.
  Coordinator* coordinator = GetCoordinator();
  if (coordinator != nullptr) {
    // This is safe to access on coord_ after Wait() has been called.
    uint64_t latest_kudu_ts =
        coordinator->dml_exec_state()->GetKuduLatestObservedTimestamp();
    if (latest_kudu_ts > 0) {
      VLOG_RPC << "Updating session (id=" << PrintId(session_id()) << ") with latest "
               << "observed Kudu timestamp: " << latest_kudu_ts;
      lock_guard<mutex> session_lock(session_->lock);
      session_->kudu_latest_observed_ts = std::max<uint64_t>(
          session_->kudu_latest_observed_ts, latest_kudu_ts);
    }
  }

  // If the transaction didn't get committed by this point then we should just abort it.
  if (InTransaction()) {
    AbortTransaction();
  } else if (InKuduTransaction()) {
    AbortKuduTransaction();
  }

  UpdateEndTime();

  {
    unique_lock<mutex> l(lock_);
    // Update result set cache metrics, and update mem limit accounting before tearing
    // down the coordinator.
    ClearResultCache();
  }
  // Wait until the audit events are flushed.
  if (wait_thread_.get() != nullptr) {
    wait_thread_->Join();
    wait_thread_.reset();
  } else {
    // The query failed in the fe even before a wait thread is launched. Synchronously
    // flush log events to audit authorization errors, if any.
    LogQueryEvents();
  }
  DCHECK(wait_thread_.get() == nullptr);

  // Update the timeline here so that all of the above work is captured in the timeline.
  query_events_->MarkEvent("Unregister query");
  UnRegisterRemainingRPCs();
  if (otel_trace_query()) {
    otel_span_manager_->AddChildSpanEvent("QueryUnregistered");
    otel_span_manager_->EndChildSpanClose();

    // End the root span and thus the entire trace is also ended.
    otel_span_manager_.reset();
  }
}

Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
  TResultSet metadata_op_result;
  // Like the other Exec(), fill out as much profile information as we're able to.
  summary_profile_->AddInfoString("Query Type", PrintValue(TStmtType::DDL));
  RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
      &metadata_op_result));
  result_metadata_ = metadata_op_result.schema;
  request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
  UpdateNonErrorExecState(ExecState::RUNNING);
  return Status::OK();
}

Status ClientRequestState::WaitAsync() {
  // TODO: IMPALA-7396: thread creation fault inject is disabled because it is not
  // handled correctly.
  return Thread::Create("query-exec-state", "wait-thread",
      &ClientRequestState::Wait, this, &wait_thread_, false);
}
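
// Blocks until Wait() has completed, or until 'timeout_us' microseconds have
// elapsed (0 means wait indefinitely). Returns true if the wait finished, and
// reports the time spent blocked in 'block_on_wait_time_us'.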
|
|
|
|
bool ClientRequestState::BlockOnWait(int64_t timeout_us, int64_t* block_on_wait_time_us) {
  DCHECK_GE(timeout_us, 0);
  unique_lock<mutex> l(lock_);
  *block_on_wait_time_us = 0;
  // Some metadata operations like GET_COLUMNS do not rely on WaitAsync() to launch
  // the wait thread. In such cases this method is expected to be a no-op.
  if (wait_thread_.get() == nullptr) return true;
  while (!is_wait_done_) {
    if (timeout_us == 0) {
      block_on_wait_cv_.Wait(l);
      return true;
    } else {
      MonotonicStopWatch wait_timeout_timer;
      wait_timeout_timer.Start();
      bool notified = block_on_wait_cv_.WaitFor(l, timeout_us);
      if (notified) {
        *block_on_wait_time_us = wait_timeout_timer.ElapsedTime() / NANOS_PER_MICRO;
      }
      return notified;
    }
  }
  return true;
}

void ClientRequestState::Wait() {
  // Block until results are ready.
  Status status = WaitInternal();
  // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
  // how long Impala waits for the client to fetch rows. For other statements, track the
  // time until a Close() is received.
  MarkInactive();
  {
    lock_guard<mutex> l(lock_);
    if (returns_result_set()) {
      query_events()->MarkEvent("Rows available");
      if (LIKELY(status.code() != TErrorCode::CANCELLED) && otel_trace_query()) {
        otel_span_manager_->AddChildSpanEvent("RowsAvailable");
      }
    } else {
      query_events()->MarkEvent("Request finished");
      UpdateEndTime();
    }
    discard_result(UpdateQueryStatus(status));
  }

  if (status.ok()) {
    if (stmt_type() == TStmtType::DDL) {
      DCHECK(catalog_op_type() != TCatalogOpType::DDL || request_result_set_ != nullptr);
    }
    // It is possible the query already failed at this point and ExecState is ERROR. In
    // this case, the call to UpdateNonErrorExecState(FINISHED) does not change the
    // ExecState.
    UpdateNonErrorExecState(ExecState::FINISHED);
  }
  // UpdateQueryStatus() or UpdateNonErrorExecState() have updated exec_state_.
  DCHECK(exec_state() == ExecState::FINISHED || exec_state() == ExecState::ERROR
      || retry_state() == RetryState::RETRYING || retry_state() == RetryState::RETRIED);
  // Notify all the threads blocked on Wait() to finish and then log the query events,
  // if any.
  {
    unique_lock<mutex> l(lock_);
    is_wait_done_ = true;
  }
  block_on_wait_cv_.NotifyAll();
  LogQueryEvents();
}

Status ClientRequestState::WaitInternal() {
  // Explain requests have already populated the result set. Nothing to do here.
  if (exec_request().stmt_type == TStmtType::EXPLAIN) {
    return Status::OK();
  }

  // Wait until the query has passed through admission control and is either running or
  // cancelled or encountered an error.
  if (async_exec_thread_.get() != nullptr) async_exec_thread_->Join();

  vector<ChildQuery*> child_queries;
  Status child_queries_status = child_query_executor_->WaitForAll(&child_queries);
  {
    lock_guard<mutex> l(lock_);
    RETURN_IF_ERROR(query_status_);
    RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
  }
  if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished");

  bool isCTAS = catalog_op_type() == TCatalogOpType::DDL
      && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT;

  if (GetCoordinator() != NULL) {
    Status status = GetCoordinator()->Wait();
    if (UNLIKELY(!status.ok())) {
      if (InKuduTransaction()) AbortKuduTransaction();
      return status;
    }
    RETURN_IF_ERROR(UpdateCatalog());
  } else {
    // When the coordinator is not available for CTAS that requires a coordinator, check
    // further if the query has been cancelled. If so, return immediately as there will
    // be no query result available (IMPALA-11006).
    if (isCTAS) {
      RETURN_IF_CANCELLED(this);
    }
  }

  if (catalog_op_type() == TCatalogOpType::DDL &&
      ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
    RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
  }

  if (!returns_result_set()) {
    // Queries that do not return a result are finished at this point. This includes
    // DML operations.
    eos_.Store(true);
  } else if (isCTAS) {
    SetCreateTableAsSelectResultSet();
  }
  return Status::OK();
}

Status ClientRequestState::FetchRows(const int32_t max_rows,
    QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
  // Pause the wait timer, since the client has instructed us to do work on its behalf.
  MarkActive();

  // ImpalaServer::FetchInternal has already taken our lock_.
  discard_result(UpdateQueryStatus(
      FetchRowsInternal(max_rows, fetched_rows, block_on_wait_time_us)));

  MarkInactive();
  return query_status_;
}

Status ClientRequestState::RestartFetch() {
  // No result caching for this query. Restart is invalid.
  if (result_cache_max_size_ <= 0) {
    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
        "Restarting of fetch requires enabling of query result caching."));
  }
  // The cache overflowed on a previous fetch.
  if (result_cache_.get() == NULL) {
    stringstream ss;
    ss << "The query result cache exceeded its limit of " << result_cache_max_size_
       << " rows. Restarting the fetch is not possible.";
    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str()));
  }
  // Reset fetch state to start over.
  eos_.Store(false);
  num_rows_fetched_ = 0;
  return Status::OK();
}

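// Centralizes the legal ExecState transitions for non-error target states; an illegal
// transition trips a DCHECK below in debug builds.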
void ClientRequestState::UpdateNonErrorExecState(ExecState new_state) {
  lock_guard<mutex> l(lock_);
  ExecState old_state = exec_state();
  static string error_msg = "Illegal state transition: $0 -> $1, query_id=$2";
  switch (new_state) {
    case ExecState::PENDING:
      DCHECK(old_state == ExecState::INITIALIZED)
          << Substitute(error_msg, ExecStateToString(old_state),
              ExecStateToString(new_state), PrintId(query_id()));
      UpdateExecState(new_state);
      break;
    case ExecState::RUNNING:
      // It is possible for FinishExecQueryOrDmlRequest() to attempt a transition to
      // running, even after the query has been cancelled with an error status (and is
      // thus in the ERROR ExecState). In this case, just ignore the transition attempt.
      if (old_state != ExecState::ERROR) {
        // DDL statements and metadata ops don't use the PENDING state, so a query can
        // transition directly from the INITIALIZED to RUNNING state.
        DCHECK(old_state == ExecState::INITIALIZED || old_state == ExecState::PENDING)
            << Substitute(error_msg, ExecStateToString(old_state),
                ExecStateToString(new_state), PrintId(query_id()));
        UpdateExecState(new_state);
      }
      break;
    case ExecState::FINISHED:
      // Only transition to the FINISHED state if the query has not failed. It is not
      // valid to transition from ERROR to FINISHED, so skip any attempt to do so.
      if (old_state != ExecState::ERROR) {
        // A query can transition from PENDING to FINISHED if it is cancelled by the
        // client. NO_OP statements can also transition from INITIALIZED to FINISHED.
        bool valid_transition =
            old_state == ExecState::PENDING || old_state == ExecState::RUNNING
            || (stmt_type() == TStmtType::NO_OP && old_state == ExecState::INITIALIZED);
        DCHECK(valid_transition)
            << Substitute(error_msg, ExecStateToString(old_state),
                ExecStateToString(new_state), PrintId(query_id()));
        UpdateExecState(new_state);
      }
      break;
    default:
      DCHECK(false) << "A non-error state expected but got: "
                    << ExecStateToString(new_state);
  }
}

void ClientRequestState::SetOriginalId(const TUniqueId& original_id) {
  // Copy the TUniqueId query_id from the original query.
  original_id_ = make_unique<TUniqueId>(original_id);
  summary_profile_->AddInfoString("Original Query Id", PrintId(*original_id_));
}

void ClientRequestState::MarkAsRetrying(const Status& status) {
  retry_state_.Store(RetryState::RETRYING);
  summary_profile_->AddInfoString(
      RETRY_STATUS_KEY, RetryStateToString(RetryState::RETRYING));

  // Set the query status.
  query_status_ = status;
  summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail());
  // The Query Status might be overwritten later if the retry fails. "Retry Cause"
  // preserves the original error that triggered the retry.
  summary_profile_->AddInfoStringRedacted("Retry Cause", query_status_.GetDetail());
}

Status ClientRequestState::UpdateQueryStatus(const Status& status, bool log_error) {
  // Preserve the first non-ok status.
  if (!status.ok() && query_status_.ok()) {
    UpdateExecState(ExecState::ERROR);
    query_status_ = status;
    summary_profile_->AddInfoStringRedacted(QUERY_STATUS_KEY, query_status_.GetDetail());
    if (log_error) VLOG_QUERY << status.GetDetail();
  }

  return status;
}

Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
    QueryResultSet* fetched_rows, int64_t block_on_wait_time_us) {
  // Wait() guarantees that we've transitioned at least to FINISHED state (and any
  // state beyond that should have a non-OK query_status_ set).
  DCHECK(exec_state() == ExecState::FINISHED);

  if (eos_.Load()) return Status::OK();

  if (request_result_set_ != NULL) {
    int num_rows = 0;
    const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
    // max_rows <= 0 means no limit.
    while ((num_rows < max_rows || max_rows <= 0)
        && num_rows_fetched_ < all_rows.size()) {
      RETURN_IF_ERROR(fetched_rows->AddOneRow(all_rows[num_rows_fetched_]));
      ++num_rows_fetched_;
      ++num_rows;
    }
    eos_.Store(num_rows_fetched_ == all_rows.size());
    return Status::OK();
  }

  Coordinator* coordinator = GetCoordinator();
  if (coordinator == nullptr) {
    return Status("Client tried to fetch rows on a query that produces no results.");
  }

  int32_t num_rows_fetched_from_cache = 0;
  if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
    // Satisfy the fetch from the result cache if possible.
    int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
    num_rows_fetched_from_cache =
        fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
    num_rows_fetched_ += num_rows_fetched_from_cache;
    COUNTER_ADD(num_rows_fetched_from_cache_counter_, num_rows_fetched_from_cache);
    if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
  }

  // Maximum number of rows to be fetched from the coord.
  int32_t max_coord_rows = max_rows;
  if (max_rows > 0) {
    DCHECK_LE(num_rows_fetched_from_cache, max_rows);
    max_coord_rows = max_rows - num_rows_fetched_from_cache;
  }
  {
    SCOPED_TIMER(row_materialization_timer_);
    size_t before = fetched_rows->size();
    bool eos = false;

    // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
    // (already held) ensures that we do not call coord_->GetNext() multiple times
    // concurrently.
    // TODO: Simplify this.
    lock_.unlock();
    Status status =
        coordinator->GetNext(fetched_rows, max_coord_rows, &eos, block_on_wait_time_us);
    lock_.lock();

    if (eos) eos_.Store(true);

    int num_fetched = fetched_rows->size() - before;
    DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
        "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
    num_rows_fetched_ += num_fetched;
    COUNTER_ADD(num_rows_fetched_counter_, num_fetched);

    RETURN_IF_ERROR(status);
    // Check if the query status has changed during the GetNext() call.
    if (!query_status_.ok()) {
      eos_.Store(true);
      return query_status_;
    }
  }

  // Update the result cache if necessary.
  if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
    int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache;
    if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
      // Set the cache to NULL to indicate that adding the rows fetched from the coord
      // would exceed the bound of the cache, and therefore, RestartFetch() should fail.
      ClearResultCache();
      return Status::OK();
    }

    // We guess the size of the cache after adding fetched_rows by looking at the size of
    // fetched_rows itself, and using this estimate to confirm that the memtracker will
    // allow us to use this much extra memory. In fact, this might be an overestimate, as
    // the size of two result sets combined into one is not always the size of both result
    // sets added together (the best example is the null bitset for each column: it might
    // have only one entry in each result set, and as a result consume two bytes, but when
    // the result sets are combined, only one byte is needed). Therefore after we add the
    // new result set into the cache, we need to fix up the memory consumption to the
    // actual levels to ensure we don't 'leak' bytes that we aren't using.
    int64_t before = result_cache_->ByteSize();

    // Upper-bound on memory required to add fetched_rows to the cache.
    int64_t delta_bytes =
        fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
    MemTracker* query_mem_tracker = coordinator->query_mem_tracker();
    // Count the cached rows towards the mem limit.
    if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
      string details("Failed to allocate memory for result cache.");
      return query_mem_tracker->MemLimitExceeded(nullptr, details, delta_bytes);
    }
    // Append all rows fetched from the coordinator into the cache.
    int num_rows_added = result_cache_->AddRows(
        fetched_rows, num_rows_fetched_from_cache, fetched_rows->size());

    int64_t after = result_cache_->ByteSize();

    // Confirm that this was not an underestimate of the memory required.
    DCHECK_GE(before + delta_bytes, after)
        << "Combined result sets consume more memory than both individually "
        << Substitute("(before: $0, delta_bytes: $1, after: $2)",
            before, delta_bytes, after);

    // Fix up the tracked values.
    if (before + delta_bytes > after) {
      query_mem_tracker->Release(before + delta_bytes - after);
      delta_bytes = after - before;
    }

    // Update result set cache metrics.
    ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added);
    ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes);
  }

  return Status::OK();
}

void ClientRequestState::Cancel(const Status* cause, bool wait_until_finalized) {
  // If planning is not done, attempt to cancel the query in the frontend.
  if (!is_planning_done_.load()) {
    Status status = frontend_->CancelExecRequest(query_id());
    if (!status.ok()) {
      LOG(ERROR) << "Error cancelling planning for query " << PrintId(query_id())
                 << ": " << status;
    }
  }

  // Clean up completed RPCs before cancelling backends.
  UnRegisterCompletedRPCs();

  {
    lock_guard<mutex> lock(lock_);
    // If the query has reached a terminal state, no need to update the state.
    bool already_done = eos_.Load() || exec_state() == ExecState::ERROR;
    if (!already_done && cause != NULL) {
      DCHECK(!cause->ok());
      discard_result(UpdateQueryStatus(*cause));
      query_events_->MarkEvent("Cancelled");
      DCHECK(exec_state() == ExecState::ERROR
          || retry_state() == RetryState::RETRYING);
    }

    // Avoid calling RemoteAdmissionControlClient::CancelAdmission() twice since that
    // would send an extra RPC.
    if (!is_cancelled_) {
      admission_control_client_->CancelAdmission();
      is_cancelled_ = true;
    }
  } // Release lock_ before doing cancellation work.

  // Cancel and close child queries before cancelling the parent. 'lock_' should not be
  // held because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation
  // involves RPCs and can take quite some time.
  child_query_executor_->Cancel();

  // Ensure the parent query is cancelled if execution has started (if the query was not
  // started, cancellation is handled by the 'async-exec-thread' thread). 'lock_' should
  // not be held because cancellation involves RPCs and can block for a long time.
  if (GetCoordinator() != nullptr) GetCoordinator()->Cancel(wait_until_finalized);
}

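// Called after a DML statement finishes writing data: pushes the set of written
// partitions/files to the catalog service and commits or cleans up any open
// (Hive ACID or Kudu) transaction.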
Status ClientRequestState::UpdateCatalog() {
  const TExecRequest& exec_req = exec_request();
  if (!exec_req.__isset.query_exec_request ||
      exec_req.query_exec_request.stmt_type != TStmtType::DML) {
    return Status::OK();
  }

  query_events_->MarkEvent("DML data written");
  SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));

  const TQueryExecRequest& query_exec_request = exec_req.query_exec_request;
  if (query_exec_request.__isset.finalize_params) {
    const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
    TUpdateCatalogRequest catalog_update;
    catalog_update.__set_sync_ddl(exec_req.query_options.sync_ddl);
    catalog_update.__set_header(GetCatalogServiceRequestHeader());
    if (exec_req.query_options.__isset.debug_action) {
      catalog_update.__set_debug_action(exec_req.query_options.debug_action);
    }
    DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
    if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update, finalize_params)) {
      VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
                 << PrintId(query_id()) << ")";
    } else {
      // TODO: We track partitions written to, not created, which means
      // that we do more work than is necessary, because written-to
      // partitions don't always require a metastore change.
      if (VLOG_IS_ON(1)) {
        vector<string> part_list;
        for (auto it : catalog_update.updated_partitions) part_list.push_back(it.first);
        VLOG_QUERY << "Updating metastore with "
                   << catalog_update.updated_partitions.size()
                   << " altered partitions ("
                   << join(part_list, ", ") << ")";
      }

      catalog_update.target_table = finalize_params.table_name;
      catalog_update.db_name = finalize_params.table_db;
      catalog_update.is_overwrite = finalize_params.is_overwrite;
      if (InTransaction()) {
        catalog_update.__set_transaction_id(finalize_params.transaction_id);
        catalog_update.__set_write_id(finalize_params.write_id);
      }
      if (finalize_params.__isset.iceberg_params) {
        TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation;
        catalog_update.__isset.iceberg_operation = true;
        if (!CreateIcebergCatalogOps(finalize_params, &cat_ice_op)) {
          VLOG_QUERY << "No Iceberg partitions altered, not updating metastore "
                     << "(query id: " << PrintId(query_id()) << ")";
          return Status::OK();
        }
      }

      Status cnxn_status;
      CatalogServiceConnection client(ExecEnv::GetInstance()->catalogd_client_cache(),
          *ExecEnv::GetInstance()->GetCatalogdAddress().get(), &cnxn_status);
      RETURN_IF_ERROR(cnxn_status);

      VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
      TUpdateCatalogResponse resp;
      Status status = DebugAction(query_options(), "CLIENT_REQUEST_UPDATE_CATALOG");
      if (status.ok()) {
        status = client.DoRpc(
            &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp);
        query_events_->MarkEvent("UpdateCatalog finished");
      }
      if (resp.__isset.profile) {
        for (const TEventSequence& catalog_timeline : resp.profile.event_sequences) {
          string timeline_name = catalog_timeline.name;
          // For CTAS, we already have a timeline for the CreateTable execution.
          // Use another name for the INSERT timeline.
          if (summary_profile_->GetEventSequence(timeline_name) != nullptr) {
            timeline_name += " 2";
          }
          summary_profile_->AddEventSequence(timeline_name, catalog_timeline);
        }
      }
      if (status.ok()) status = Status(resp.result.status);
      if (!status.ok()) {
        if (InTransaction()) AbortTransaction();
        LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
        return status;
      }
      if (InTransaction()) {
        // UpdateCatalog() succeeded and already committed the transaction for us.
        int64_t txn_id = GetTransactionId();
        if (!frontend_->UnregisterTransaction(txn_id).ok()) {
          LOG(ERROR) << Substitute("Failed to unregister transaction $0", txn_id);
        }
        ClearTransactionState();
        query_events_->MarkEvent("Transaction committed");
      }
      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
          exec_req.query_options.sync_ddl, query_options(), query_events_));
    }
  } else if (InKuduTransaction()) {
    // Commit the Kudu transaction. Clear transaction state if it's successful.
    // Otherwise, abort the Kudu transaction and clear transaction state.
    // Note that TQueryExecRequest.finalize_params is not set when inserting rows into
    // a Kudu table.
    Status status = CommitKuduTransaction();
    if (UNLIKELY(!status.ok())) {
      AbortKuduTransaction();
      LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
      return status;
    }
  }
  query_events_->MarkEvent("DML Metastore update finished");
  if (otel_trace_query()) {
    otel_span_manager_->AddChildSpanEvent("MetastoreUpdateFinished");
  }
  return Status::OK();
}

bool ClientRequestState::CreateIcebergCatalogOps(
    const TFinalizeParams& finalize_params, TIcebergOperationParam* cat_ice_op) {
  DCHECK(cat_ice_op != nullptr);
  const TIcebergDmlFinalizeParams& ice_finalize_params = finalize_params.iceberg_params;
  DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
  bool update_catalog = true;
  cat_ice_op->__set_operation(ice_finalize_params.operation);
  cat_ice_op->__set_initial_snapshot_id(
      ice_finalize_params.initial_snapshot_id);
  cat_ice_op->__set_spec_id(ice_finalize_params.spec_id);
  if (ice_finalize_params.operation == TIcebergOperation::INSERT) {
    cat_ice_op->__set_iceberg_data_files_fb(
        dml_exec_state->CreateIcebergDataFilesVector());
    cat_ice_op->__set_is_overwrite(finalize_params.is_overwrite);
    if (cat_ice_op->iceberg_data_files_fb.empty()) update_catalog = false;
  } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) {
    cat_ice_op->__set_iceberg_delete_files_fb(
        dml_exec_state->CreateIcebergDeleteFilesVector());
    cat_ice_op->__set_data_files_referenced_by_position_deletes(
        dml_exec_state->DataFilesReferencedByPositionDeletes());
    if (cat_ice_op->iceberg_delete_files_fb.empty()) update_catalog = false;
  } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) {
    cat_ice_op->__set_iceberg_data_files_fb(
        dml_exec_state->CreateIcebergDataFilesVector());
    cat_ice_op->__set_iceberg_delete_files_fb(
        dml_exec_state->CreateIcebergDeleteFilesVector());
    cat_ice_op->__set_data_files_referenced_by_position_deletes(
        dml_exec_state->DataFilesReferencedByPositionDeletes());
    if (cat_ice_op->iceberg_delete_files_fb.empty()) {
      DCHECK(cat_ice_op->iceberg_data_files_fb.empty());
      update_catalog = false;
    }
  } else if (ice_finalize_params.operation == TIcebergOperation::OPTIMIZE) {
    DCHECK(ice_finalize_params.__isset.optimize_params);
    const TIcebergOptimizeParams& optimize_params = ice_finalize_params.optimize_params;
    if (optimize_params.mode == TIcebergOptimizationMode::NOOP) {
      update_catalog = false;
    } else {
      cat_ice_op->__set_iceberg_data_files_fb(
          dml_exec_state->CreateIcebergDataFilesVector());
      if (optimize_params.mode == TIcebergOptimizationMode::PARTIAL) {
        DCHECK(optimize_params.__isset.selected_data_files_without_deletes);
        cat_ice_op->__set_replaced_data_files_without_deletes(
            optimize_params.selected_data_files_without_deletes);
      }
    }
  } else if (ice_finalize_params.operation == TIcebergOperation::MERGE) {
    cat_ice_op->__set_iceberg_data_files_fb(
        dml_exec_state->CreateIcebergDataFilesVector());
    cat_ice_op->__set_iceberg_delete_files_fb(
        dml_exec_state->CreateIcebergDeleteFilesVector());
    cat_ice_op->__set_data_files_referenced_by_position_deletes(
        dml_exec_state->DataFilesReferencedByPositionDeletes());
    if (cat_ice_op->iceberg_delete_files_fb.empty()
        && cat_ice_op->iceberg_data_files_fb.empty()) {
      update_catalog = false;
    }
  }
  if (!update_catalog) query_events_->MarkEvent("No-op Iceberg DML statement");
  return update_catalog;
}

void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) {
  if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
    result_metadata_ = ddl_resp->result_set.schema;
    request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows));
  }
}

void ClientRequestState::SetResultSet(const vector<string>& results) {
  request_result_set_.reset(new vector<TResultRow>);
  request_result_set_->resize(results.size());
  for (int i = 0; i < results.size(); ++i) {
    (*request_result_set_.get())[i].__isset.colVals = true;
    (*request_result_set_.get())[i].colVals.resize(1);
    (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]);
  }
}

void ClientRequestState::SetResultSet(const vector<string>& col1,
    const vector<string>& col2) {
  DCHECK_EQ(col1.size(), col2.size());

  request_result_set_.reset(new vector<TResultRow>);
  request_result_set_->resize(col1.size());
  for (int i = 0; i < col1.size(); ++i) {
    (*request_result_set_.get())[i].__isset.colVals = true;
    (*request_result_set_.get())[i].colVals.resize(2);
    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
  }
}

void ClientRequestState::SetResultSet(const vector<string>& col1,
    const vector<string>& col2, const vector<string>& col3) {
  DCHECK_EQ(col1.size(), col2.size());
  DCHECK_EQ(col1.size(), col3.size());

  request_result_set_.reset(new vector<TResultRow>);
  request_result_set_->resize(col1.size());
  for (int i = 0; i < col1.size(); ++i) {
    (*request_result_set_.get())[i].__isset.colVals = true;
    (*request_result_set_.get())[i].colVals.resize(3);
    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
    (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
  }
}

void ClientRequestState::SetResultSet(const vector<string>& col1,
    const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) {
  DCHECK_EQ(col1.size(), col2.size());
  DCHECK_EQ(col1.size(), col3.size());
  DCHECK_EQ(col1.size(), col4.size());

  request_result_set_.reset(new vector<TResultRow>);
  request_result_set_->resize(col1.size());
  for (int i = 0; i < col1.size(); ++i) {
    (*request_result_set_.get())[i].__isset.colVals = true;
    (*request_result_set_.get())[i].colVals.resize(4);
    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
    (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
    (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]);
  }
}

void ClientRequestState::SetCreateTableAsSelectResultSet() {
  DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
  int64_t total_num_rows_inserted = 0;
  // Rows are only inserted if a new table was created as part of this operation.
  if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
    DCHECK(GetCoordinator());
    total_num_rows_inserted = GetCoordinator()->dml_exec_state()->GetNumModifiedRows();
  }
  const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
  VLOG_QUERY << summary_msg;
  vector<string> results(1, summary_msg);
  SetResultSet(results);
}

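// MarkInactive()/MarkActive() bracket periods in which no client call is being served;
// the ref count kept under 'expiration_data_lock_' lets the expiration logic tell an
// idle query apart from one with a client call still in flight.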
void ClientRequestState::MarkInactive() {
  client_wait_sw_.Start();
  lock_guard<mutex> l(expiration_data_lock_);
  last_active_time_ms_ = UnixMillis();
  DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
  --ref_count_;
}

void ClientRequestState::MarkActive() {
  client_wait_sw_.Stop();
  int64_t elapsed_time = client_wait_sw_.ElapsedTime();
  // If we have reached eos, then the query is already complete,
  // and we should not accumulate more client wait time. This mostly
  // impacts the finalization step, where the client is closing the
  // query and does not need any more rows. Fetching may have already
  // completed prior to this point, so finalization time should not
  // count in that case. If the fetch was incomplete, then the client
  // time should be counted for finalization as well.
  if (!eos()) {
    client_wait_timer_->Set(elapsed_time);
    // The first call is before any MarkInactive() call has run and produces
    // a zero-length sample. Skip this zero-length sample (but not any later
    // zero-length samples).
    if (elapsed_time != 0 || last_client_wait_time_ != 0) {
      int64_t current_wait_time = elapsed_time - last_client_wait_time_;
      client_wait_time_stats_->UpdateCounter(current_wait_time);
    }
    last_client_wait_time_ = elapsed_time;
  }
  lock_guard<mutex> l(expiration_data_lock_);
  last_active_time_ms_ = UnixMillis();
  ++ref_count_;
}

// Used by RETURN_IF_CANCELLED.
bool ClientRequestState::is_cancelled() {
  lock_guard<mutex> l(lock_);
  return is_cancelled_;
}

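// Extracts the Iceberg snapshot id from a COMPUTE STATS request if the frontend set
// one; returns an empty optional otherwise.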
std::optional<long> getIcebergSnapshotId(const TExecRequest& exec_req) {
  DCHECK(exec_req.__isset.catalog_op_request);
  DCHECK(exec_req.catalog_op_request.__isset.ddl_params);
  DCHECK(exec_req.catalog_op_request.ddl_params.__isset.compute_stats_params);

  const TComputeStatsParams& compute_stats_params =
      exec_req.catalog_op_request.ddl_params.compute_stats_params;
  if (compute_stats_params.__isset.iceberg_snapshot_id) {
    return std::optional<long>(compute_stats_params.iceberg_snapshot_id);
  } else {
    return {};
  }
}

Status ClientRequestState::UpdateTableAndColumnStats(
    const vector<ChildQuery*>& child_queries) {
  DCHECK_GE(child_queries.size(), 1);
  DCHECK_LE(child_queries.size(), 2);
  catalog_op_executor_.reset(
      new CatalogOpExecutor(ExecEnv::GetInstance(), frontend_, server_profile_));

  // If there was no column stats query, pass in empty thrift structures to
  // ExecComputeStats(). Otherwise pass in the column stats result.
  TTableSchema col_stats_schema;
  TRowSet col_stats_data;
  if (child_queries.size() > 1) {
    col_stats_schema = child_queries[1]->result_schema();
    col_stats_data = child_queries[1]->result_data();
  }

  const TExecRequest& exec_req = exec_request();
  std::optional<long> snapshot_id = getIcebergSnapshotId(exec_req);
  Status status = catalog_op_executor_->ExecComputeStats(
      GetCatalogServiceRequestHeader(),
      exec_req.catalog_op_request,
      child_queries[0]->result_schema(),
      child_queries[0]->result_data(),
      col_stats_schema,
      col_stats_data,
      snapshot_id);
  AddCatalogTimeline();
  {
    lock_guard<mutex> l(lock_);
    RETURN_IF_ERROR(UpdateQueryStatus(status));
  }
  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
      *catalog_op_executor_->update_catalog_result(),
      exec_req.query_options.sync_ddl, query_options(), query_events_));

  // Set the results to be reported to the client.
  SetResultSet(catalog_op_executor_->ddl_exec_response());
  query_events_->MarkEvent("Metastore update finished");
  return Status::OK();
}

void ClientRequestState::ClearResultCache() {
  if (result_cache_ == nullptr) return;
  // Update result set cache metrics and mem limit accounting.
  ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size());
  int64_t total_bytes = result_cache_->ByteSize();
  ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes);
  Coordinator* coordinator = GetCoordinator();
  if (coordinator != nullptr) {
    DCHECK(coordinator->query_mem_tracker() != nullptr);
    coordinator->query_mem_tracker()->Release(total_bytes);
  }
  result_cache_.reset();
}

void ClientRequestState::UpdateExecState(ExecState exec_state) {
  {
    lock_guard<mutex> l(exec_state_lock_);
    exec_state_.Store(exec_state);
    summary_profile_->AddInfoString("Query State", PrintValue(BeeswaxQueryState()));
    summary_profile_->AddInfoString("Impala Query State", ExecStateToString(exec_state));
  }
  // Drop exec_state_lock_ before signalling.
  exec_state_cv_.NotifyAll();
}

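// Long-polling support: blocks the caller until the query reaches FINISHED or ERROR,
// or until the LONG_POLLING_TIME_MS window expires, whichever comes first.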
void ClientRequestState::WaitForCompletionExecState() {
  if (query_options().long_polling_time_ms <= 0) return;
  int64_t timeout_us = query_options().long_polling_time_ms * MICROS_PER_MILLI;
  unique_lock<mutex> l(exec_state_lock_);
  timespec deadline;
  TimeFromNowMicros(timeout_us, &deadline);
  bool timed_out = false;
  while (exec_state() != ExecState::FINISHED &&
      exec_state() != ExecState::ERROR &&
      !timed_out) {
    timed_out = !exec_state_cv_.WaitUntil(l, deadline);
  }
}

TOperationState::type ClientRequestState::TOperationState() const {
  switch (exec_state()) {
    case ExecState::INITIALIZED: return TOperationState::INITIALIZED_STATE;
    case ExecState::PENDING: return TOperationState::PENDING_STATE;
    case ExecState::RUNNING: return TOperationState::RUNNING_STATE;
    case ExecState::FINISHED: return TOperationState::FINISHED_STATE;
    case ExecState::ERROR: return TOperationState::ERROR_STATE;
    default: {
      DCHECK(false) << "Add explicit translation for all used ExecState values";
      return TOperationState::ERROR_STATE;
    }
  }
}

beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const {
  switch (exec_state()) {
    case ExecState::INITIALIZED: return beeswax::QueryState::CREATED;
    case ExecState::PENDING: return beeswax::QueryState::COMPILED;
    case ExecState::RUNNING: return beeswax::QueryState::RUNNING;
    case ExecState::FINISHED: return beeswax::QueryState::FINISHED;
    case ExecState::ERROR: return beeswax::QueryState::EXCEPTION;
    default: {
      DCHECK(false) << "Add explicit translation for all used ExecState values";
      return beeswax::QueryState::EXCEPTION;
    }
  }
}

// It is safe to use 'coord_' directly for the following two methods since they are safe
// to call concurrently with Coordinator::Exec(). See comments for 'coord_' and
// 'coord_exec_called_' for more details.
Status ClientRequestState::UpdateBackendExecStatus(
    const ReportExecStatusRequestPB& request,
    const TRuntimeProfileForest& thrift_profiles) {
  DCHECK(coord_.get());
  return coord_->UpdateBackendExecStatus(request, thrift_profiles);
}

void ClientRequestState::UpdateFilter(
    const UpdateFilterParamsPB& params, RpcContext* context) {
  DCHECK(coord_.get());
  coord_->UpdateFilter(params, context);
}

bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) {
  lock_guard<mutex> l(lock_);
  *query_status = query_status_;
  if (!query_status->ok()) return false;
  // Coord may be NULL for a SELECT with LIMIT 0.
  // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
  // need to revisit this, since that might lead us to insert a row without a
  // coordinator, depending on how we choose to drive the table sink.
  Coordinator* coord = GetCoordinator();
  if (coord == nullptr) return false;
  coord->dml_exec_state()->ToTDmlResult(dml_result);
  return true;
}

void ClientRequestState::WaitUntilRetried() {
  unique_lock<mutex> l(lock_);
  DCHECK(retry_state() != RetryState::NOT_RETRIED);
  while (retry_state() == RetryState::RETRYING) {
    block_until_retried_cv_.Wait(l);
  }
  DCHECK(retry_state() == RetryState::RETRIED
      || exec_state() == ExecState::ERROR);
}

void ClientRequestState::MarkAsRetried(const TUniqueId& retried_id) {
  DCHECK(retry_state() == RetryState::RETRYING)
      << Substitute("Illegal retry state transition: $0 -> RETRIED, query_id=$1",
          RetryStateToString(retry_state()), PrintId(query_id()));
  retry_state_.Store(RetryState::RETRIED);
  summary_profile_->AddInfoStringRedacted(
      RETRY_STATUS_KEY, RetryStateToString(RetryState::RETRIED));
  summary_profile_->AddInfoString("Retried Query Id", PrintId(retried_id));
  UpdateExecState(ExecState::ERROR);
  block_until_retried_cv_.NotifyOne();
  retried_id_ = make_unique<TUniqueId>(retried_id);
}

const string& ClientRequestState::effective_user() const {
  return GetEffectiveUser(query_ctx_.session);
}

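// The CompareAndSwap on 'end_time_us_' guarantees that only the first caller records
// the end time, so concurrent finalization paths cannot overwrite it.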
void ClientRequestState::UpdateEndTime() {
  // Update the query's end time only if it hasn't been set already.
  if (end_time_us_.CompareAndSwap(0, UnixMicros())) {
    // Certain API clients expect Start Time and End Time to be date-time strings
    // of nanosecond precision, so we explicitly specify the precision here.
    summary_profile_->AddInfoString(
        "End Time", ToStringFromUnixMicros(end_time_us(), TimePrecision::Nanosecond));
    int64_t duration = end_time_us() - start_time_us();
    summary_profile_->AddInfoString("Duration", Substitute("$0 ($1 us)",
        PrettyPrinter::Print(duration, TUnit::TIME_US), duration));
  }
}

int64_t ClientRequestState::GetTransactionId() const {
  DCHECK(InTransaction());
  return exec_request().query_exec_request.finalize_params.transaction_id;
}

bool ClientRequestState::InTransaction() const {
  return exec_request().query_exec_request.finalize_params.__isset.transaction_id &&
      !transaction_closed_;
}

void ClientRequestState::AbortTransaction() {
  DCHECK(InTransaction());
  if (frontend_->AbortTransaction(GetTransactionId()).ok()) {
    query_events_->MarkEvent("Transaction aborted");
  } else {
    VLOG(1) << Substitute("Unable to abort transaction with id: $0", GetTransactionId());
  }
  ClearTransactionState();
}

void ClientRequestState::ClearTransactionState() {
  DCHECK(InTransaction());
  transaction_closed_ = true;
}

bool ClientRequestState::InKuduTransaction() const {
  // If a Kudu transaction is open, TQueryExecRequest.query_ctx.is_kudu_transactional
  // is set to true by Frontend.doCreateExecRequest().
  return (exec_request().query_exec_request.query_ctx.is_kudu_transactional
      && !transaction_closed_);
}

void ClientRequestState::AbortKuduTransaction() {
  DCHECK(InKuduTransaction());
  if (frontend_->AbortKuduTransaction(query_ctx_.query_id).ok()) {
    query_events_->MarkEvent("Kudu transaction aborted");
  } else {
    VLOG(1) << Substitute("Unable to abort Kudu transaction with query-id: $0",
        PrintId(query_ctx_.query_id));
  }
  transaction_closed_ = true;
}

Status ClientRequestState::CommitKuduTransaction() {
  DCHECK(InKuduTransaction());
  // Skip calling Commit() for the Kudu transaction when a debug action is set so that
  // test code can explicitly control when Commit() is called.
  Status status = DebugAction(exec_request().query_options, "CRS_NOT_COMMIT_KUDU_TXN");
  if (UNLIKELY(!status.ok())) {
    VLOG(1) << Substitute("Skipping commit of Kudu transaction with query-id: $0",
        PrintId(query_ctx_.query_id));
    transaction_closed_ = true;
    return Status::OK();
  }

  status = frontend_->CommitKuduTransaction(query_ctx_.query_id);
  if (status.ok()) {
    query_events_->MarkEvent("Kudu transaction committed");
    transaction_closed_ = true;
  } else {
    VLOG(1) << Substitute("Unable to commit Kudu transaction with query-id: $0",
        PrintId(query_ctx_.query_id));
  }
  return status;
}

void ClientRequestState::LogQueryEvents() {
  // Wait until the results are available. This guarantees the completion of non-QUERY
  // statements like DDL/DML etc. Query events are logged if the query reaches a FINISHED
  // state. For certain query types, events are logged regardless of the query state.
  Status status;
  {
    lock_guard<mutex> l(lock_);
    status = query_status();
  }
  bool log_events = true;
  switch (stmt_type()) {
    case TStmtType::QUERY:
    case TStmtType::DML:
    case TStmtType::DDL:
    case TStmtType::UNKNOWN:
      log_events = status.ok();
      break;
    case TStmtType::EXPLAIN:
    case TStmtType::LOAD:
    case TStmtType::SET:
    case TStmtType::ADMIN_FN:
    default:
      break;
  }

  // Log audit events that are due to an AuthorizationException.
  if (parent_server_->IsAuditEventLoggingEnabled() &&
      (Frontend::IsAuthorizationError(status) || log_events)) {
    // TODO: deal with an error status
    discard_result(LogAuditRecord(status));
  }

  if (log_events && (parent_server_->AreQueryHooksEnabled() ||
      parent_server_->IsLineageLoggingEnabled())) {
    // TODO: deal with an error status
    discard_result(LogLineageRecord());
  }
}

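// Serializes one audit record as JSON keyed by the current timestamp, roughly
// (field set abbreviated for illustration):
//   {"<unix_millis>": {"query_id": "...", "user": "...", "catalog_objects": [...]}}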
Status ClientRequestState::LogAuditRecord(const Status& query_status) {
  const TExecRequest& request = exec_request();
  stringstream ss;
  rapidjson::StringBuffer buffer;
  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);

  writer.StartObject();
  // Each log entry is a timestamp mapped to a JSON object.
  ss << UnixMillis();
  writer.String(ss.str().c_str());
  writer.StartObject();
  writer.String("query_id");
  writer.String(PrintId(query_id()).c_str());
  writer.String("session_id");
  writer.String(PrintId(session_id()).c_str());
  writer.String("start_time");
  writer.String(ToStringFromUnixMicros(start_time_us()).c_str());
  writer.String("authorization_failure");
  writer.Bool(Frontend::IsAuthorizationError(query_status));
  writer.String("status");
  writer.String(query_status.GetDetail().c_str());
  writer.String("user");
  writer.String(effective_user().c_str());
  writer.String("impersonator");
  if (do_as_user().empty()) {
    // If do_as_user() is empty, there is no impersonation and the "impersonator"
    // field should be null.
    writer.Null();
  } else {
    // Otherwise, the delegator is the currently connected user.
    writer.String(connected_user().c_str());
  }
  writer.String("statement_type");
  if (request.stmt_type == TStmtType::DDL) {
    if (request.catalog_op_request.op_type == TCatalogOpType::DDL) {
      writer.String(PrintValue(request.catalog_op_request.ddl_params.ddl_type).c_str());
    } else {
      writer.String(PrintValue(request.catalog_op_request.op_type).c_str());
    }
  } else {
    writer.String(PrintValue(request.stmt_type).c_str());
  }
  writer.String("network_address");
  writer.String(TNetworkAddressToString(
      session()->network_address).c_str());
  writer.String("sql_statement");
  string stmt = replace_all_copy(sql_stmt(), "\n", " ");
  Redact(&stmt);
  writer.String(stmt.c_str());
  writer.String("catalog_objects");

  writer.StartArray();
  for (const TAccessEvent& event: request.access_events) {
    writer.StartObject();
    writer.String("name");
    writer.String(event.name.c_str());
    writer.String("object_type");
    writer.String(PrintValue(event.object_type).c_str());
    writer.String("privilege");
    writer.String(event.privilege.c_str());
    writer.EndObject();
  }
  writer.EndArray();
  writer.EndObject();
  writer.EndObject();
  Status status = parent_server_->AppendAuditEntry(buffer.GetString());
  if (!status.ok()) {
    LOG(ERROR) << "Unable to record audit event record: " << status.GetDetail();
    if (FLAGS_abort_on_failed_audit_event) {
      CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to "
          "abort_on_failed_audit_event=true");
    }
  }
  return status;
}

Status ClientRequestState::LogLineageRecord() {
  const TExecRequest& request = exec_request();
  if (request.stmt_type == TStmtType::EXPLAIN || (!request.__isset.query_exec_request &&
      !request.__isset.catalog_op_request)) {
    return Status::OK();
  }
  TLineageGraph lineage_graph;
  if (request.__isset.query_exec_request &&
      request.query_exec_request.__isset.lineage_graph) {
    lineage_graph = request.query_exec_request.lineage_graph;
  } else if (request.__isset.catalog_op_request &&
      request.catalog_op_request.__isset.lineage_graph) {
    lineage_graph = request.catalog_op_request.lineage_graph;
  } else {
    return Status::OK();
  }

  if (catalog_op_executor_ != nullptr && catalog_op_type() == TCatalogOpType::DDL) {
    const TDdlExecResponse* response = ddl_exec_response();
    // Set table location in the lineage graph. Currently, this is only set for external
    // tables in the frontend.
    if (response->__isset.table_location) {
      lineage_graph.__set_table_location(response->table_location);
    }
    // Update vertices that have -1 table_create_time for a newly created table/view.
    if (response->__isset.table_name &&
        response->__isset.table_create_time) {
      for (auto &vertex: lineage_graph.vertices) {
        if (!vertex.__isset.metadata) continue;
        if (vertex.metadata.table_name == response->table_name &&
            vertex.metadata.table_create_time == -1) {
          vertex.metadata.__set_table_create_time(response->table_create_time);
        }
      }
    }
  }

  // Set the query end time in TLineageGraph. Must use UNIX time directly rather than
  // e.g. converting from end_time() (IMPALA-4440).
  lineage_graph.__set_ended(UnixMillis() / 1000);

  string lineage_record;
  LineageUtil::TLineageToJSON(lineage_graph, &lineage_record);

  if (parent_server_->AreQueryHooksEnabled()) {
    // Invoke the QueryEventHooks.
    TQueryCompleteContext query_complete_context;
    query_complete_context.__set_lineage_string(lineage_record);
    const Status& status = ExecEnv::GetInstance()->frontend()->CallQueryCompleteHooks(
        query_complete_context);

    if (!status.ok()) {
      LOG(ERROR) << "Failed to send query lineage info to FE CallQueryCompleteHooks: "
                 << status.GetDetail();
      if (FLAGS_abort_on_failed_lineage_event) {
        CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to "
            "abort_on_failed_lineage_event=true");
      }
    }
  }

  // Lineage logfile writing is deprecated in favor of the QueryEventHooks (see FE).
  // This behavior is retained for now but may be removed in the future.
  if (parent_server_->IsLineageLoggingEnabled()) {
    const Status& status = parent_server_->AppendLineageEntry(lineage_record);
    if (!status.ok()) {
      LOG(ERROR) << "Unable to record query lineage record: " << status.GetDetail();
      if (FLAGS_abort_on_failed_lineage_event) {
        CLEAN_EXIT_WITH_ERROR("Shutting down Impala Server due to "
            "abort_on_failed_lineage_event=true");
      }
    }
    return status;
  }
  return Status::OK();
}

string ClientRequestState::ExecStateToString(ExecState state) {
  static const unordered_map<ClientRequestState::ExecState, const char*>
      exec_state_strings{{ClientRequestState::ExecState::INITIALIZED, "INITIALIZED"},
          {ClientRequestState::ExecState::PENDING, "PENDING"},
          {ClientRequestState::ExecState::RUNNING, "RUNNING"},
          {ClientRequestState::ExecState::FINISHED, "FINISHED"},
          {ClientRequestState::ExecState::ERROR, "ERROR"}};
  return exec_state_strings.at(state);
}

string ClientRequestState::RetryStateToString(RetryState state) {
  static const unordered_map<ClientRequestState::RetryState, const char*>
      retry_state_strings{{ClientRequestState::RetryState::NOT_RETRIED, "NOT_RETRIED"},
          {ClientRequestState::RetryState::RETRYING, "RETRYING"},
          {ClientRequestState::RetryState::RETRIED, "RETRIED"}};
  return retry_state_strings.at(state);
}

TCatalogServiceRequestHeader ClientRequestState::GetCatalogServiceRequestHeader() {
  TCatalogServiceRequestHeader header = TCatalogServiceRequestHeader();
  header.__set_requesting_user(effective_user());
  header.__set_client_ip(session()->network_address.hostname);
  header.__set_want_minimal_response(FLAGS_use_local_catalog);
  header.__set_redacted_sql_stmt(
      query_ctx_.client_request.__isset.redacted_stmt ?
      query_ctx_.client_request.redacted_stmt : query_ctx_.client_request.stmt);
  header.__set_query_id(query_ctx_.query_id);
  header.__set_coordinator_hostname(FLAGS_hostname);
  return header;
}

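// Associates the calling RPC's invocation context with this request so that the RPC's
// network read/write time can later be folded into the query profile by
// UnRegisterCompletedRPCs().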
void ClientRequestState::RegisterRPC() {
  RpcEventHandler::InvocationContext* rpc_context =
      RpcEventHandler::GetThreadRPCContext();
  // The existence of rpc_context means that this is called from an RPC.
  if (rpc_context) {
    lock_guard<mutex> l(lock_);
    if (track_rpcs_ && pending_rpcs_.find(rpc_context) == pending_rpcs_.end()) {
      rpc_context->Register();
      pending_rpcs_.insert(rpc_context);
      rpc_count_->Add(1);
    }
  }
}

void ClientRequestState::UnRegisterCompletedRPCs() {
  lock_guard<mutex> l(lock_);
  for (auto iter = pending_rpcs_.begin(); iter != pending_rpcs_.end();) {
    RpcEventHandler::InvocationContext* rpc_context = *iter;
    uint64_t read_ns = 0, write_ns = 0;
    if (rpc_context->UnRegisterCompleted(read_ns, write_ns)) {
      rpc_read_timer_->Add(read_ns);
      rpc_write_timer_->Add(write_ns);
      iter = pending_rpcs_.erase(iter);
    } else {
      ++iter;
    }
  }
}

void ClientRequestState::UnRegisterRemainingRPCs() {
  lock_guard<mutex> l(lock_);
  for (auto rpc_context: pending_rpcs_) {
    rpc_context->UnRegister();
  }
  track_rpcs_ = false;
  pending_rpcs_.clear();
}

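// Carries the accumulated RPC timers/counters and any still-pending RPC contexts over
// from 'from_request', presumably so a retried query inherits the original attempt's
// RPC accounting.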
void ClientRequestState::CopyRPCs(ClientRequestState& from_request) {
  lock_guard<mutex> l_to(lock_);
  lock_guard<mutex> l_from(from_request.lock_);
  rpc_read_timer_->Add(from_request.rpc_read_timer_->value());
  rpc_write_timer_->Add(from_request.rpc_write_timer_->value());
  rpc_count_->Add(from_request.rpc_count_->value());
  for (auto rpc_context: from_request.pending_rpcs_) {
    rpc_context->Register();
    pending_rpcs_.insert(rpc_context);
  }
}

Status ClientRequestState::ExecMigrateRequest() {
  ExecMigrateRequestImpl();
  SetResultSet({"Table has been migrated."});
  return query_status_;
}

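// The migration runs as a pipeline: child queries that SET properties on, RENAME, and
// REFRESH the original HDFS table; a frontend Convert() call that creates the Iceberg
// table; then child queries that finalize metadata and DROP the temporary HDFS table.
// A failure at any step aborts the remaining steps.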
void ClientRequestState::ExecMigrateRequestImpl() {
  // A convert table request holds the query strings for the sub-queries. These are
  // populated by ConvertTableToIcebergStmt in the Frontend during analysis.
  const TConvertTableRequest& params = exec_request().convert_table_request;
  {
    RuntimeProfile* child_profile =
        RuntimeProfile::Create(&profile_pool_, "Child Queries 1");
    profile_->AddChild(child_profile);
    vector<ChildQuery> child_queries;

    // Prepare: SET some table properties for the original table.
    RuntimeProfile* set_hdfs_table_profile = RuntimeProfile::Create(
        &profile_pool_, "Set properties for HDFS table query");
    child_profile->AddChild(set_hdfs_table_profile);
    child_queries.emplace_back(params.set_hdfs_table_properties_query, this,
        parent_server_, set_hdfs_table_profile, &profile_pool_);

    // Prepare: RENAME the HDFS table to a temporary HDFS table.
    RuntimeProfile* rename_hdfs_table_profile = RuntimeProfile::Create(
        &profile_pool_, "Rename HDFS table query");
    child_profile->AddChild(rename_hdfs_table_profile);
    child_queries.emplace_back(params.rename_hdfs_table_to_temporary_query,
        this, parent_server_, rename_hdfs_table_profile, &profile_pool_);

    // Prepare: REFRESH the temporary HDFS table.
    RuntimeProfile* refresh_hdfs_table_profile = RuntimeProfile::Create(
        &profile_pool_, "Refresh temporary HDFS table query");
    child_profile->AddChild(refresh_hdfs_table_profile);
    child_queries.emplace_back(params.refresh_temporary_hdfs_table_query, this,
        parent_server_, refresh_hdfs_table_profile, &profile_pool_);

    // Execute the child queries.
    unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor());
    RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries)));
    vector<ChildQuery*> completed_queries;
    Status query_status = query_executor->WaitForAll(&completed_queries);
    if (!query_status.ok()) AddTableResetHints(params, &query_status);
    {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status));
    }
  }
  // Create an external Iceberg table using the data of the HDFS table.
  Status status = frontend_->Convert(exec_request());
  if (!status.ok()) AddTableResetHints(params, &status);
  {
    lock_guard<mutex> l(lock_);
    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
  }
  {
    RuntimeProfile* child_profile =
        RuntimeProfile::Create(&profile_pool_, "Child Queries 2");
    profile_->AddChild(child_profile);
    vector<ChildQuery> child_queries;

    if (params.__isset.create_iceberg_table_query) {
      // Prepare: CREATE the Iceberg table that inherits the HDFS table location.
      RuntimeProfile* create_iceberg_table_profile = RuntimeProfile::Create(
          &profile_pool_, "Create Iceberg table query");
      child_profile->AddChild(create_iceberg_table_profile);
      child_queries.emplace_back(params.create_iceberg_table_query, this,
          parent_server_, create_iceberg_table_profile, &profile_pool_);
    } else {
      // Prepare: Invalidate metadata for tables in the Hive catalog to guarantee that
      // it is propagated instantly to all coordinators.
      RuntimeProfile* invalidate_metadata_profile = RuntimeProfile::Create(
          &profile_pool_, "Invalidate metadata Iceberg table query");
      child_profile->AddChild(invalidate_metadata_profile);
      child_queries.emplace_back(params.invalidate_metadata_query, this,
          parent_server_, invalidate_metadata_profile, &profile_pool_);
    }

    if (params.__isset.post_create_alter_table_query) {
      // Prepare: ALTER TABLE query after creating the Iceberg table.
      RuntimeProfile* post_create_alter_table_profile = RuntimeProfile::Create(
          &profile_pool_, "ALTER TABLE after create Iceberg table query");
      child_profile->AddChild(post_create_alter_table_profile);
      child_queries.emplace_back(params.post_create_alter_table_query, this,
          parent_server_, post_create_alter_table_profile, &profile_pool_);
    }

    // Prepare: DROP the temporary HDFS table.
    RuntimeProfile* drop_tmp_hdfs_table_profile = RuntimeProfile::Create(
        &profile_pool_, "Drop temporary HDFS table query");
    child_profile->AddChild(drop_tmp_hdfs_table_profile);
    child_queries.emplace_back(params.drop_temporary_hdfs_table_query, this,
        parent_server_, drop_tmp_hdfs_table_profile, &profile_pool_);

    // Execute the child queries.
    unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor());
    RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries)));
    vector<ChildQuery*> completed_queries;
    Status query_status = query_executor->WaitForAll(&completed_queries);
    {
      lock_guard<mutex> l(lock_);
      RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status));
    }
  }
}

Status ClientRequestState::TryKillQueryLocally(
    const TUniqueId& query_id, const string& requesting_user, bool is_admin) {
  Status status = ExecEnv::GetInstance()->impala_server()->KillQuery(
      query_id, requesting_user, is_admin);
  if (status.ok()) {
    SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))});
    return query_status_;
  }
  return status;
}

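// Broadcasts the kill request to every other coordinator until one recognizes the
// query id; INVALID_QUERY_HANDLE from a coordinator just means the query is not
// registered there, so iteration continues.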
Status ClientRequestState::TryKillQueryRemotely(
    const TUniqueId& query_id, const KillQueryRequestPB& request) {
  // The initial status should be INVALID_QUERY_HANDLE so that if there is no other
  // coordinator in the cluster, it will be the status to return.
  Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
  ExecutorGroup all_coordinators =
      ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->all_coordinators;
  // Skip the current impalad; it was already tried locally.
  unique_ptr<ExecutorGroup> other_coordinators{ExecutorGroup::GetFilteredExecutorGroup(
      &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})};
  // If we get an RPC error, instead of returning immediately, we record it and move
  // on to the next coordinator.
  Status rpc_errors = Status::OK();
  for (const auto& backend : other_coordinators->GetAllExecutorDescriptors()) {
    // The logic here is similar to ExecShutdownRequest().
    NetworkAddressPB krpc_addr = MakeNetworkAddressPB(backend.ip_address(),
        backend.address().port(), backend.backend_id(),
        ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId());
    VLOG_QUERY << "Sending KillQuery() RPC to " << NetworkAddressPBToString(krpc_addr);
    unique_ptr<ControlServiceProxy> proxy;
    Status get_proxy_status =
        ControlService::GetProxy(krpc_addr, backend.address().hostname(), &proxy);
    if (!get_proxy_status.ok()) {
      Status get_proxy_status_to_report{Substitute(
          "KillQuery: Could not get Proxy to ControlService at $0 with error: $1.",
          NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())};
      rpc_errors.MergeStatus(get_proxy_status_to_report);
      LOG(ERROR) << get_proxy_status_to_report.GetDetail();
      continue;
    }
    KillQueryResponsePB response;
    const int num_retries = 3;
    const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
    const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
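    // With these settings, a single unresponsive coordinator can stall the kill for
    // roughly num_retries * timeout_ms plus the backoffs in between, i.e. about
    // 3 * 10s + 2 * 3s = 36s in the worst case (assuming DoRpcWithRetry() sleeps for
    // backoff_time_ms between attempts).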
    // Currently, a KILL QUERY statement is not interruptible.
    Status rpc_status = RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::KillQuery,
        request, &response, query_ctx_, "KillQuery() RPC failed", num_retries,
        timeout_ms, backoff_time_ms, "CRS_KILL_QUERY_RPC");
    if (!rpc_status.ok()) {
      LOG(ERROR) << rpc_status.GetDetail();
      rpc_errors.MergeStatus(rpc_status);
      continue;
    }
    // Currently, we only support killing one query per KILL QUERY statement.
    DCHECK_EQ(response.statuses_size(), 1);
    status = Status(response.statuses(0));
    if (status.ok()) {
      // Kill succeeded.
      VLOG_QUERY << "KillQuery: Found the coordinator at "
                 << NetworkAddressPBToString(krpc_addr);
      SetResultSet({Substitute("Query $0 is killed.", PrintId(query_id))});
      return query_status_;
    } else if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
      LOG(ERROR) << "KillQuery: Found the coordinator at "
                 << NetworkAddressPBToString(krpc_addr)
                 << " but failed to kill the query: " << status.GetDetail();
      // Kill failed, but we found the coordinator of the query.
      return status;
    }
  }
  // We didn't find the coordinator of the query after trying all other coordinators.
  // If there was any RPC error, return it.
  if (!rpc_errors.ok()) {
    return rpc_errors;
  }
  // Otherwise, return INVALID_QUERY_HANDLE.
  return status;
}
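// Entry point for a KILL QUERY statement. First tries to kill the query on this
// coordinator; if this impalad does not know the query id, broadcasts the request to
// all other coordinators. A hypothetical example (the exact statement syntax and the
// query id are assumptions, not taken from this file):
//
//   KILL QUERY '6f49e509bfa5b347:207d8ef900000000';
//
// On success the client sees "Query <id> is killed."; if no coordinator recognizes
// the id, the statement fails with "Could not find query on any coordinator."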
Status ClientRequestState::ExecKillQueryRequest() {
  TUniqueId query_id = exec_request().kill_query_request.query_id;
  string requesting_user = exec_request().kill_query_request.requesting_user;
  bool is_admin = exec_request().kill_query_request.is_admin;

  VLOG_QUERY << "Exec KillQuery: query_id=" << PrintId(query_id)
             << ", requesting_user=" << requesting_user << ", is_admin=" << is_admin;

  // First try cancelling the query locally.
  Status status = TryKillQueryLocally(query_id, requesting_user, is_admin);
  if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
    return status;
  }

  // The current impalad is NOT the coordinator of the query. Now we have to broadcast
  // the kill request to all other coordinators.
  UniqueIdPB query_id_pb;
  TUniqueIdToUniqueIdPB(query_id, &query_id_pb);
  KillQueryRequestPB request;
  *request.add_query_ids() = query_id_pb;
  *request.mutable_requesting_user() = requesting_user;
  request.set_is_admin(is_admin);
  status = TryKillQueryRemotely(query_id, request);
  if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
    return status;
  }
  // Every attempt returned "Invalid or unknown query handle".
  return Status("Could not find query on any coordinator.");
}
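// Appends a hint to 'status' telling the user how to restore the original table name
// when a table migration fails partway through (the migration may have renamed the
// source table). With a hypothetical reset query, the appended hint would read:
//
//   Your table might have been renamed. To reset the name try running:
//   ALTER TABLE db.tmp_tbl RENAME TO db.orig_tbl;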
void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params,
    Status* status) const {
  string table_reset_hint("Your table might have been renamed. To reset the name "
      "try running:\n" + params.reset_table_name_query + ";");
  status->MergeStatus(Status(table_reset_hint));
}
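// The profile-counter accessors below return 0 when the corresponding counter has
// not been created (e.g. the query has not produced or fetched any rows yet), so
// callers do not have to handle the nullptr case themselves.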
int64_t ClientRequestState::num_rows_fetched_counter() const {
  if (LIKELY(num_rows_fetched_counter_ != nullptr)) {
    return num_rows_fetched_counter_->value();
  }

  return 0;
}

int64_t ClientRequestState::row_materialization_rate() const {
  if (LIKELY(row_materialization_rate_ != nullptr)) {
    return row_materialization_rate_->value();
  }

  return 0;
}

int64_t ClientRequestState::row_materialization_timer() const {
  if (LIKELY(row_materialization_timer_ != nullptr)) {
    return row_materialization_timer_->value();
  }

  return 0;
}
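// Copies the event sequences recorded by the catalog operation (e.g. the catalog
// server's timeline for a DDL statement) into this query's summary profile so they
// appear in the query's runtime profile.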
void ClientRequestState::AddCatalogTimeline() {
  if (catalog_op_executor_ != nullptr
      && catalog_op_executor_->catalog_profile() != nullptr) {
    for (const TEventSequence& catalog_timeline :
        catalog_op_executor_->catalog_profile()->event_sequences) {
      summary_profile_->AddEventSequence(catalog_timeline.name, catalog_timeline);
    }
  }
}

}