IMPALA-9181: Serialize TQueryCtx once per query
When issuing Exec() rpcs to backends, we currently serialize the TQueryCtx
once per backend. This is inefficient, as the TQueryCtx is the same for all
backends and really only needs to be serialized once.

Serializing the TQueryCtx can be expensive because it contains both the full
text of the original query and the descriptor table, which can be quite
large. In a synthetic dataset I tested with, scanning a table with 100k
partitions leads to a descriptor table size of ~20MB.

This patch serializes the TQueryCtx in the coordinator and then passes it to
each BackendState when calling Exec().

Follow-up work might consider whether we really need all of the info in the
TQueryCtx to be distributed to all backends.

Testing:
- Passed full run of existing tests.
- Single node perf run showed no significant change.

Change-Id: I6a4dd302fd5602ec2775492a041ddd51e7d7a6c6
Reviewed-on: http://gerrit.cloudera.org:8080/14777
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent 1dc729f2b2
commit a1588e4498
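
The gist of the change, as a minimal standalone C++ sketch (this is not Impala code: QueryCtx, BackendRequest and Serialize() are hypothetical stand-ins for TQueryCtx, the per-backend Exec() parameters and Thrift serialization): serialize the shared, potentially ~20MB context once per query and let every per-backend request reference that same buffer, instead of re-serializing it for each backend.

#include <iostream>
#include <string>
#include <vector>

// Stand-in for TQueryCtx: identical for every backend of a query.
struct QueryCtx {
  std::string sql_stmt;
  std::string descriptor_table;  // can be tens of MB for heavily partitioned tables
};

// Stand-in for the expensive Thrift serialization step.
std::string Serialize(const QueryCtx& ctx) {
  return ctx.sql_stmt + '\0' + ctx.descriptor_table;
}

// Per-backend request: it only references the shared serialized bytes, analogous to
// the kudu::Slice this patch passes into BackendState::Exec() as a sidecar.
struct BackendRequest {
  int coord_state_idx;
  const std::string* serialized_query_ctx;  // shared, not copied per backend
};

int main() {
  QueryCtx ctx{"SELECT ...", std::string(20 * 1024 * 1024, 'd')};
  const std::string serialized = Serialize(ctx);  // one serialization for the whole query

  std::vector<BackendRequest> requests;
  for (int i = 0; i < 100; ++i) {
    requests.push_back(BackendRequest{i, &serialized});
  }
  std::cout << "one " << serialized.size() << "-byte buffer shared by "
            << requests.size() << " backend requests\n";
  return 0;
}

As in the real patch, the shared buffer must outlive every request that references it; the coordinator keeps the serialized TQueryCtx alive until exec_rpcs_complete_barrier_ has been signalled.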
@@ -26,6 +26,7 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.inline.h"
 #include "runtime/backend-client.h"
@@ -105,7 +106,7 @@ void Coordinator::BackendState::Init(const vector<FragmentStats*>& fragment_stat
 
 void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
-    TExecQueryFInstancesSidecar* sidecar) {
+    TExecPlanFragmentInfo* fragment_info) {
   request->set_coord_state_idx(state_idx_);
   request->set_min_mem_reservation_bytes(backend_exec_params_->min_mem_reservation_bytes);
   request->set_initial_mem_reservation_total_claims(
@@ -113,19 +114,20 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
   request->set_per_backend_mem_limit(schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
-  sidecar->__isset.fragment_ctxs = true;
-  sidecar->__isset.fragment_instance_ctxs = true;
-  sidecar->fragment_instance_ctxs.resize(backend_exec_params_->instance_params.size());
+  fragment_info->__isset.fragment_ctxs = true;
+  fragment_info->__isset.fragment_instance_ctxs = true;
+  fragment_info->fragment_instance_ctxs.resize(
+      backend_exec_params_->instance_params.size());
   for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) {
-    TPlanFragmentInstanceCtx& instance_ctx = sidecar->fragment_instance_ctxs[i];
+    TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i];
     const FInstanceExecParams& params = *backend_exec_params_->instance_params[i];
     int fragment_idx = params.fragment_exec_params.fragment.idx;
 
     // add a TPlanFragmentCtx, if we don't already have it
-    if (sidecar->fragment_ctxs.empty()
-        || sidecar->fragment_ctxs.back().fragment.idx != fragment_idx) {
-      sidecar->fragment_ctxs.emplace_back();
-      TPlanFragmentCtx& fragment_ctx = sidecar->fragment_ctxs.back();
+    if (fragment_info->fragment_ctxs.empty()
+        || fragment_info->fragment_ctxs.back().fragment.idx != fragment_idx) {
+      fragment_info->fragment_ctxs.emplace_back();
+      TPlanFragmentCtx& fragment_ctx = fragment_info->fragment_ctxs.back();
       fragment_ctx.__set_fragment(params.fragment_exec_params.fragment);
       fragment_ctx.__set_destinations(params.fragment_exec_params.destinations);
     }
@@ -163,10 +165,9 @@ void Coordinator::BackendState::SetExecError(const Status& status) {
   status_ = Status::Expected(err_msg);
 }
 
-void Coordinator::BackendState::Exec(
-    const DebugOptions& debug_options,
+void Coordinator::BackendState::Exec(const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table,
-    CountingBarrier* exec_complete_barrier) {
+    const kudu::Slice& serialized_query_ctx, CountingBarrier* exec_complete_barrier) {
   const auto trigger = MakeScopeExitTrigger([&]() {
     // Ensure that 'last_report_time_ms_' is set prior to the barrier being notified.
     last_report_time_ms_ = GenerateReportTimestamp();
@@ -188,9 +189,8 @@ void Coordinator::BackendState::Exec(
   }
 
   ExecQueryFInstancesRequestPB request;
-  TExecQueryFInstancesSidecar sidecar;
-  sidecar.__set_query_ctx(query_ctx_);
-  SetRpcParams(debug_options, filter_routing_table, &request, &sidecar);
+  TExecPlanFragmentInfo fragment_info;
+  SetRpcParams(debug_options, filter_routing_table, &request, &fragment_info);
 
   RpcController rpc_controller;
   rpc_controller.set_timeout(
@@ -202,7 +202,7 @@ void Coordinator::BackendState::Exec(
   uint8_t* serialized_buf = nullptr;
   uint32_t serialized_len = 0;
   Status serialize_status =
-      serializer.SerializeToBuffer(&sidecar, &serialized_len, &serialized_buf);
+      serializer.SerializeToBuffer(&fragment_info, &serialized_len, &serialized_buf);
   if (UNLIKELY(!serialize_status.ok())) {
     SetExecError(serialize_status);
     return;
@@ -212,6 +212,7 @@ void Coordinator::BackendState::Exec(
     return;
   }
 
+  // TODO: eliminate the extra copy here by using a Slice
   unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
   sidecar_buf->assign_copy(serialized_buf, serialized_len);
   unique_ptr<RpcSidecar> rpc_sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
@@ -223,7 +224,19 @@ void Coordinator::BackendState::Exec(
     SetExecError(FromKuduStatus(sidecar_status, "Failed to add sidecar"));
     return;
   }
-  request.set_sidecar_idx(sidecar_idx);
+  request.set_plan_fragment_info_sidecar_idx(sidecar_idx);
+
+  // Add the serialized TQueryCtx as a sidecar.
+  unique_ptr<RpcSidecar> query_ctx_sidecar = RpcSidecar::FromSlice(serialized_query_ctx);
+  int query_ctx_sidecar_idx;
+  kudu::Status query_ctx_sidecar_status =
+      rpc_controller.AddOutboundSidecar(move(query_ctx_sidecar), &query_ctx_sidecar_idx);
+  if (!query_ctx_sidecar_status.ok()) {
+    SetExecError(
+        FromKuduStatus(query_ctx_sidecar_status, "Failed to add TQueryCtx sidecar"));
+    return;
+  }
+  request.set_query_ctx_sidecar_idx(query_ctx_sidecar_idx);
 
   VLOG_FILE << "making rpc: ExecQueryFInstances"
       << " host=" << TNetworkAddressToString(impalad_address()) << " query_id="
@@ -39,6 +39,10 @@
 #include "util/runtime-profile.h"
 #include "util/stopwatch.h"
 
+namespace kudu {
+class Slice;
+}
+
 namespace impala {
 
 class ProgressUpdater;
@@ -78,7 +82,7 @@ class Coordinator::BackendState {
   /// on their node_id/instance_idx.
   void Exec(const DebugOptions& debug_options,
       const FilterRoutingTable& filter_routing_table,
-      CountingBarrier* rpc_complete_barrier);
+      const kudu::Slice& serialized_query_ctx, CountingBarrier* rpc_complete_barrier);
 
   /// Update overall execution status, including the instances' exec status/profiles
   /// and the error log, if this backend is not already done. Updates the fragment
@@ -321,11 +325,11 @@ class Coordinator::BackendState {
   /// The query id of the Coordinator that owns this BackendState.
   const TUniqueId& query_id_;
 
-  /// Fill in 'request' and 'sidecar' based on state. Uses filter_routing_table to remove
-  /// filters that weren't selected during its construction.
+  /// Fill in 'request' and 'fragment_info' based on state. Uses 'filter_routing_table' to
+  /// remove filters that weren't selected during its construction.
   void SetRpcParams(const DebugOptions& debug_options,
       const FilterRoutingTable& filter_routing_table,
-      ExecQueryFInstancesRequestPB* request, TExecQueryFInstancesSidecar* sidecar);
+      ExecQueryFInstancesRequestPB* request, TExecPlanFragmentInfo* fragment_info);
 
   /// Expects that 'status' is an error. Sets 'status_' to a formatted version of its
   /// message.
@@ -136,7 +136,7 @@ Status Coordinator::Exec() {
   // runtime-related state changes past this point (examples: fragment instance
   // profiles, etc.)
 
-  StartBackendExec();
+  RETURN_IF_ERROR(StartBackendExec());
   RETURN_IF_ERROR(FinishBackendStartup());
 
   // set coord_instance_ and coord_sink_
@@ -358,7 +358,7 @@ void Coordinator::InitFilterRoutingTable() {
   filter_routing_table_->is_complete = true;
 }
 
-void Coordinator::StartBackendExec() {
+Status Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
   backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
 
@@ -368,9 +368,21 @@ void Coordinator::StartBackendExec() {
            << PrintId(query_id());
   query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends));
 
+  // Serialize the TQueryCtx once and pass it to each backend. The serialized buffer must
+  // stay valid until exec_rpcs_complete_barrier_ has been signalled.
+  ThriftSerializer serializer(true);
+  uint8_t* serialized_buf = nullptr;
+  uint32_t serialized_len = 0;
+  Status serialize_status =
+      serializer.SerializeToBuffer(&query_ctx(), &serialized_len, &serialized_buf);
+  if (UNLIKELY(!serialize_status.ok())) {
+    return UpdateExecState(serialize_status, nullptr, FLAGS_hostname);
+  }
+  kudu::Slice query_ctx_slice(serialized_buf, serialized_len);
+
   for (BackendState* backend_state: backend_states_) {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
-        [backend_state, this, &debug_options]() {
+        [backend_state, this, &debug_options, &query_ctx_slice]() {
           DebugActionNoFail(schedule_.query_options(), "COORD_BEFORE_EXEC_RPC");
           // Safe for Exec() to read 'filter_routing_table_' because it is complete
           // at this point and won't be destroyed while this function is executing,
@@ -378,8 +390,8 @@ void Coordinator::StartBackendExec() {
           // signalled.
           DCHECK(filter_mode_ == TRuntimeFilterMode::OFF
               || filter_routing_table_->is_complete);
-          backend_state->Exec(
-              debug_options, *filter_routing_table_, &exec_rpcs_complete_barrier_);
+          backend_state->Exec(debug_options, *filter_routing_table_, query_ctx_slice,
+              &exec_rpcs_complete_barrier_);
         });
   }
   exec_rpcs_complete_barrier_.Wait();
@@ -389,6 +401,7 @@ void Coordinator::StartBackendExec() {
   query_events_->MarkEvent(
       Substitute("All $0 execution backends ($1 fragment instances) started",
           num_backends, schedule_.GetNumFragmentInstances()));
+  return Status::OK();
 }
 
 Status Coordinator::FinishBackendStartup() {
@@ -485,7 +485,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Helper for Exec(). Populates backend_states_, starts query execution at all
   /// backends in parallel, and blocks until startup completes.
-  void StartBackendExec();
+  Status StartBackendExec();
 
   /// Helper for Exec(). Checks for errors encountered when starting backend execution,
   /// using any non-OK status, if any, as the overall status. Returns the overall
@@ -43,14 +43,14 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory
     "every log_mem_usage_interval'th fragment completion.");
 
 Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
-    const TExecQueryFInstancesSidecar& sidecar) {
-  TUniqueId query_id = sidecar.query_ctx.query_id;
+    const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
+  TUniqueId query_id = query_ctx.query_id;
   VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
-          << " coord=" << TNetworkAddressToString(sidecar.query_ctx.coord_address);
+          << " coord=" << TNetworkAddressToString(query_ctx.coord_address);
   bool dummy;
   QueryState* qs =
-      GetOrCreateQueryState(sidecar.query_ctx, request->per_backend_mem_limit(), &dummy);
-  Status status = qs->Init(request, sidecar);
+      GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
+  Status status = qs->Init(request, fragment_info);
   if (!status.ok()) {
     qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
     ReleaseQueryState(qs);
@@ -50,7 +50,7 @@ class QueryExecMgr : public CacheLineAligned {
   /// After this function returns, it is legal to call QueryState::Cancel(), regardless of
   /// the return value of this function.
   Status StartQuery(const ExecQueryFInstancesRequestPB* request,
-      const TExecQueryFInstancesSidecar& sidecar);
+      const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info);
 
   /// Creates a QueryState for the given query with the provided parameters. Only valid
   /// to call if the QueryState does not already exist. The caller must call
@@ -135,7 +135,7 @@ QueryState::~QueryState() {
 }
 
 Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
-    const TExecQueryFInstancesSidecar& sidecar) {
+    const TExecPlanFragmentInfo& fragment_info) {
   // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
   // Init() on failure. We need to do this before any returns because Init() always
   // returns a resource refcount to its caller.
@@ -197,17 +197,18 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
-  TExecQueryFInstancesSidecar& non_const_params =
-      const_cast<TExecQueryFInstancesSidecar&>(sidecar);
-  exec_rpc_sidecar_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
-  exec_rpc_sidecar_.__isset.fragment_ctxs = true;
-  exec_rpc_sidecar_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
-  exec_rpc_sidecar_.__isset.fragment_instance_ctxs = true;
+  TExecPlanFragmentInfo& non_const_fragment_info =
+      const_cast<TExecPlanFragmentInfo&>(fragment_info);
+  fragment_info_.fragment_ctxs.swap(non_const_fragment_info.fragment_ctxs);
+  fragment_info_.__isset.fragment_ctxs = true;
+  fragment_info_.fragment_instance_ctxs.swap(
+      non_const_fragment_info.fragment_instance_ctxs);
+  fragment_info_.__isset.fragment_instance_ctxs = true;
 
   instances_prepared_barrier_.reset(
-      new CountingBarrier(exec_rpc_sidecar_.fragment_instance_ctxs.size()));
+      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
   instances_finished_barrier_.reset(
-      new CountingBarrier(exec_rpc_sidecar_.fragment_instance_ctxs.size()));
+      new CountingBarrier(fragment_info_.fragment_instance_ctxs.size()));
 
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
@@ -302,8 +303,8 @@ void QueryState::ConstructReport(bool instances_started,
     ReportExecStatusRequestPB* report, TRuntimeProfileForest* profiles_forest) {
   report->Clear();
   TUniqueIdToUniqueIdPB(query_id(), report->mutable_query_id());
-  DCHECK(exec_rpc_params().has_coord_state_idx());
-  report->set_coord_state_idx(exec_rpc_params().coord_state_idx());
+  DCHECK(exec_rpc_params_.has_coord_state_idx());
+  report->set_coord_state_idx(exec_rpc_params_.coord_state_idx());
   {
     std::unique_lock<SpinLock> l(status_lock_);
     overall_status_.ToProto(report->mutable_overall_status());
@@ -504,13 +505,13 @@ bool QueryState::WaitForFinishOrTimeout(int32_t timeout_ms) {
 
 bool QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
-          << " #instances=" << exec_rpc_sidecar_.fragment_instance_ctxs.size();
+          << " #instances=" << fragment_info_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
-  DCHECK_GT(exec_rpc_sidecar_.fragment_ctxs.size(), 0);
-  TPlanFragmentCtx* fragment_ctx = &exec_rpc_sidecar_.fragment_ctxs[0];
-  int num_unstarted_instances = exec_rpc_sidecar_.fragment_instance_ctxs.size();
+  DCHECK_GT(fragment_info_.fragment_ctxs.size(), 0);
+  TPlanFragmentCtx* fragment_ctx = &fragment_info_.fragment_ctxs[0];
+  int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size();
   int fragment_ctx_idx = 0;
 
   // set up desc tbl
@@ -523,12 +524,12 @@ bool QueryState::StartFInstances() {
 
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx :
-      exec_rpc_sidecar_.fragment_instance_ctxs) {
+      fragment_info_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
     if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
       ++fragment_ctx_idx;
-      DCHECK_LT(fragment_ctx_idx, exec_rpc_sidecar_.fragment_ctxs.size());
-      fragment_ctx = &exec_rpc_sidecar_.fragment_ctxs[fragment_ctx_idx];
+      DCHECK_LT(fragment_ctx_idx, fragment_info_.fragment_ctxs.size());
+      fragment_ctx = &fragment_info_.fragment_ctxs[fragment_ctx_idx];
       // we expect fragment and instance contexts to follow the same order
       DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
     }
@@ -646,7 +647,7 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
             << " fragment_idx=" << fis->instance_ctx().fragment_idx
             << " per_fragment_instance_idx="
             << fis->instance_ctx().per_fragment_instance_idx
-            << " coord_state_idx=" << exec_rpc_params().coord_state_idx()
+            << " coord_state_idx=" << exec_rpc_params_.coord_state_idx()
             << " #in-flight="
             << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   Status status = fis->Exec();
@@ -133,10 +133,6 @@ class QueryState {
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
-  const ExecQueryFInstancesRequestPB& exec_rpc_params() const { return exec_rpc_params_; }
-  const TExecQueryFInstancesSidecar& exec_rpc_sidecar() const {
-    return exec_rpc_sidecar_;
-  }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
@@ -173,7 +169,7 @@ class QueryState {
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
   Status Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
-      const TExecQueryFInstancesSidecar& sidecar) WARN_UNUSED_RESULT;
+      const TExecPlanFragmentInfo& fragment_info) WARN_UNUSED_RESULT;
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
@@ -319,11 +315,9 @@ class QueryState {
   /// Set in Init().
   std::unique_ptr<ControlServiceProxy> proxy_;
 
-  /// Set in Init(); exec_rpc_sidecar_.query_ctx is *not* set to avoid duplication
-  /// with query_ctx_.
-  /// TODO: find a way not to have to copy this
+  /// Set in Init(). TODO: find a way not to have to copy this
   ExecQueryFInstancesRequestPB exec_rpc_params_;
-  TExecQueryFInstancesSidecar exec_rpc_sidecar_;
+  TExecPlanFragmentInfo fragment_info_;
 
   /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
   ReservationTracker* buffer_reservation_ = nullptr;
@@ -157,15 +157,13 @@ Status TestEnv::CreateQueryState(
   ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
   rpc_params.set_coord_state_idx(0);
-  TExecQueryFInstancesSidecar sidecar;
-  sidecar.__set_query_ctx(TQueryCtx());
-  sidecar.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
-  sidecar.__set_fragment_instance_ctxs(
+  TExecPlanFragmentInfo fragment_info;
+  fragment_info.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
+  fragment_info.__set_fragment_instance_ctxs(
       vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
-  RETURN_IF_ERROR(qs->Init(&rpc_params, sidecar));
-  FragmentInstanceState* fis = qs->obj_pool()->Add(
-      new FragmentInstanceState(qs, qs->exec_rpc_sidecar().fragment_ctxs[0],
-          qs->exec_rpc_sidecar().fragment_instance_ctxs[0]));
+  RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
+  FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs,
+      qs->fragment_info_.fragment_ctxs[0], qs->fragment_info_.fragment_instance_ctxs[0]));
   RuntimeState* rs = qs->obj_pool()->Add(
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);
@@ -113,15 +113,15 @@ Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
   return Status::OK();
 }
 
-Status ControlService::GetExecQueryFInstancesSidecar(
-    const ExecQueryFInstancesRequestPB& request, RpcContext* rpc_context,
-    TExecQueryFInstancesSidecar* sidecar) {
+// Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into
+// 'thrift_obj'.
+template <typename T>
+static Status GetSidecar(int sidecar_idx, RpcContext* rpc_context, T* thrift_obj) {
   kudu::Slice sidecar_slice;
-  KUDU_RETURN_IF_ERROR(
-      rpc_context->GetInboundSidecar(request.sidecar_idx(), &sidecar_slice),
-      "Failed to get thrift profile sidecar");
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(sidecar_idx, &sidecar_slice),
+      "Failed to get sidecar");
   uint32_t len = sidecar_slice.size();
-  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, sidecar));
+  RETURN_IF_ERROR(DeserializeThriftMsg(sidecar_slice.data(), &len, true, thrift_obj));
   return Status::OK();
 }
 
@@ -129,24 +129,35 @@ void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* req
     ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
   DebugActionNoFail(FLAGS_debug_actions, "EXEC_QUERY_FINSTANCES_DELAY");
   DCHECK(request->has_coord_state_idx());
-  DCHECK(request->has_sidecar_idx());
-  TExecQueryFInstancesSidecar sidecar;
-  const Status& sidecar_status =
-      GetExecQueryFInstancesSidecar(*request, rpc_context, &sidecar);
-  if (!sidecar_status.ok()) {
-    RespondAndReleaseRpc(sidecar_status, response, rpc_context);
+  DCHECK(request->has_plan_fragment_info_sidecar_idx());
+  DCHECK(request->has_query_ctx_sidecar_idx());
+  // Deserialize the sidecars. The QueryState will make a copy of the TQueryCtx and
+  // TExecPlanFragmentInfo, so we can deallocate the deserialized values after
+  // StartQuery(). TODO: can we avoid this extra copy?
+  TExecPlanFragmentInfo fragment_info;
+  const Status& fragment_info_sidecar_status =
+      GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
+  if (!fragment_info_sidecar_status.ok()) {
+    RespondAndReleaseRpc(fragment_info_sidecar_status, response, rpc_context);
     return;
   }
-  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), sidecar.query_ctx.query_id);
+  TQueryCtx query_ctx;
+  const Status& query_ctx_sidecar_status =
+      GetSidecar(request->query_ctx_sidecar_idx(), rpc_context, &query_ctx);
+  if (!query_ctx_sidecar_status.ok()) {
+    RespondAndReleaseRpc(query_ctx_sidecar_status, response, rpc_context);
+    return;
+  }
+  ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_ctx.query_id);
   VLOG_QUERY << "ExecQueryFInstances():"
-             << " query_id=" << PrintId(sidecar.query_ctx.query_id)
-             << " coord=" << TNetworkAddressToString(sidecar.query_ctx.coord_address)
-             << " #instances=" << sidecar.fragment_instance_ctxs.size();
-  Status resp_status =
-      ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, sidecar);
+             << " query_id=" << PrintId(query_ctx.query_id)
+             << " coord=" << TNetworkAddressToString(query_ctx.coord_address)
+             << " #instances=" << fragment_info.fragment_instance_ctxs.size();
+  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
+      request, query_ctx, fragment_info);
   if (!resp_status.ok()) {
-    LOG(INFO) << "ExecQueryFInstances() failed: query_id="
-              << PrintId(sidecar.query_ctx.query_id) << ": " << resp_status.GetDetail();
+    LOG(INFO) << "ExecQueryFInstances() failed: query_id=" << PrintId(query_ctx.query_id)
+              << ": " << resp_status.GetDetail();
   }
   RespondAndReleaseRpc(resp_status, response, rpc_context);
 }
@@ -92,12 +92,6 @@ class ControlService : public ControlServiceIf {
       const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
       TRuntimeProfileForest* thrift_profiles);
 
-  /// Helper for deserializing the ExecQueryFInstances sidecar attached in the inbound
-  /// call within 'rpc_context'. On success, returns the deserialized sidecar in
-  /// 'sidecar'. On failure, returns the error status;
-  static Status GetExecQueryFInstancesSidecar(const ExecQueryFInstancesRequestPB& request,
-      RpcContext* rpc_context, TExecQueryFInstancesSidecar* sidecar);
-
   /// Helper for serializing 'status' as part of 'response'. Also releases memory
   /// of the RPC payload previously accounted towards the internal memory tracker.
   template <typename ResponsePBType>
@@ -222,9 +222,11 @@ message ExecQueryFInstancesRequestPB {
   // the coordinator.
   optional int32 coord_state_idx = 1;
 
-  // Sidecar index of the TExecQueryFInstancesSidecar, which contains the query and plan
-  // fragment contexts.
-  optional int32 sidecar_idx = 2;
+  // Sidecar index of the TQueryCtx.
+  optional int32 query_ctx_sidecar_idx = 2;
+
+  // Sidecar index of the TExecPlanFragmentInfo.
+  optional int32 plan_fragment_info_sidecar_idx = 3;
 
   // The minimum query-wide memory reservation (in bytes) required for the backend
   // executing the instances in fragment_instance_ctxs. This is the peak minimum
@@ -232,17 +234,17 @@ message ExecQueryFInstancesRequestPB {
   // point in query execution. It may be less than the initial reservation total claims
   // (below) if execution of some operators never overlaps, which allows reuse of
   // reservations.
-  optional int64 min_mem_reservation_bytes = 3;
+  optional int64 min_mem_reservation_bytes = 4;
 
   // Total of the initial buffer reservations that we expect to be claimed on this
   // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes.
-  optional int64 initial_mem_reservation_total_claims = 4;
+  optional int64 initial_mem_reservation_total_claims = 5;
 
   // The backend memory limit (in bytes) as set by the admission controller. Used by the
   // query mem tracker to enforce the memory limit.
-  optional int64 per_backend_mem_limit = 5;
+  optional int64 per_backend_mem_limit = 6;
 }
 
 message ExecQueryFInstancesResponsePB {
@@ -658,14 +658,15 @@ enum ImpalaInternalServiceVersion {
 
 // The following contains the per-rpc structs for the parameters and the result.
 
-// TODO: convert this fully to protobuf.
-struct TExecQueryFInstancesSidecar {
-  1: optional TQueryCtx query_ctx
-
-  2: optional list<TPlanFragmentCtx> fragment_ctxs
+// Contains info about plan fragment execution needed for the ExecQueryFInstances rpc.
+// Rather than fully coverting this to protobuf, which would be a large change, for now we
+// serialize it ourselves and send it with ExecQueryFInstances as a sidecar.
+// TODO: investigate if it's worth converting this fully to protobuf
+struct TExecPlanFragmentInfo {
+  1: optional list<TPlanFragmentCtx> fragment_ctxs
 
   // the order corresponds to the order of fragments in fragment_ctxs
-  3: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
+  2: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 }
 
 // Parameters for RequestPoolService.resolveRequestPool()