IMPALA-9692 (part 2): Refactor parts of TExecPlanFragmentInfo to protobuf

The new admission control service will be written in protobuf, so
there are various admission control related structures currently
stored in Thrift that it would be convenient to convert to protobuf,
to minimize the amount of converting back and forth that needs to be
done.

This patch converts some portions of TExecPlanFragmentInfo to
protobuf. TExecPlanFragmentInfo is sent as a sidecar with the Exec()
RPC, so the refactored parts are now included directly in the
ExecQueryFInstancesRequestPB.

The portions that are converted are those that are part of the
QuerySchedule, in particular the TPlanFragmentDestination,
TScanRangeParams, and TJoinBuildInput.

This patch is just a refactor and doesn't contain any functional
changes.

One notable related change is that DataSink::CreateSink() has two
parameters removed - TPlanFragmentCtx (which no longer exists) and
TPlanFragmentInstanceCtx. These variables and the new PB equivalents
are available via the RuntimeState that was already being passed in as
another parameter and don't need to be individually passed in.

Testing:
- Passed a full run of existing tests.
- Ran the single node perf test and didn't detect any regressions.

Change-Id: I3a8e46767b257bbf677171ac2f4efb1b623ba41b
Reviewed-on: http://gerrit.cloudera.org:8080/15844
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Thomas Tauber-Marshall
2020-03-24 14:04:53 -07:00
committed by Impala Public Jenkins
parent a1485177d4
commit b67c0906f5
56 changed files with 699 additions and 385 deletions

View File

@@ -84,10 +84,11 @@ class Planner {
TNetworkAddress dummy; TNetworkAddress dummy;
ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx); ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_)); runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
TPlanFragmentCtx* fragment_ctx = TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new TPlanFragment());
runtime_state_->obj_pool()->Add(new TPlanFragmentCtx()); PlanFragmentCtxPB* fragment_ctx =
runtime_state_->obj_pool()->Add(new PlanFragmentCtxPB());
fragment_state_ = runtime_state_->obj_pool()->Add( fragment_state_ = runtime_state_->obj_pool()->Add(
new FragmentState(runtime_state_->query_state(), *fragment_ctx)); new FragmentState(runtime_state_->query_state(), *fragment, *fragment_ctx));
return frontend_.GetExecRequest(query_ctx, result); return frontend_.GetExecRequest(query_ctx, result);
} }

View File

@@ -482,9 +482,10 @@ int main(int argc, char **argv) {
} }
status = test_env.CreateQueryState(0, nullptr, &state); status = test_env.CreateQueryState(0, nullptr, &state);
QueryState* qs = state->query_state(); QueryState* qs = state->query_state();
TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx()); TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new PlanFragmentCtxPB());
FragmentState* fragment_state = FragmentState* fragment_state =
qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx)); qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
if (!status.ok()) { if (!status.ok()) {
cout << "Could not create RuntimeState"; cout << "Could not create RuntimeState";
return -1; return -1;

View File

@@ -52,8 +52,10 @@ class LlvmCodeGenTest : public testing:: Test {
RuntimeState* runtime_state_; RuntimeState* runtime_state_;
ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_)); ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
QueryState* qs = runtime_state_->query_state(); QueryState* qs = runtime_state_->query_state();
TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx()); TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx)); PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new PlanFragmentCtxPB());
fragment_state_ =
qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
} }
virtual void TearDown() { virtual void TearDown() {

View File

@@ -34,6 +34,7 @@
#include "util/runtime-profile-counters.h" #include "util/runtime-profile-counters.h"
#include "util/thread.h" #include "util/thread.h"
#include "util/time.h" #include "util/time.h"
#include "util/uid-util.h"
#include "gen-cpp/PlanNodes_types.h" #include "gen-cpp/PlanNodes_types.h"
@@ -189,16 +190,17 @@ Status BlockingJoinNode::OpenImpl(RuntimeState* state, JoinBuilder** separate_bu
if (UseSeparateBuild(state->query_options())) { if (UseSeparateBuild(state->query_options())) {
// Find the input fragment's build sink. We do this in the Open() phase so we don't // Find the input fragment's build sink. We do this in the Open() phase so we don't
// block this finstance's Prepare() phase on the build finstance's Prepare() phase. // block this finstance's Prepare() phase on the build finstance's Prepare() phase.
const vector<TJoinBuildInput>& build_inputs = const google::protobuf::RepeatedPtrField<JoinBuildInputPB>& build_inputs =
state->instance_ctx().join_build_inputs; state->instance_ctx_pb().join_build_inputs();
auto it = std::find_if(build_inputs.begin(), build_inputs.end(), auto it = std::find_if(build_inputs.begin(), build_inputs.end(),
[this](const TJoinBuildInput& bi) { return bi.join_node_id == id_; }); [this](const JoinBuildInputPB& bi) { return bi.join_node_id() == id_; });
DCHECK(it != build_inputs.end()); DCHECK(it != build_inputs.end());
FragmentInstanceState* build_finstance; FragmentInstanceState* build_finstance;
RETURN_IF_ERROR(state->query_state()->GetFInstanceState( TUniqueId input_finstance_id;
it->input_finstance_id, &build_finstance)); UniqueIdPBToTUniqueId(it->input_finstance_id(), &input_finstance_id);
TDataSinkType::type build_sink_type = RETURN_IF_ERROR(
build_finstance->fragment_ctx().fragment.output_sink.type; state->query_state()->GetFInstanceState(input_finstance_id, &build_finstance));
TDataSinkType::type build_sink_type = build_finstance->fragment().output_sink.type;
DCHECK(IsJoinBuildSink(build_sink_type)); DCHECK(IsJoinBuildSink(build_sink_type));
*separate_builder = build_finstance->GetJoinBuildSink(); *separate_builder = build_finstance->GetJoinBuildSink();
DCHECK(*separate_builder != nullptr); DCHECK(*separate_builder != nullptr);

View File

@@ -53,8 +53,7 @@ class DataSinkConfig {
virtual ~DataSinkConfig() {} virtual ~DataSinkConfig() {}
/// Create its corresponding DataSink. Place the sink in state->obj_pool(). /// Create its corresponding DataSink. Place the sink in state->obj_pool().
virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, virtual DataSink* CreateSink(RuntimeState* state) const = 0;
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
/// Codegen expressions in the sink. Overridden by sink type which supports codegen. /// Codegen expressions in the sink. Overridden by sink type which supports codegen.
/// No-op by default. /// No-op by default.

View File

@@ -104,19 +104,19 @@ Status HBaseScanNode::Prepare(RuntimeState* state) {
// TODO(marcel): add int tuple_idx_[] indexed by TupleId somewhere in runtime-state.h // TODO(marcel): add int tuple_idx_[] indexed by TupleId somewhere in runtime-state.h
tuple_idx_ = 0; tuple_idx_ = 0;
// Convert TScanRangeParams to ScanRanges // Convert ScanRangeParamsPB to ScanRanges
DCHECK(scan_range_params_ != NULL) DCHECK(scan_range_params_ != NULL)
<< "Must call SetScanRanges() before calling Prepare()"; << "Must call SetScanRanges() before calling Prepare()";
for (const TScanRangeParams& params: *scan_range_params_) { for (const ScanRangeParamsPB& params : *scan_range_params_) {
DCHECK(params.scan_range.__isset.hbase_key_range); DCHECK(params.scan_range().has_hbase_key_range());
const THBaseKeyRange& key_range = params.scan_range.hbase_key_range; const HBaseKeyRangePB& key_range = params.scan_range().hbase_key_range();
scan_range_vector_.push_back(HBaseTableScanner::ScanRange()); scan_range_vector_.push_back(HBaseTableScanner::ScanRange());
HBaseTableScanner::ScanRange& sr = scan_range_vector_.back(); HBaseTableScanner::ScanRange& sr = scan_range_vector_.back();
if (key_range.__isset.startKey) { if (key_range.has_startkey()) {
sr.set_start_key(key_range.startKey); sr.set_start_key(key_range.startkey());
} }
if (key_range.__isset.stopKey) { if (key_range.has_stopkey()) {
sr.set_stop_key(key_range.stopKey); sr.set_stop_key(key_range.stopkey());
} }
} }
runtime_profile_->AddInfoString("Table Name", hbase_table->fully_qualified_name()); runtime_profile_->AddInfoString("Table Name", hbase_table->fully_qualified_name());

View File

@@ -31,9 +31,8 @@
namespace impala { namespace impala {
DataSink* HBaseTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* HBaseTableSinkConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const { TDataSinkId sink_id = state->fragment().idx;
TDataSinkId sink_id = fragment_ctx.fragment.idx;
return state->obj_pool()->Add(new HBaseTableSink(sink_id, *this, state)); return state->obj_pool()->Add(new HBaseTableSink(sink_id, *this, state));
} }

View File

@@ -33,9 +33,7 @@ namespace impala {
class HBaseTableSinkConfig : public DataSinkConfig { class HBaseTableSinkConfig : public DataSinkConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
~HBaseTableSinkConfig() override {} ~HBaseTableSinkConfig() override {}
}; };

View File

@@ -48,6 +48,7 @@
#include "runtime/query-state.h" #include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h" #include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h" #include "runtime/runtime-state.h"
#include "util/compression-util.h"
#include "util/disk-info.h" #include "util/disk-info.h"
#include "util/hdfs-util.h" #include "util/hdfs-util.h"
#include "util/impalad-metrics.h" #include "util/impalad-metrics.h"
@@ -223,14 +224,16 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
*min_max_row_desc, state, &min_max_conjuncts_)); *min_max_row_desc, state, &min_max_conjuncts_));
} }
const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = state->instance_ctxs(); const vector<const PlanFragmentInstanceCtxPB*>& instance_ctxs =
state->instance_ctx_pbs();
for (auto ctx : instance_ctxs) { for (auto ctx : instance_ctxs) {
auto ranges = ctx->per_node_scan_ranges.find(tnode.node_id); auto ranges = ctx->per_node_scan_ranges().find(tnode.node_id);
if (ranges == ctx->per_node_scan_ranges.end()) continue; if (ranges == ctx->per_node_scan_ranges().end()) continue;
for (const TScanRangeParams& scan_range_param : ranges->second) { for (const ScanRangeParamsPB& scan_range_param : ranges->second.scan_ranges()) {
const THdfsFileSplit& split = scan_range_param.scan_range.hdfs_file_split; DCHECK(scan_range_param.scan_range().has_hdfs_file_split());
const HdfsFileSplitPB& split = scan_range_param.scan_range().hdfs_file_split();
HdfsPartitionDescriptor* partition_desc = HdfsPartitionDescriptor* partition_desc =
hdfs_table_->GetPartition(split.partition_id); hdfs_table_->GetPartition(split.partition_id());
scanned_file_formats_.insert(partition_desc->file_format()); scanned_file_formats_.insert(partition_desc->file_format());
} }
} }
@@ -328,28 +331,29 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
scan_node_pool_.reset(new MemPool(mem_tracker())); scan_node_pool_.reset(new MemPool(mem_tracker()));
HdfsFsCache::HdfsFsMap fs_cache; HdfsFsCache::HdfsFsMap fs_cache;
// Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate // Convert the ScanRangeParamsPB into per-file DiskIO::ScanRange objects and populate
// partition_ids_, file_descs_, and per_type_files_. // partition_ids_, file_descs_, and per_type_files_.
DCHECK(scan_range_params_ != NULL) DCHECK(scan_range_params_ != NULL)
<< "Must call SetScanRanges() before calling Prepare()"; << "Must call SetScanRanges() before calling Prepare()";
int num_ranges_missing_volume_id = 0; int num_ranges_missing_volume_id = 0;
for (const TScanRangeParams& params: *scan_range_params_) { for (const ScanRangeParamsPB& params : *scan_range_params_) {
DCHECK(params.scan_range.__isset.hdfs_file_split); DCHECK(params.scan_range().has_hdfs_file_split());
const THdfsFileSplit& split = params.scan_range.hdfs_file_split; const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
partition_ids_.insert(split.partition_id); partition_ids_.insert(split.partition_id());
HdfsPartitionDescriptor* partition_desc = HdfsPartitionDescriptor* partition_desc =
hdfs_table_->GetPartition(split.partition_id); hdfs_table_->GetPartition(split.partition_id());
if (partition_desc == NULL) { if (partition_desc == NULL) {
// TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702. // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id() LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
<< " partition_id=" << split.partition_id << " partition_id=" << split.partition_id() << "\n"
<< "\n" << PrintThrift(state->instance_ctx()); << PrintThrift(state->instance_ctx()) << "\n"
<< state->instance_ctx_pb().DebugString();
return Status("Query encountered invalid metadata, likely due to IMPALA-1702." return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
" Try rerunning the query."); " Try rerunning the query.");
} }
filesystem::path file_path(partition_desc->location()); filesystem::path file_path(partition_desc->location());
file_path.append(split.relative_path, filesystem::path::codecvt()); file_path.append(split.relative_path(), filesystem::path::codecvt());
const string& native_file_path = file_path.native(); const string& native_file_path = file_path.native();
auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path); auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
@@ -359,10 +363,10 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
// Add new file_desc to file_descs_ and per_type_files_ // Add new file_desc to file_descs_ and per_type_files_
file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path)); file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
file_descs_[file_desc_map_key] = file_desc; file_descs_[file_desc_map_key] = file_desc;
file_desc->file_length = split.file_length; file_desc->file_length = split.file_length();
file_desc->mtime = split.mtime; file_desc->mtime = split.mtime();
file_desc->file_compression = split.file_compression; file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
file_desc->is_erasure_coded = split.is_erasure_coded; file_desc->is_erasure_coded = split.is_erasure_coded();
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
native_file_path, &file_desc->fs, &fs_cache)); native_file_path, &file_desc->fs, &fs_cache));
per_type_files_[partition_desc->file_format()].push_back(file_desc); per_type_files_[partition_desc->file_format()].push_back(file_desc);
@@ -371,19 +375,19 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
file_desc = file_desc_it->second; file_desc = file_desc_it->second;
} }
bool expected_local = params.__isset.is_remote && !params.is_remote; bool expected_local = params.has_is_remote() && !params.is_remote();
if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id; if (expected_local && params.volume_id() == -1) ++num_ranges_missing_volume_id;
int cache_options = BufferOpts::NO_CACHING; int cache_options = BufferOpts::NO_CACHING;
if (params.__isset.try_hdfs_cache && params.try_hdfs_cache) { if (params.has_try_hdfs_cache() && params.try_hdfs_cache()) {
cache_options |= BufferOpts::USE_HDFS_CACHE; cache_options |= BufferOpts::USE_HDFS_CACHE;
} }
if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) { if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) {
cache_options |= BufferOpts::USE_DATA_CACHE; cache_options |= BufferOpts::USE_DATA_CACHE;
} }
file_desc->splits.push_back( file_desc->splits.push_back(
AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length, AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length(),
split.offset, split.partition_id, params.volume_id, expected_local, split.offset(), split.partition_id(), params.volume_id(), expected_local,
file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options))); file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options)));
} }
@@ -1009,17 +1013,17 @@ void HdfsScanNodeBase::TransferToScanNodePool(MemPool* pool) {
} }
void HdfsScanNodeBase::UpdateHdfsSplitStats( void HdfsScanNodeBase::UpdateHdfsSplitStats(
const vector<TScanRangeParams>& scan_range_params_list, const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>& scan_range_params_list,
PerVolumeStats* per_volume_stats) { PerVolumeStats* per_volume_stats) {
pair<int, int64_t> init_value(0, 0); pair<int, int64_t> init_value(0, 0);
for (const TScanRangeParams& scan_range_params: scan_range_params_list) { for (const ScanRangeParamsPB& scan_range_params : scan_range_params_list) {
const TScanRange& scan_range = scan_range_params.scan_range; const ScanRangePB& scan_range = scan_range_params.scan_range();
if (!scan_range.__isset.hdfs_file_split) continue; if (!scan_range.has_hdfs_file_split()) continue;
const THdfsFileSplit& split = scan_range.hdfs_file_split; const HdfsFileSplitPB& split = scan_range.hdfs_file_split();
pair<int, int64_t>* stats = pair<int, int64_t>* stats =
FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value); FindOrInsert(per_volume_stats, scan_range_params.volume_id(), init_value);
++(stats->first); ++(stats->first);
stats->second += split.length; stats->second += split.length();
} }
} }

View File

@@ -414,7 +414,7 @@ class HdfsScanNodeBase : public ScanNode {
/// Update the per volume stats with the given scan range params list /// Update the per volume stats with the given scan range params list
static void UpdateHdfsSplitStats( static void UpdateHdfsSplitStats(
const std::vector<TScanRangeParams>& scan_range_params_list, const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>& scan_range_params_list,
PerVolumeStats* per_volume_stats); PerVolumeStats* per_volume_stats);
/// Output the per_volume_stats to stringstream. The output format is a list of: /// Output the per_volume_stats to stringstream. The output format is a list of:

View File

@@ -64,9 +64,8 @@ Status HdfsTableSinkConfig::Init(
return Status::OK(); return Status::OK();
} }
DataSink* HdfsTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* HdfsTableSinkConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const { TDataSinkId sink_id = state->fragment().idx;
TDataSinkId sink_id = fragment_ctx.fragment.idx;
return state->obj_pool()->Add( return state->obj_pool()->Add(
new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state)); new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
} }

View File

@@ -96,9 +96,7 @@ struct OutputPartition {
class HdfsTableSinkConfig : public DataSinkConfig { class HdfsTableSinkConfig : public DataSinkConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
void Close() override; void Close() override;
/// Expressions for computing the target partitions to which a row is written. /// Expressions for computing the target partitions to which a row is written.

View File

@@ -78,12 +78,13 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL); DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
// Initialize the list of scan tokens to process from the TScanRangeParams. // Initialize the list of scan tokens to process from the ScanRangeParamsPB.
DCHECK(scan_range_params_ != NULL); DCHECK(scan_range_params_ != NULL);
int num_remote_tokens = 0; int num_remote_tokens = 0;
for (const TScanRangeParams& params: *scan_range_params_) { for (const ScanRangeParamsPB& params : *scan_range_params_) {
if (params.__isset.is_remote && params.is_remote) ++num_remote_tokens; if (params.has_is_remote() && params.is_remote()) ++num_remote_tokens;
scan_tokens_.push_back(params.scan_range.kudu_scan_token); DCHECK(params.scan_range().has_kudu_scan_token());
scan_tokens_.push_back(params.scan_range().kudu_scan_token());
} }
COUNTER_SET(kudu_remote_tokens_, num_remote_tokens); COUNTER_SET(kudu_remote_tokens_, num_remote_tokens);

View File

@@ -67,9 +67,8 @@ namespace impala {
// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693). // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024; const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
DataSink* KuduTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* KuduTableSinkConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const { TDataSinkId sink_id = state->fragment().idx;
TDataSinkId sink_id = fragment_ctx.fragment.idx;
return state->obj_pool()->Add( return state->obj_pool()->Add(
new KuduTableSink(sink_id, *this, tsink_->table_sink, state)); new KuduTableSink(sink_id, *this, tsink_->table_sink, state));
} }

View File

@@ -32,9 +32,7 @@ class KuduTableDescriptor;
class KuduTableSinkConfig : public DataSinkConfig { class KuduTableSinkConfig : public DataSinkConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
~KuduTableSinkConfig() override {} ~KuduTableSinkConfig() override {}
}; };

View File

@@ -28,11 +28,9 @@
using namespace impala; using namespace impala;
DataSink* NljBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* NljBuilderConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const {
// We have one fragment per sink, so we can use the fragment index as the sink ID. // We have one fragment per sink, so we can use the fragment index as the sink ID.
TDataSinkId sink_id = fragment_ctx.fragment.idx; TDataSinkId sink_id = state->fragment().idx;
return NljBuilder::CreateSeparateBuilder(sink_id, *this, state); return NljBuilder::CreateSeparateBuilder(sink_id, *this, state);
} }

View File

@@ -29,9 +29,7 @@ namespace impala {
class NljBuilderConfig : public JoinBuilderConfig { class NljBuilderConfig : public JoinBuilderConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
~NljBuilderConfig() override {} ~NljBuilderConfig() override {}

View File

@@ -58,10 +58,9 @@ static string ConstructBuilderName(int join_node_id) {
return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id); return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id);
} }
DataSink* PhjBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* PhjBuilderConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
// We have one fragment per sink, so we can use the fragment index as the sink ID. // We have one fragment per sink, so we can use the fragment index as the sink ID.
TDataSinkId sink_id = fragment_ctx.fragment.idx; TDataSinkId sink_id = state->fragment().idx;
ObjectPool* pool = state->obj_pool(); ObjectPool* pool = state->obj_pool();
return pool->Add(new PhjBuilder(sink_id, *this, state)); return pool->Add(new PhjBuilder(sink_id, *this, state));
} }

View File

@@ -51,9 +51,7 @@ class ScalarExprEvaluator;
/// DataSinkConfig::CreateSink() are not implemented for it. /// DataSinkConfig::CreateSink() are not implemented for it.
class PhjBuilderConfig : public JoinBuilderConfig { class PhjBuilderConfig : public JoinBuilderConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
/// Creates a PhjBuilder for embedded use within a PartitionedHashJoinNode. /// Creates a PhjBuilder for embedded use within a PartitionedHashJoinNode.
PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client, PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client,

View File

@@ -37,13 +37,12 @@ using std::unique_lock;
namespace impala { namespace impala {
DataSink* PlanRootSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* PlanRootSinkConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const { TDataSinkId sink_id = state->fragment().idx;
TDataSinkId sink_id = fragment_ctx.fragment.idx;
ObjectPool* pool = state->obj_pool(); ObjectPool* pool = state->obj_pool();
if (state->query_options().spool_query_results) { if (state->query_options().spool_query_results) {
return pool->Add(new BufferedPlanRootSink( return pool->Add(new BufferedPlanRootSink(
sink_id, *this, state, fragment_instance_ctx.debug_options)); sink_id, *this, state, state->instance_ctx().debug_options));
} else { } else {
return pool->Add(new BlockingPlanRootSink(sink_id, *this, state)); return pool->Add(new BlockingPlanRootSink(sink_id, *this, state));
} }

View File

@@ -29,9 +29,7 @@ class ScalarExprEvaluator;
class PlanRootSinkConfig : public DataSinkConfig { class PlanRootSinkConfig : public DataSinkConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
~PlanRootSinkConfig() override {} ~PlanRootSinkConfig() override {}
}; };

View File

@@ -27,7 +27,7 @@
namespace impala { namespace impala {
class BlockingRowBatchQueue; class BlockingRowBatchQueue;
class TScanRange; class ScanRangeParamsPB;
class ScanPlanNode : public PlanNode { class ScanPlanNode : public PlanNode {
public: public:
@@ -115,7 +115,8 @@ class ScanNode : public ExecNode {
/// This should be called before Prepare(), and the argument must be not destroyed until /// This should be called before Prepare(), and the argument must be not destroyed until
/// after Prepare(). /// after Prepare().
void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) { void SetScanRanges(
const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>& scan_range_params) {
scan_range_params_ = &scan_range_params; scan_range_params_ = &scan_range_params;
} }
@@ -144,7 +145,7 @@ class ScanNode : public ExecNode {
RuntimeState* runtime_state_ = nullptr; RuntimeState* runtime_state_ = nullptr;
/// The scan ranges this scan node is responsible for. Not owned. /// The scan ranges this scan node is responsible for. Not owned.
const std::vector<TScanRangeParams>* scan_range_params_; const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>* scan_range_params_;
/// Total bytes read from the scanner. Initialised in subclasses that track /// Total bytes read from the scanner. Initialised in subclasses that track
/// bytes read, including HDFS and HBase by calling AddBytesReadCounters(). /// bytes read, including HDFS and HBase by calling AddBytesReadCounters().

View File

@@ -117,8 +117,10 @@ class ExprCodegenTest : public ::testing::Test {
ASSERT_OK(test_env_->Init()); ASSERT_OK(test_env_->Init());
ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_)); ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
QueryState* qs = runtime_state_->query_state(); QueryState* qs = runtime_state_->query_state();
TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx()); TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx)); PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new PlanFragmentCtxPB());
fragment_state_ =
qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
FunctionContext::TypeDesc return_type; FunctionContext::TypeDesc return_type;
return_type.type = FunctionContext::TYPE_DECIMAL; return_type.type = FunctionContext::TYPE_DECIMAL;

View File

@@ -24,12 +24,14 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
# Mark the protobuf files as generated # Mark the protobuf files as generated
set_source_files_properties(${COMMON_PROTO_SRCS} PROPERTIES GENERATED TRUE) set_source_files_properties(${COMMON_PROTO_SRCS} PROPERTIES GENERATED TRUE)
set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE) set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
set_source_files_properties(${PLANNER_PROTO_SRCS} PROPERTIES GENERATED TRUE)
add_library(Rpc add_library(Rpc
authentication.cc authentication.cc
${COMMON_PROTO_SRCS} ${COMMON_PROTO_SRCS}
cookie-util.cc cookie-util.cc
impala-service-pool.cc impala-service-pool.cc
${PLANNER_PROTO_SRCS}
rpc-mgr.cc rpc-mgr.cc
rpc-trace.cc rpc-trace.cc
TAcceptQueueServer.cpp TAcceptQueueServer.cpp

View File

@@ -120,32 +120,41 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
request->set_per_backend_mem_limit(schedule_.per_backend_mem_limit()); request->set_per_backend_mem_limit(schedule_.per_backend_mem_limit());
// set fragment_ctxs and fragment_instance_ctxs // set fragment_ctxs and fragment_instance_ctxs
fragment_info->__isset.fragment_ctxs = true; fragment_info->__isset.fragments = true;
fragment_info->__isset.fragment_instance_ctxs = true; fragment_info->__isset.fragment_instance_ctxs = true;
fragment_info->fragment_instance_ctxs.resize( fragment_info->fragment_instance_ctxs.resize(
backend_exec_params_->instance_params.size()); backend_exec_params_->instance_params.size());
for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) { for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) {
TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i]; TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i];
PlanFragmentInstanceCtxPB* instance_ctx_pb = request->add_fragment_instance_ctxs();
const FInstanceExecParams& params = *backend_exec_params_->instance_params[i]; const FInstanceExecParams& params = *backend_exec_params_->instance_params[i];
int fragment_idx = params.fragment_exec_params.fragment.idx; int fragment_idx = params.fragment_exec_params.fragment.idx;
// add a TPlanFragmentCtx, if we don't already have it // add a TPlanFragment, if we don't already have it
if (fragment_info->fragment_ctxs.empty() if (fragment_info->fragments.empty()
|| fragment_info->fragment_ctxs.back().fragment.idx != fragment_idx) { || fragment_info->fragments.back().idx != fragment_idx) {
fragment_info->fragment_ctxs.emplace_back(); fragment_info->fragments.push_back(params.fragment_exec_params.fragment);
TPlanFragmentCtx& fragment_ctx = fragment_info->fragment_ctxs.back(); PlanFragmentCtxPB* fragment_ctx = request->add_fragment_ctxs();
fragment_ctx.__set_fragment(params.fragment_exec_params.fragment); fragment_ctx->set_fragment_idx(params.fragment_exec_params.fragment.idx);
fragment_ctx.__set_destinations(params.fragment_exec_params.destinations); *fragment_ctx->mutable_destinations() = {
params.fragment_exec_params.destinations.begin(),
params.fragment_exec_params.destinations.end()};
} }
instance_ctx.fragment_idx = fragment_idx; instance_ctx.fragment_idx = fragment_idx;
instance_ctx_pb->set_fragment_idx(fragment_idx);
instance_ctx.fragment_instance_id = params.instance_id; instance_ctx.fragment_instance_id = params.instance_id;
instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx; instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges); for (const auto& entry : params.per_node_scan_ranges) {
ScanRangesPB& scan_ranges =
(*instance_ctx_pb->mutable_per_node_scan_ranges())[entry.first];
*scan_ranges.mutable_scan_ranges() = {entry.second.begin(), entry.second.end()};
}
instance_ctx.__set_per_exch_num_senders( instance_ctx.__set_per_exch_num_senders(
params.fragment_exec_params.per_exch_num_senders); params.fragment_exec_params.per_exch_num_senders);
instance_ctx.__set_sender_id(params.sender_id); instance_ctx.__set_sender_id(params.sender_id);
instance_ctx.__set_join_build_inputs(params.join_build_inputs); *instance_ctx_pb->mutable_join_build_inputs() = {
params.join_build_inputs.begin(), params.join_build_inputs.end()};
if (params.num_join_build_outputs != -1) { if (params.num_join_build_outputs != -1) {
instance_ctx.__set_num_join_build_outputs(params.num_join_build_outputs); instance_ctx.__set_num_join_build_outputs(params.num_join_build_outputs);
} }
@@ -715,9 +724,9 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
// add total split size to fragment_stats->bytes_assigned() // add total split size to fragment_stats->bytes_assigned()
for (const PerNodeScanRanges::value_type& entry: exec_params_.per_node_scan_ranges) { for (const PerNodeScanRanges::value_type& entry: exec_params_.per_node_scan_ranges) {
for (const TScanRangeParams& scan_range_params: entry.second) { for (const ScanRangeParamsPB& scan_range_params : entry.second) {
if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue; if (!scan_range_params.scan_range().has_hdfs_file_split()) continue;
total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length; total_split_size_ += scan_range_params.scan_range().hdfs_file_split().length();
} }
} }
(*fragment_stats->bytes_assigned())(total_split_size_); (*fragment_stats->bytes_assigned())(total_split_size_);

View File

@@ -154,10 +154,11 @@ class DataStreamTest : public testing::Test {
ABORT_IF_ERROR(exec_env_->InitForFeTests()); ABORT_IF_ERROR(exec_env_->InitForFeTests());
exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024); exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get())); runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
TPlanFragmentCtx* fragment_ctx = TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new TPlanFragment());
runtime_state_->obj_pool()->Add(new TPlanFragmentCtx()); PlanFragmentCtxPB* fragment_ctx =
runtime_state_->obj_pool()->Add(new PlanFragmentCtxPB());
fragment_state_ = runtime_state_->obj_pool()->Add( fragment_state_ = runtime_state_->obj_pool()->Add(
new FragmentState(runtime_state_->query_state(), *fragment_ctx)); new FragmentState(runtime_state_->query_state(), *fragment, *fragment_ctx));
mem_pool_.reset(new MemPool(&tracker_)); mem_pool_.reset(new MemPool(&tracker_));
// Register a BufferPool client for allocating buffers for row batches. // Register a BufferPool client for allocating buffers for row batches.
@@ -176,8 +177,8 @@ class DataStreamTest : public testing::Test {
tsort_info_.is_asc_order.push_back(true); tsort_info_.is_asc_order.push_back(true);
tsort_info_.nulls_first.push_back(true); tsort_info_.nulls_first.push_back(true);
next_instance_id_.lo = 0; next_instance_id_.set_lo(0);
next_instance_id_.hi = 0; next_instance_id_.set_hi(0);
stream_mgr_ = exec_env_->stream_mgr(); stream_mgr_ = exec_env_->stream_mgr();
broadcast_sink_.dest_node_id = DEST_NODE_ID; broadcast_sink_.dest_node_id = DEST_NODE_ID;
@@ -243,7 +244,7 @@ class DataStreamTest : public testing::Test {
void Reset() { void Reset() {
sender_info_.clear(); sender_info_.clear();
receiver_info_.clear(); receiver_info_.clear();
dest_.clear(); dest_.Clear();
} }
ObjectPool obj_pool_; ObjectPool obj_pool_;
@@ -256,7 +257,7 @@ class DataStreamTest : public testing::Test {
boost::scoped_ptr<ExecEnv> exec_env_; boost::scoped_ptr<ExecEnv> exec_env_;
scoped_ptr<RuntimeState> runtime_state_; scoped_ptr<RuntimeState> runtime_state_;
FragmentState* fragment_state_; FragmentState* fragment_state_;
TUniqueId next_instance_id_; UniqueIdPB next_instance_id_;
string stmt_; string stmt_;
// The sorting expression for the single BIGINT column. // The sorting expression for the single BIGINT column.
vector<ScalarExpr*> ordering_exprs_; vector<ScalarExpr*> ordering_exprs_;
@@ -282,7 +283,7 @@ class DataStreamTest : public testing::Test {
TDataStreamSink broadcast_sink_; TDataStreamSink broadcast_sink_;
TDataStreamSink random_sink_; TDataStreamSink random_sink_;
TDataStreamSink hash_sink_; TDataStreamSink hash_sink_;
vector<TPlanFragmentDestination> dest_; google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB> dest_;
struct SenderInfo { struct SenderInfo {
unique_ptr<thread> thread_handle; unique_ptr<thread> thread_handle;
@@ -318,14 +319,12 @@ class DataStreamTest : public testing::Test {
// Create an instance id and add it to dest_ // Create an instance id and add it to dest_
void GetNextInstanceId(TUniqueId* instance_id) { void GetNextInstanceId(TUniqueId* instance_id) {
dest_.push_back(TPlanFragmentDestination()); PlanFragmentDestinationPB* dest = dest_.Add();
TPlanFragmentDestination& dest = dest_.back(); *dest->mutable_fragment_instance_id() = next_instance_id_;
dest.fragment_instance_id = next_instance_id_; *dest->mutable_thrift_backend() = MakeNetworkAddressPB("localhost", FLAGS_port);
dest.thrift_backend.hostname = "localhost"; *dest->mutable_krpc_backend() = FromTNetworkAddress(krpc_address_);
dest.thrift_backend.port = FLAGS_port; UniqueIdPBToTUniqueId(next_instance_id_, instance_id);
dest.__set_krpc_backend(krpc_address_); next_instance_id_.set_lo(next_instance_id_.lo() + 1);
*instance_id = next_instance_id_;
++next_instance_id_.lo;
} }
// RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot // RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot
@@ -592,10 +591,11 @@ class DataStreamTest : public testing::Test {
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_); RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num; VLOG_QUERY << "create sender " << sender_num;
const TDataSink sink = GetSink(partition_type); const TDataSink sink = GetSink(partition_type);
TPlanFragmentCtx fragment_ctx; TPlanFragment fragment;
fragment_ctx.fragment.output_sink = sink; fragment.output_sink = sink;
fragment_ctx.destinations = dest_; PlanFragmentCtxPB fragment_ctx;
FragmentState fragment_state(state.query_state(), fragment_ctx); *fragment_ctx.mutable_destinations() = dest_;
FragmentState fragment_state(state.query_state(), fragment, fragment_ctx);
DataSinkConfig* data_sink = nullptr; DataSinkConfig* data_sink = nullptr;
EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &fragment_state, &data_sink)); EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &fragment_state, &data_sink));

View File

@@ -54,6 +54,7 @@
#include "common/names.h" #include "common/names.h"
using google::protobuf::RepeatedPtrField;
using kudu::rpc::RpcContext; using kudu::rpc::RpcContext;
using namespace impala; using namespace impala;
using namespace apache::thrift; using namespace apache::thrift;
@@ -67,11 +68,14 @@ static const string PREPARE_TIMER_NAME = "PrepareTime";
static const string EXEC_TIMER_NAME = "ExecTime"; static const string EXEC_TIMER_NAME = "ExecTime";
FragmentInstanceState::FragmentInstanceState(QueryState* query_state, FragmentInstanceState::FragmentInstanceState(QueryState* query_state,
FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx) FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx,
const PlanFragmentInstanceCtxPB& instance_ctx_pb)
: query_state_(query_state), : query_state_(query_state),
fragment_state_(fragment_state), fragment_state_(fragment_state),
fragment_(fragment_state->fragment()),
instance_ctx_(instance_ctx),
fragment_ctx_(fragment_state->fragment_ctx()), fragment_ctx_(fragment_state->fragment_ctx()),
instance_ctx_(instance_ctx) {} instance_ctx_pb_(instance_ctx_pb) {}
Status FragmentInstanceState::Exec() { Status FragmentInstanceState::Exec() {
bool is_prepared = false; bool is_prepared = false;
@@ -139,8 +143,8 @@ Status FragmentInstanceState::Prepare() {
// Do not call RETURN_IF_ERROR or explicitly return before this line, // Do not call RETURN_IF_ERROR or explicitly return before this line,
// runtime_state_ != nullptr is a postcondition of this function. // runtime_state_ != nullptr is a postcondition of this function.
runtime_state_ = obj_pool()->Add(new RuntimeState( runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_,
query_state_, fragment_ctx_, instance_ctx_, ExecEnv::GetInstance())); instance_ctx_, fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
// total_time_counter() is in the runtime_state_ so start it up now. // total_time_counter() is in the runtime_state_ so start it up now.
SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(profile()->total_time_counter());
@@ -197,12 +201,12 @@ Status FragmentInstanceState::Prepare() {
// set scan ranges // set scan ranges
vector<ExecNode*> scan_nodes; vector<ExecNode*> scan_nodes;
vector<TScanRangeParams> no_scan_ranges; ScanRangesPB no_scan_ranges;
exec_tree_->CollectScanNodes(&scan_nodes); exec_tree_->CollectScanNodes(&scan_nodes);
for (ExecNode* scan_node: scan_nodes) { for (ExecNode* scan_node: scan_nodes) {
const vector<TScanRangeParams>& scan_ranges = FindWithDefault( const ScanRangesPB& scan_ranges = FindWithDefault(
instance_ctx_.per_node_scan_ranges, scan_node->id(), no_scan_ranges); instance_ctx_pb_.per_node_scan_ranges(), scan_node->id(), no_scan_ranges);
static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges); static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
} }
RuntimeProfile::Counter* prepare_timer = RuntimeProfile::Counter* prepare_timer =
@@ -216,7 +220,7 @@ Status FragmentInstanceState::Prepare() {
// prepare sink_ // prepare sink_
const DataSinkConfig* sink_config = fragment_state_->sink_config(); const DataSinkConfig* sink_config = fragment_state_->sink_config();
DCHECK(sink_config != nullptr); DCHECK(sink_config != nullptr);
sink_ = sink_config->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_); sink_ = sink_config->CreateSink(runtime_state_);
RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker())); RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
RuntimeProfile* sink_profile = sink_->profile(); RuntimeProfile* sink_profile = sink_->profile();
if (sink_profile != nullptr) profile()->AddChild(sink_profile); if (sink_profile != nullptr) profile()->AddChild(sink_profile);
@@ -396,7 +400,7 @@ void FragmentInstanceState::Close() {
// If we haven't already released this thread token in Prepare(), release // If we haven't already released this thread token in Prepare(), release
// it before calling Close(). // it before calling Close().
if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) { if (fragment_.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
ReleaseThreadToken(); ReleaseThreadToken();
} }
@@ -536,13 +540,13 @@ const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB stat
} }
PlanRootSink* FragmentInstanceState::GetRootSink() const { PlanRootSink* FragmentInstanceState::GetRootSink() const {
return fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ? return fragment_.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ?
static_cast<PlanRootSink*>(sink_) : static_cast<PlanRootSink*>(sink_) :
nullptr; nullptr;
} }
bool FragmentInstanceState::HasJoinBuildSink() const { bool FragmentInstanceState::HasJoinBuildSink() const {
return IsJoinBuildSink(fragment_ctx_.fragment.output_sink.type); return IsJoinBuildSink(fragment_.output_sink.type);
} }
JoinBuilder* FragmentInstanceState::GetJoinBuildSink() const { JoinBuilder* FragmentInstanceState::GetJoinBuildSink() const {
@@ -562,11 +566,11 @@ RuntimeProfile* FragmentInstanceState::profile() const {
} }
void FragmentInstanceState::PrintVolumeIds() { void FragmentInstanceState::PrintVolumeIds() {
if (instance_ctx_.per_node_scan_ranges.empty()) return; if (instance_ctx_pb_.per_node_scan_ranges().empty()) return;
HdfsScanNodeBase::PerVolumeStats per_volume_stats; HdfsScanNodeBase::PerVolumeStats per_volume_stats;
for (const PerNodeScanRanges::value_type& entry: instance_ctx_.per_node_scan_ranges) { for (const auto& entry : instance_ctx_pb_.per_node_scan_ranges()) {
HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second, &per_volume_stats); HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second.scan_ranges(), &per_volume_stats);
} }
stringstream str; stringstream str;

View File

@@ -44,7 +44,7 @@ class RpcContext;
namespace impala { namespace impala {
class FragmentState; class FragmentState;
class TPlanFragmentCtx; class TPlanFragment;
class TPlanFragmentInstanceCtx; class TPlanFragmentInstanceCtx;
class TBloomFilter; class TBloomFilter;
class TUniqueId; class TUniqueId;
@@ -81,7 +81,8 @@ class JoinBuilder;
class FragmentInstanceState { class FragmentInstanceState {
public: public:
FragmentInstanceState(QueryState* query_state, FragmentState* fragment_state, FragmentInstanceState(QueryState* query_state, FragmentState* fragment_state,
const TPlanFragmentInstanceCtx& instance_ctx); const TPlanFragmentInstanceCtx& instance_ctx,
const PlanFragmentInstanceCtxPB& instance_ctx_pb);
/// Main loop of fragment instance execution. Blocks until execution finishes and /// Main loop of fragment instance execution. Blocks until execution finishes and
/// automatically releases resources. Returns execution status. /// automatically releases resources. Returns execution status.
@@ -131,8 +132,10 @@ class FragmentInstanceState {
RuntimeState* runtime_state() { return runtime_state_; } RuntimeState* runtime_state() { return runtime_state_; }
RuntimeProfile* profile() const; RuntimeProfile* profile() const;
const TQueryCtx& query_ctx() const; const TQueryCtx& query_ctx() const;
const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; } const TPlanFragment& fragment() const { return fragment_; }
const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; } const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
const PlanFragmentCtxPB& fragment_ctx() const { return fragment_ctx_; }
const PlanFragmentInstanceCtxPB& instance_ctx_pb() const { return instance_ctx_pb_; }
const TUniqueId& query_id() const { return query_ctx().query_id; } const TUniqueId& query_id() const { return query_ctx().query_id; }
const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; } const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
FInstanceExecStatePB current_state() const { return current_state_.Load(); } FInstanceExecStatePB current_state() const { return current_state_.Load(); }
@@ -158,8 +161,10 @@ class FragmentInstanceState {
private: private:
QueryState* query_state_; QueryState* query_state_;
FragmentState* fragment_state_; FragmentState* fragment_state_;
const TPlanFragmentCtx& fragment_ctx_; const TPlanFragment& fragment_;
const TPlanFragmentInstanceCtx& instance_ctx_; const TPlanFragmentInstanceCtx& instance_ctx_;
const PlanFragmentCtxPB& fragment_ctx_;
const PlanFragmentInstanceCtxPB& instance_ctx_pb_;
/// All following member variables that are initialized to nullptr are set /// All following member variables that are initialized to nullptr are set
/// in Prepare(). /// in Prepare().

View File

@@ -38,26 +38,35 @@ const string FragmentState::FSTATE_THREAD_GROUP_NAME = "fragment-init";
const string FragmentState::FSTATE_THREAD_NAME_PREFIX = "init-and-codegen"; const string FragmentState::FSTATE_THREAD_NAME_PREFIX = "init-and-codegen";
Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info, Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) { const ExecQueryFInstancesRequestPB& exec_request, QueryState* state,
std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) {
int fragment_ctx_idx = 0; int fragment_ctx_idx = 0;
const TPlanFragmentCtx& frag_ctx = fragment_info.fragment_ctxs[fragment_ctx_idx]; const TPlanFragment& frag = fragment_info.fragments[fragment_ctx_idx];
const PlanFragmentCtxPB& frag_ctx = exec_request.fragment_ctxs(fragment_ctx_idx);
FragmentState* fragment_state = FragmentState* fragment_state =
state->obj_pool()->Add(new FragmentState(state, frag_ctx)); state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
fragment_map[fragment_state->fragment_idx()] = fragment_state; fragment_map[fragment_state->fragment_idx()] = fragment_state;
for (const TPlanFragmentInstanceCtx& instance_ctx : for (int i = 0; i < fragment_info.fragment_instance_ctxs.size(); ++i) {
fragment_info.fragment_instance_ctxs) { const TPlanFragmentInstanceCtx& instance_ctx =
// determine corresponding TPlanFragmentCtx fragment_info.fragment_instance_ctxs[i];
const PlanFragmentInstanceCtxPB& instance_ctx_pb =
exec_request.fragment_instance_ctxs(i);
DCHECK_EQ(instance_ctx.fragment_idx, instance_ctx_pb.fragment_idx());
// determine corresponding TPlanFragment
if (fragment_state->fragment_idx() != instance_ctx.fragment_idx) { if (fragment_state->fragment_idx() != instance_ctx.fragment_idx) {
++fragment_ctx_idx; ++fragment_ctx_idx;
DCHECK_LT(fragment_ctx_idx, fragment_info.fragment_ctxs.size()); DCHECK_LT(fragment_ctx_idx, fragment_info.fragments.size());
const TPlanFragmentCtx& fragment_ctx = const TPlanFragment& fragment = fragment_info.fragments[fragment_ctx_idx];
fragment_info.fragment_ctxs[fragment_ctx_idx]; const PlanFragmentCtxPB& fragment_ctx =
fragment_state = state->obj_pool()->Add(new FragmentState(state, fragment_ctx)); exec_request.fragment_ctxs(fragment_ctx_idx);
DCHECK_EQ(fragment.idx, fragment_ctx.fragment_idx());
fragment_state =
state->obj_pool()->Add(new FragmentState(state, fragment, fragment_ctx));
fragment_map[fragment_state->fragment_idx()] = fragment_state; fragment_map[fragment_state->fragment_idx()] = fragment_state;
// we expect fragment and instance contexts to follow the same order // we expect fragment and instance contexts to follow the same order
DCHECK_EQ(fragment_state->fragment_idx(), instance_ctx.fragment_idx); DCHECK_EQ(fragment_state->fragment_idx(), instance_ctx.fragment_idx);
} }
fragment_state->AddInstance(&instance_ctx); fragment_state->AddInstance(&instance_ctx, &instance_ctx_pb);
} }
// Init all fragments. // Init all fragments.
for (auto& elem : fragment_map) { for (auto& elem : fragment_map) {
@@ -67,9 +76,9 @@ Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragme
} }
Status FragmentState::Init() { Status FragmentState::Init() {
RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment_ctx_.fragment.plan, &plan_tree_)); RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment_.plan, &plan_tree_));
RETURN_IF_ERROR(DataSinkConfig::CreateConfig(fragment_ctx_.fragment.output_sink, RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
plan_tree_->row_descriptor_, this, &sink_config_)); fragment_.output_sink, plan_tree_->row_descriptor_, this, &sink_config_));
return Status::OK(); return Status::OK();
} }
@@ -79,8 +88,8 @@ Status FragmentState::InvokeCodegen() {
codegen_invoked_ = true; codegen_invoked_ = true;
codegen_status_ = CodegenHelper(); codegen_status_ = CodegenHelper();
if (!codegen_status_.ok()) { if (!codegen_status_.ok()) {
string error_ctx = Substitute("Fragment failed during codegen, fragment index: $0", string error_ctx = Substitute(
fragment_ctx_.fragment.display_name); "Fragment failed during codegen, fragment index: $0", fragment_.display_name);
codegen_status_.AddDetail(error_ctx); codegen_status_.AddDetail(error_ctx);
query_state_->ErrorDuringFragmentCodegen(codegen_status_); query_state_->ErrorDuringFragmentCodegen(codegen_status_);
} }
@@ -111,12 +120,11 @@ Status FragmentState::CodegenHelper() {
return Status::OK(); return Status::OK();
} }
FragmentState::FragmentState(QueryState* query_state, FragmentState::FragmentState(QueryState* query_state, const TPlanFragment& fragment,
const TPlanFragmentCtx& fragment_ctx) const PlanFragmentCtxPB& fragment_ctx)
: query_state_(query_state), : query_state_(query_state), fragment_(fragment), fragment_ctx_(fragment_ctx) {
fragment_ctx_(fragment_ctx) { runtime_profile_ = RuntimeProfile::Create(
runtime_profile_ = RuntimeProfile::Create(query_state->obj_pool(), query_state->obj_pool(), Substitute("Fragment $0", fragment_.display_name));
Substitute("Fragment $0", fragment_ctx_.fragment.display_name));
query_state_->host_profile()->AddChild(runtime_profile_); query_state_->host_profile()->AddChild(runtime_profile_);
} }

View File

@@ -37,10 +37,13 @@ class RuntimeProfile;
class FragmentState { class FragmentState {
public: public:
/// Create a map of fragment index to its FragmentState object and only populate the /// Create a map of fragment index to its FragmentState object and only populate the
/// thrift references of the fragment and instance context objects from 'fragment_info'. /// thrift and protobuf references of the fragment and instance context objects from
/// 'fragment_info' and 'exec_request'.
static Status CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info, static Status CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map); const ExecQueryFInstancesRequestPB& exec_request, QueryState* state,
FragmentState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx); std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map);
FragmentState(QueryState* query_state, const TPlanFragment& fragment,
const PlanFragmentCtxPB& fragment_ctx);
~FragmentState(); ~FragmentState();
/// Called by all the fragment instance threads that execute this fragment. The first /// Called by all the fragment instance threads that execute this fragment. The first
@@ -54,12 +57,16 @@ class FragmentState {
void ReleaseResources(); void ReleaseResources();
ObjectPool* obj_pool() { return &obj_pool_; } ObjectPool* obj_pool() { return &obj_pool_; }
int fragment_idx() const { return fragment_ctx_.fragment.idx; } int fragment_idx() const { return fragment_.idx; }
const TQueryOptions& query_options() const { return query_state_->query_options(); } const TQueryOptions& query_options() const { return query_state_->query_options(); }
const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; } const TPlanFragment& fragment() const { return fragment_; }
const PlanFragmentCtxPB& fragment_ctx() const { return fragment_ctx_; }
const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const { const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const {
return instance_ctxs_; return instance_ctxs_;
} }
const std::vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs() const {
return instance_ctx_pbs_;
}
const PlanNode* plan_tree() const { return plan_tree_; } const PlanNode* plan_tree() const { return plan_tree_; }
const DataSinkConfig* sink_config() const { return sink_config_; } const DataSinkConfig* sink_config() const { return sink_config_; }
const TUniqueId& query_id() const { return query_state_->query_id(); } const TUniqueId& query_id() const { return query_state_->query_id(); }
@@ -153,8 +160,11 @@ class FragmentState {
QueryState* query_state_; QueryState* query_state_;
/// References to the thrift structs for this fragment. /// References to the thrift structs for this fragment.
const TPlanFragmentCtx& fragment_ctx_; const TPlanFragment& fragment_;
std::vector<const TPlanFragmentInstanceCtx*> instance_ctxs_; std::vector<const TPlanFragmentInstanceCtx*> instance_ctxs_;
/// References to the protobuf structs for this fragment.
const PlanFragmentCtxPB& fragment_ctx_;
std::vector<const PlanFragmentInstanceCtxPB*> instance_ctx_pbs_;
/// Lives in obj_pool(). Not mutated after being initialized in InitAndCodegen() except /// Lives in obj_pool(). Not mutated after being initialized in InitAndCodegen() except
/// for being closed. /// for being closed.
@@ -181,10 +191,12 @@ class FragmentState {
/// fragment instance to call InvokeCodegen() does the actual codegen work. /// fragment instance to call InvokeCodegen() does the actual codegen work.
bool codegen_invoked_ = false; bool codegen_invoked_ = false;
/// Used by the CreateFragmentStateMap to add TPlanFragmentInstanceCtx thrift objects /// Used by the CreateFragmentStateMap to add the TPlanFragmentInstanceCtx and the
/// for the fragment that this object represents. /// PlanFragmentInstanceCtxPB for the fragment that this object represents.
void AddInstance(const TPlanFragmentInstanceCtx* instance_ctx) { void AddInstance(const TPlanFragmentInstanceCtx* instance_ctx,
const PlanFragmentInstanceCtxPB* instance_ctx_pb) {
instance_ctxs_.push_back(instance_ctx); instance_ctxs_.push_back(instance_ctx);
instance_ctx_pbs_.push_back(instance_ctx_pb);
} }
/// Helper method used by InvokeCodegen(). Does the actual codegen work. /// Helper method used by InvokeCodegen(). Does the actual codegen work.

View File

@@ -89,18 +89,18 @@ Status KrpcDataStreamSenderConfig::Init(
exchange_hash_seed_ = exchange_hash_seed_ =
KrpcDataStreamSender::EXCHANGE_HASH_SEED_CONST ^ state->query_id().hi; KrpcDataStreamSender::EXCHANGE_HASH_SEED_CONST ^ state->query_id().hi;
} }
num_channels_ = state->fragment_ctx().destinations.size(); num_channels_ = state->fragment_ctx().destinations().size();
state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_); state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
return Status::OK(); return Status::OK();
} }
DataSink* KrpcDataStreamSenderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* KrpcDataStreamSenderConfig::CreateSink(RuntimeState* state) const {
const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
// We have one fragment per sink, so we can use the fragment index as the sink ID. // We have one fragment per sink, so we can use the fragment index as the sink ID.
TDataSinkId sink_id = fragment_ctx.fragment.idx; TDataSinkId sink_id = state->fragment().idx;
return state->obj_pool()->Add(new KrpcDataStreamSender(sink_id, return state->obj_pool()->Add(
fragment_instance_ctx.sender_id, *this, tsink_->stream_sink, new KrpcDataStreamSender(sink_id, state->instance_ctx().sender_id, *this,
fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state)); tsink_->stream_sink, state->fragment_ctx().destinations(),
FLAGS_data_stream_sender_buffer_size, state));
} }
void KrpcDataStreamSenderConfig::Close() { void KrpcDataStreamSenderConfig::Close() {
@@ -148,8 +148,8 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
// data is getting accumulated before being sent; it only applies when data is added via // data is getting accumulated before being sent; it only applies when data is added via
// AddRow() and not sent directly via SendBatch(). // AddRow() and not sent directly via SendBatch().
Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc, Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
const std::string& hostname, const TNetworkAddress& destination, const std::string& hostname, const NetworkAddressPB& destination,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size) const UniqueIdPB& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
: parent_(parent), : parent_(parent),
row_desc_(row_desc), row_desc_(row_desc),
hostname_(hostname), hostname_(hostname),
@@ -216,8 +216,8 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
// The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver. // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver.
const std::string hostname_; const std::string hostname_;
const TNetworkAddress address_; const NetworkAddressPB address_;
const TUniqueId fragment_instance_id_; const UniqueIdPB fragment_instance_id_;
const PlanNodeId dest_node_id_; const PlanNodeId dest_node_id_;
// The row batch for accumulating rows copied from AddRow(). // The row batch for accumulating rows copied from AddRow().
@@ -373,7 +373,8 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker())); batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
// Create a DataStreamService proxy to the destination. // Create a DataStreamService proxy to the destination.
RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_)); RETURN_IF_ERROR(
DataStreamService::GetProxy(FromNetworkAddressPB(address_), hostname_, &proxy_));
return Status::OK(); return Status::OK();
} }
@@ -390,7 +391,7 @@ template <typename ResponsePBType>
void KrpcDataStreamSender::Channel::LogSlowRpc( void KrpcDataStreamSender::Channel::LogSlowRpc(
const char* rpc_name, int64_t total_time_ns, const ResponsePBType& resp) { const char* rpc_name, int64_t total_time_ns, const ResponsePBType& resp) {
int64_t network_time_ns = total_time_ns - resp_.receiver_latency_ns(); int64_t network_time_ns = total_time_ns - resp_.receiver_latency_ns();
LOG(INFO) << "Slow " << rpc_name << " RPC to " << TNetworkAddressToString(address_) LOG(INFO) << "Slow " << rpc_name << " RPC to " << address_
<< " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): " << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
<< "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". " << "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". "
<< "Receiver time: " << "Receiver time: "
@@ -400,7 +401,7 @@ void KrpcDataStreamSender::Channel::LogSlowRpc(
void KrpcDataStreamSender::Channel::LogSlowFailedRpc( void KrpcDataStreamSender::Channel::LogSlowFailedRpc(
const char* rpc_name, int64_t total_time_ns, const kudu::Status& err) { const char* rpc_name, int64_t total_time_ns, const kudu::Status& err) {
LOG(INFO) << "Slow " << rpc_name << " RPC to " << TNetworkAddressToString(address_) LOG(INFO) << "Slow " << rpc_name << " RPC to " << address_
<< " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): " << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
<< "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". " << "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". "
<< "Error: " << err.ToString(); << "Error: " << err.ToString();
@@ -424,7 +425,7 @@ Status KrpcDataStreamSender::Channel::WaitForRpcLocked(std::unique_lock<SpinLock
} }
int64_t elapsed_time_ns = timer.ElapsedTime(); int64_t elapsed_time_ns = timer.ElapsedTime();
if (IsSlowRpc(elapsed_time_ns)) { if (IsSlowRpc(elapsed_time_ns)) {
LOG(INFO) << "Long delay waiting for RPC to " << TNetworkAddressToString(address_) LOG(INFO) << "Long delay waiting for RPC to " << address_
<< " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): " << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
<< "took " << PrettyPrinter::Print(elapsed_time_ns, TUnit::TIME_NS); << "took " << PrettyPrinter::Print(elapsed_time_ns, TUnit::TIME_NS);
} }
@@ -437,9 +438,9 @@ Status KrpcDataStreamSender::Channel::WaitForRpcLocked(std::unique_lock<SpinLock
DCHECK(!rpc_in_flight_); DCHECK(!rpc_in_flight_);
if (UNLIKELY(!rpc_status_.ok())) { if (UNLIKELY(!rpc_status_.ok())) {
LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " failed: " LOG(ERROR) << "channel send to " << address_ << " failed: (fragment_instance_id="
<< "(fragment_instance_id=" << PrintId(fragment_instance_id_) << "): " << PrintId(fragment_instance_id_)
<< rpc_status_.GetDetail(); << "): " << rpc_status_.GetDetail();
return rpc_status_; return rpc_status_;
} }
return Status::OK(); return Status::OK();
@@ -523,7 +524,7 @@ void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
DoRpcFn rpc_fn = DoRpcFn rpc_fn =
boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this); boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this);
const string& prepend = const string& prepend =
Substitute("TransmitData() to $0 failed", TNetworkAddressToString(address_)); Substitute("TransmitData() to $0 failed", NetworkAddressPBToString(address_));
HandleFailedRPC(rpc_fn, controller_status, prepend); HandleFailedRPC(rpc_fn, controller_status, prepend);
} }
} }
@@ -534,9 +535,7 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
// Initialize some constant fields in the request protobuf. // Initialize some constant fields in the request protobuf.
TransmitDataRequestPB req; TransmitDataRequestPB req;
UniqueIdPB* finstance_id_pb = req.mutable_dest_fragment_instance_id(); *req.mutable_dest_fragment_instance_id() = fragment_instance_id_;
finstance_id_pb->set_lo(fragment_instance_id_.lo);
finstance_id_pb->set_hi(fragment_instance_id_.hi);
req.set_sender_id(parent_->sender_id_); req.set_sender_id(parent_->sender_id_);
req.set_dest_node_id(dest_node_id_); req.set_dest_node_id(dest_node_id_);
@@ -641,7 +640,7 @@ void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
DoRpcFn rpc_fn = DoRpcFn rpc_fn =
boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this); boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this);
const string& prepend = const string& prepend =
Substitute("EndDataStream() to $0 failed", TNetworkAddressToString(address_)); Substitute("EndDataStream() to $0 failed", NetworkAddressPBToString(address_));
HandleFailedRPC(rpc_fn, controller_status, prepend); HandleFailedRPC(rpc_fn, controller_status, prepend);
} }
} }
@@ -650,9 +649,7 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
DCHECK(rpc_in_flight_); DCHECK(rpc_in_flight_);
EndDataStreamRequestPB eos_req; EndDataStreamRequestPB eos_req;
rpc_controller_.Reset(); rpc_controller_.Reset();
UniqueIdPB* finstance_id_pb = eos_req.mutable_dest_fragment_instance_id(); *eos_req.mutable_dest_fragment_instance_id() = fragment_instance_id_;
finstance_id_pb->set_lo(fragment_instance_id_.lo);
finstance_id_pb->set_hi(fragment_instance_id_.hi);
eos_req.set_sender_id(parent_->sender_id_); eos_req.set_sender_id(parent_->sender_id_);
eos_req.set_dest_node_id(dest_node_id_); eos_req.set_dest_node_id(dest_node_id_);
eos_resp_.Clear(); eos_resp_.Clear();
@@ -710,7 +707,7 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink, const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations, const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& destinations,
int per_channel_buffer_size, RuntimeState* state) int per_channel_buffer_size, RuntimeState* state)
: DataSink(sink_id, sink_config, : DataSink(sink_id, sink_config,
Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), state), Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), state),
@@ -728,11 +725,10 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
|| sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANDOM
|| sink.output_partition.type == TPartitionType::KUDU); || sink.output_partition.type == TPartitionType::KUDU);
for (int i = 0; i < destinations.size(); ++i) { for (const auto& destination : destinations) {
channels_.emplace_back( channels_.emplace_back(new Channel(this, row_desc_,
new Channel(this, row_desc_, destinations[i].thrift_backend.hostname, destination.thrift_backend().hostname(), destination.krpc_backend(),
destinations[i].krpc_backend, destinations[i].fragment_instance_id, destination.fragment_instance_id(), sink.dest_node_id, per_channel_buffer_size));
sink.dest_node_id, per_channel_buffer_size));
} }
if (partition_type_ == TPartitionType::UNPARTITIONED if (partition_type_ == TPartitionType::UNPARTITIONED

View File

@@ -38,13 +38,11 @@ class MemTracker;
class RowDescriptor; class RowDescriptor;
class TDataStreamSink; class TDataStreamSink;
class TNetworkAddress; class TNetworkAddress;
class TPlanFragmentDestination; class PlanFragmentDestinationPB;
class KrpcDataStreamSenderConfig : public DataSinkConfig { class KrpcDataStreamSenderConfig : public DataSinkConfig {
public: public:
DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx, DataSink* CreateSink(RuntimeState* state) const override;
const TPlanFragmentInstanceCtx& fragment_instance_ctx,
RuntimeState* state) const override;
void Close() override; void Close() override;
/// Codegen KrpcDataStreamSender::HashAndAddRows() if partitioning type is /// Codegen KrpcDataStreamSender::HashAndAddRows() if partitioning type is
@@ -108,7 +106,7 @@ class KrpcDataStreamSender : public DataSink {
/// and RANDOM. /// and RANDOM.
KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink, const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations, const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& destinations,
int per_channel_buffer_size, RuntimeState* state); int per_channel_buffer_size, RuntimeState* state);
virtual ~KrpcDataStreamSender(); virtual ~KrpcDataStreamSender();

View File

@@ -209,10 +209,16 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
// don't copy query_ctx, it's large and we already did that in the c'tor // don't copy query_ctx, it's large and we already did that in the c'tor
exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx()); exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
exec_rpc_params_.mutable_fragment_ctxs()->Swap(
const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>(
&exec_rpc_params->fragment_ctxs()));
exec_rpc_params_.mutable_fragment_instance_ctxs()->Swap(
const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentInstanceCtxPB>*>(
&exec_rpc_params->fragment_instance_ctxs()));
TExecPlanFragmentInfo& non_const_fragment_info = TExecPlanFragmentInfo& non_const_fragment_info =
const_cast<TExecPlanFragmentInfo&>(fragment_info); const_cast<TExecPlanFragmentInfo&>(fragment_info);
fragment_info_.fragment_ctxs.swap(non_const_fragment_info.fragment_ctxs); fragment_info_.fragments.swap(non_const_fragment_info.fragments);
fragment_info_.__isset.fragment_ctxs = true; fragment_info_.__isset.fragments = true;
fragment_info_.fragment_instance_ctxs.swap( fragment_info_.fragment_instance_ctxs.swap(
non_const_fragment_info.fragment_instance_ctxs); non_const_fragment_info.fragment_instance_ctxs);
fragment_info_.__isset.fragment_instance_ctxs = true; fragment_info_.__isset.fragment_instance_ctxs = true;
@@ -286,13 +292,12 @@ bool VerifyFiltersProduced(const vector<TPlanFragmentInstanceCtx>& instance_ctxs
Status QueryState::InitFilterBank() { Status QueryState::InitFilterBank() {
int64_t runtime_filters_reservation_bytes = 0; int64_t runtime_filters_reservation_bytes = 0;
int fragment_ctx_idx = -1; int fragment_ctx_idx = -1;
const vector<TPlanFragmentCtx>& fragment_ctxs = fragment_info_.fragment_ctxs; const vector<TPlanFragment>& fragments = fragment_info_.fragments;
const vector<TPlanFragmentInstanceCtx>& instance_ctxs = const vector<TPlanFragmentInstanceCtx>& instance_ctxs =
fragment_info_.fragment_instance_ctxs; fragment_info_.fragment_instance_ctxs;
// Add entries for all produced and consumed filters. // Add entries for all produced and consumed filters.
unordered_map<int32_t, FilterRegistration> filters; unordered_map<int32_t, FilterRegistration> filters;
for (const TPlanFragmentCtx& fragment_ctx : fragment_ctxs) { for (const TPlanFragment& fragment : fragments) {
const TPlanFragment& fragment = fragment_ctx.fragment;
for (const TPlanNode& plan_node : fragment.plan.nodes) { for (const TPlanNode& plan_node : fragment.plan.nodes) {
if (!plan_node.__isset.runtime_filters) continue; if (!plan_node.__isset.runtime_filters) continue;
for (const TRuntimeFilterDesc& filter : plan_node.runtime_filters) { for (const TRuntimeFilterDesc& filter : plan_node.runtime_filters) {
@@ -315,15 +320,15 @@ Status QueryState::InitFilterBank() {
<< "Filters produced by all instances on the same backend should be the same"; << "Filters produced by all instances on the same backend should be the same";
for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) { for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) {
bool first_instance_of_fragment = fragment_ctx_idx == -1 bool first_instance_of_fragment = fragment_ctx_idx == -1
|| fragment_ctxs[fragment_ctx_idx].fragment.idx != instance_ctx.fragment_idx; || fragments[fragment_ctx_idx].idx != instance_ctx.fragment_idx;
if (first_instance_of_fragment) { if (first_instance_of_fragment) {
++fragment_ctx_idx; ++fragment_ctx_idx;
DCHECK_EQ(fragment_ctxs[fragment_ctx_idx].fragment.idx, instance_ctx.fragment_idx); DCHECK_EQ(fragments[fragment_ctx_idx].idx, instance_ctx.fragment_idx);
} }
// TODO: this over-reserves memory a bit in a couple of cases: // TODO: this over-reserves memory a bit in a couple of cases:
// * if different fragments on this backend consume or produce the same filter. // * if different fragments on this backend consume or produce the same filter.
// * if a finstance was chosen not to produce a global broadcast filter. // * if a finstance was chosen not to produce a global broadcast filter.
const TPlanFragment& fragment = fragment_ctxs[fragment_ctx_idx].fragment; const TPlanFragment& fragment = fragments[fragment_ctx_idx];
runtime_filters_reservation_bytes += runtime_filters_reservation_bytes +=
fragment.produced_runtime_filters_reservation_bytes; fragment.produced_runtime_filters_reservation_bytes;
if (first_instance_of_fragment) { if (first_instance_of_fragment) {
@@ -617,7 +622,7 @@ bool QueryState::StartFInstances() {
DCHECK_GT(refcnt_.Load(), 0); DCHECK_GT(refcnt_.Load(), 0);
DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()"; DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
DCHECK_GT(fragment_info_.fragment_ctxs.size(), 0); DCHECK_GT(fragment_info_.fragments.size(), 0);
vector<unique_ptr<Thread>> codegen_threads; vector<unique_ptr<Thread>> codegen_threads;
int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size(); int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size();
@@ -629,16 +634,20 @@ bool QueryState::StartFInstances() {
VLOG(2) << "descriptor table for query=" << PrintId(query_id()) VLOG(2) << "descriptor table for query=" << PrintId(query_id())
<< "\n" << desc_tbl_->DebugString(); << "\n" << desc_tbl_->DebugString();
start_finstances_status = start_finstances_status = FragmentState::CreateFragmentStateMap(
FragmentState::CreateFragmentStateMap(fragment_info_, this, fragment_state_map_); fragment_info_, exec_rpc_params_, this, fragment_state_map_);
if (UNLIKELY(!start_finstances_status.ok())) goto error; if (UNLIKELY(!start_finstances_status.ok())) goto error;
fragment_events_start_time_ = MonotonicStopWatch::Now(); fragment_events_start_time_ = MonotonicStopWatch::Now();
for (auto& fragment : fragment_state_map_) { for (auto& fragment : fragment_state_map_) {
FragmentState* fragment_state = fragment.second; FragmentState* fragment_state = fragment.second;
for (const TPlanFragmentInstanceCtx* instance_ctx : fragment_state->instance_ctxs()) { for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i) {
FragmentInstanceState* fis = const TPlanFragmentInstanceCtx* instance_ctx = fragment_state->instance_ctxs()[i];
obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx)); const PlanFragmentInstanceCtxPB* instance_ctx_pb =
fragment_state->instance_ctx_pbs()[i];
DCHECK_EQ(instance_ctx->fragment_idx, instance_ctx_pb->fragment_idx());
FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(
this, fragment_state, *instance_ctx, *instance_ctx_pb));
// start new thread to execute instance // start new thread to execute instance
refcnt_.Add(1); // decremented in ExecFInstance() refcnt_.Add(1); // decremented in ExecFInstance()

View File

@@ -205,12 +205,12 @@ Status RowBatch::FromProtobuf(const RowDescriptor* row_desc,
row_batch->num_rows_ = header.num_rows(); row_batch->num_rows_ = header.num_rows();
row_batch->capacity_ = header.num_rows(); row_batch->capacity_ = header.num_rows();
const CompressionType& compression_type = header.compression_type(); const CompressionTypePB& compression_type = header.compression_type();
DCHECK(compression_type == CompressionType::NONE || DCHECK(compression_type == CompressionTypePB::NONE ||
compression_type == CompressionType::LZ4) compression_type == CompressionTypePB::LZ4)
<< "Unexpected compression type: " << compression_type; << "Unexpected compression type: " << compression_type;
row_batch->Deserialize(input_tuple_offsets, input_tuple_data, uncompressed_size, row_batch->Deserialize(input_tuple_offsets, input_tuple_data, uncompressed_size,
compression_type == CompressionType::LZ4, tuple_data); compression_type == CompressionTypePB::LZ4, tuple_data);
*row_batch_ptr = std::move(row_batch); *row_batch_ptr = std::move(row_batch);
return Status::OK(); return Status::OK();
} }
@@ -266,7 +266,7 @@ Status RowBatch::Serialize(OutboundRowBatch* output_batch) {
header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size()); header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size());
header->set_uncompressed_size(uncompressed_size); header->set_uncompressed_size(uncompressed_size);
header->set_compression_type( header->set_compression_type(
is_compressed ? CompressionType::LZ4 : CompressionType::NONE); is_compressed ? CompressionTypePB::LZ4 : CompressionTypePB::NONE);
return Status::OK(); return Status::OK();
} }

View File

@@ -70,11 +70,15 @@ namespace impala {
const char* RuntimeState::LLVM_CLASS_NAME = "class.impala::RuntimeState"; const char* RuntimeState::LLVM_CLASS_NAME = "class.impala::RuntimeState";
RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragment& fragment,
const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env) const TPlanFragmentInstanceCtx& instance_ctx,
const PlanFragmentCtxPB& fragment_ctx,
const PlanFragmentInstanceCtxPB& instance_ctx_pb, ExecEnv* exec_env)
: query_state_(query_state), : query_state_(query_state),
fragment_ctx_(&fragment_ctx), fragment_(&fragment),
instance_ctx_(&instance_ctx), instance_ctx_(&instance_ctx),
fragment_ctx_(&fragment_ctx),
instance_ctx_pb_(&instance_ctx_pb),
now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat( now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
query_state->query_ctx().now_string))), query_state->query_ctx().now_string))),
utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat( utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
@@ -97,8 +101,10 @@ RuntimeState::RuntimeState(
qctx.client_request.query_options.mem_limit : qctx.client_request.query_options.mem_limit :
-1, -1,
"test-pool")), "test-pool")),
fragment_ctx_(nullptr), fragment_(nullptr),
instance_ctx_(nullptr), instance_ctx_(nullptr),
fragment_ctx_(nullptr),
instance_ctx_pb_(nullptr),
local_query_state_(query_state_), local_query_state_(query_state_),
now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(qctx.now_string))), now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(qctx.now_string))),
utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat( utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
@@ -131,9 +137,9 @@ void RuntimeState::Init() {
// Register with the thread mgr // Register with the thread mgr
resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool(); resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
DCHECK(resource_pool_ != nullptr); DCHECK(resource_pool_ != nullptr);
if (fragment_ctx_ != nullptr) { if (fragment_ != nullptr) {
// Ensure that the planner correctly determined the required threads. // Ensure that the planner correctly determined the required threads.
resource_pool_->set_max_required_threads(fragment_ctx_->fragment.thread_reservation); resource_pool_->set_max_required_threads(fragment_->thread_reservation);
} }
total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads"); total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
@@ -330,14 +336,12 @@ void RuntimeState::ReleaseResources() {
released_resources_ = true; released_resources_ = true;
} }
void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code) { void RuntimeState::SetRPCErrorInfo(NetworkAddressPB dest_node, int16_t posix_error_code) {
std::lock_guard<SpinLock> l(aux_error_info_lock_); std::lock_guard<SpinLock> l(aux_error_info_lock_);
if (aux_error_info_ == nullptr && !reported_aux_error_info_) { if (aux_error_info_ == nullptr && !reported_aux_error_info_) {
aux_error_info_.reset(new AuxErrorInfoPB()); aux_error_info_.reset(new AuxErrorInfoPB());
RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info(); RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node(); *rpc_error_info->mutable_dest_node() = dest_node;
network_addr->set_hostname(dest_node.hostname);
network_addr->set_port(dest_node.port);
rpc_error_info->set_posix_error_code(posix_error_code); rpc_error_info->set_posix_error_code(posix_error_code);
} }
} }

View File

@@ -54,7 +54,7 @@ class ThreadResourcePool;
class TUniqueId; class TUniqueId;
class ExecEnv; class ExecEnv;
class HBaseTableFactory; class HBaseTableFactory;
class TPlanFragmentCtx; class TPlanFragment;
class TPlanFragmentInstanceCtx; class TPlanFragmentInstanceCtx;
class QueryState; class QueryState;
class ConditionVariable; class ConditionVariable;
@@ -81,8 +81,10 @@ class RuntimeState {
public: public:
/// query_state, fragment_ctx, and instance_ctx need to be alive at least as long as /// query_state, fragment_ctx, and instance_ctx need to be alive at least as long as
/// the constructed RuntimeState /// the constructed RuntimeState
RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, RuntimeState(QueryState* query_state, const TPlanFragment& fragment,
const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env); const TPlanFragmentInstanceCtx& instance_ctx,
const PlanFragmentCtxPB& fragment_ctx,
const PlanFragmentInstanceCtxPB& instance_ctx_pb, ExecEnv* exec_env);
/// RuntimeState for test execution and fe-support.cc. Creates its own QueryState and /// RuntimeState for test execution and fe-support.cc. Creates its own QueryState and
/// installs desc_tbl, if set. If query_ctx.request_pool isn't set, sets it to "test-pool". /// installs desc_tbl, if set. If query_ctx.request_pool isn't set, sets it to "test-pool".
@@ -102,8 +104,10 @@ class RuntimeState {
bool strict_mode() const { return query_options().strict_mode; } bool strict_mode() const { return query_options().strict_mode; }
bool decimal_v2() const { return query_options().decimal_v2; } bool decimal_v2() const { return query_options().decimal_v2; }
const TQueryCtx& query_ctx() const; const TQueryCtx& query_ctx() const;
const TPlanFragmentCtx& fragment_ctx() const { return *fragment_ctx_; } const TPlanFragment& fragment() const { return *fragment_; }
const TPlanFragmentInstanceCtx& instance_ctx() const { return *instance_ctx_; } const TPlanFragmentInstanceCtx& instance_ctx() const { return *instance_ctx_; }
const PlanFragmentCtxPB& fragment_ctx() const { return *fragment_ctx_; }
const PlanFragmentInstanceCtxPB& instance_ctx_pb() const { return *instance_ctx_pb_; }
const TUniqueId& session_id() const { return query_ctx().session.session_id; } const TUniqueId& session_id() const { return query_ctx().session.session_id; }
const std::string& do_as_user() const { return query_ctx().session.delegated_user; } const std::string& do_as_user() const { return query_ctx().session.delegated_user; }
const std::string& connected_user() const { const std::string& connected_user() const {
@@ -275,7 +279,7 @@ class RuntimeState {
/// the posix error code of the failed RPC. The target node address and posix error code /// the posix error code of the failed RPC. The target node address and posix error code
/// will be included in the AuxErrorInfo returned by GetAuxErrorInfo. This method is /// will be included in the AuxErrorInfo returned by GetAuxErrorInfo. This method is
/// idempotent. /// idempotent.
void SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code); void SetRPCErrorInfo(NetworkAddressPB dest_node, int16_t posix_error_code);
/// Returns true if this RuntimeState has any auxiliary error information, false /// Returns true if this RuntimeState has any auxiliary error information, false
/// otherwise. Currently, only SetRPCErrorInfo() sets aux error info. /// otherwise. Currently, only SetRPCErrorInfo() sets aux error info.
@@ -308,8 +312,10 @@ class RuntimeState {
/// Global QueryState and original thrift descriptors for this fragment instance. /// Global QueryState and original thrift descriptors for this fragment instance.
QueryState* const query_state_; QueryState* const query_state_;
const TPlanFragmentCtx* const fragment_ctx_; const TPlanFragment* const fragment_;
const TPlanFragmentInstanceCtx* const instance_ctx_; const TPlanFragmentInstanceCtx* const instance_ctx_;
const PlanFragmentCtxPB* const fragment_ctx_;
const PlanFragmentInstanceCtxPB* const instance_ctx_pb_;
/// only populated by the (const QueryCtx&, ExecEnv*, DescriptorTbl*) c'tor /// only populated by the (const QueryCtx&, ExecEnv*, DescriptorTbl*) c'tor
boost::scoped_ptr<QueryState> local_query_state_; boost::scoped_ptr<QueryState> local_query_state_;

View File

@@ -162,17 +162,21 @@ Status TestEnv::CreateQueryState(
ExecQueryFInstancesRequestPB rpc_params; ExecQueryFInstancesRequestPB rpc_params;
// create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
rpc_params.set_coord_state_idx(0); rpc_params.set_coord_state_idx(0);
rpc_params.add_fragment_ctxs();
rpc_params.add_fragment_instance_ctxs();
TExecPlanFragmentInfo fragment_info; TExecPlanFragmentInfo fragment_info;
fragment_info.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()})); fragment_info.__set_fragments(vector<TPlanFragment>({TPlanFragment()}));
fragment_info.__set_fragment_instance_ctxs( fragment_info.__set_fragment_instance_ctxs(
vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()})); vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info)); RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
FragmentState* frag_state = FragmentState* frag_state = qs->obj_pool()->Add(new FragmentState(
qs->obj_pool()->Add(new FragmentState(qs, qs->fragment_info_.fragment_ctxs[0])); qs, qs->fragment_info_.fragments[0], qs->exec_rpc_params_.fragment_ctxs(0)));
FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs, FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs,
frag_state, qs->fragment_info_.fragment_instance_ctxs[0])); frag_state, qs->fragment_info_.fragment_instance_ctxs[0],
RuntimeState* rs = qs->obj_pool()->Add( qs->exec_rpc_params_.fragment_instance_ctxs(0)));
new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get())); RuntimeState* rs =
qs->obj_pool()->Add(new RuntimeState(qs, fis->fragment(), fis->instance_ctx(),
fis->fragment_ctx(), fis->instance_ctx_pb(), exec_env_.get()));
runtime_states_.push_back(rs); runtime_states_.push_back(rs);
*runtime_state = rs; *runtime_state = rs;

View File

@@ -25,6 +25,7 @@
#include "common/global-types.h" #include "common/global-types.h"
#include "gen-cpp/Frontend_types.h" #include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h" // for TNetworkAddress #include "gen-cpp/Types_types.h" // for TNetworkAddress
#include "gen-cpp/control_service.pb.h"
#include "gen-cpp/statestore_service.pb.h" #include "gen-cpp/statestore_service.pb.h"
#include "util/container-util.h" #include "util/container-util.h"
#include "util/runtime-profile.h" #include "util/runtime-profile.h"
@@ -35,7 +36,7 @@ struct FragmentExecParams;
struct FInstanceExecParams; struct FInstanceExecParams;
/// map from scan node id to a list of scan ranges /// map from scan node id to a list of scan ranges
typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges; typedef std::map<TPlanNodeId, std::vector<ScanRangeParamsPB>> PerNodeScanRanges;
/// map from an impalad host address to the per-node assigned scan ranges; /// map from an impalad host address to the per-node assigned scan ranges;
/// records scan range assignment for a single fragment /// records scan range assignment for a single fragment
@@ -104,7 +105,7 @@ struct FInstanceExecParams {
int sender_id = -1; int sender_id = -1;
// List of input join build finstances for joins in this finstance. // List of input join build finstances for joins in this finstance.
std::vector<TJoinBuildInput> join_build_inputs; std::vector<JoinBuildInputPB> join_build_inputs;
// If this is a join build fragment, the number of fragment instances that consume the // If this is a join build fragment, the number of fragment instances that consume the
// join build. -1 = invalid. // join build. -1 = invalid.
@@ -127,7 +128,7 @@ struct FInstanceExecParams {
/// Execution parameters shared between fragment instances /// Execution parameters shared between fragment instances
struct FragmentExecParams { struct FragmentExecParams {
/// output destinations of this fragment /// output destinations of this fragment
std::vector<TPlanFragmentDestination> destinations; std::vector<PlanFragmentDestinationPB> destinations;
/// map from node id to the number of senders (node id expected to be for an /// map from node id to the number of senders (node id expected to be for an
/// ExchangeNode) /// ExchangeNode)

View File

@@ -554,16 +554,16 @@ void Result::ProcessAssignments(const AssignmentCallback& cb) const {
const TNetworkAddress& addr = assignment_elem.first; const TNetworkAddress& addr = assignment_elem.first;
const PerNodeScanRanges& per_node_ranges = assignment_elem.second; const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
for (const auto& per_node_ranges_elem : per_node_ranges) { for (const auto& per_node_ranges_elem : per_node_ranges) {
const vector<TScanRangeParams> scan_range_params_vector = const vector<ScanRangeParamsPB> scan_range_params_vector =
per_node_ranges_elem.second; per_node_ranges_elem.second;
for (const TScanRangeParams& scan_range_params : scan_range_params_vector) { for (const ScanRangeParamsPB& scan_range_params : scan_range_params_vector) {
const TScanRange& scan_range = scan_range_params.scan_range; const ScanRangePB& scan_range = scan_range_params.scan_range();
DCHECK(scan_range.__isset.hdfs_file_split); DCHECK(scan_range.has_hdfs_file_split());
const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split; const HdfsFileSplitPB& hdfs_file_split = scan_range.hdfs_file_split();
bool try_hdfs_cache = scan_range_params.__isset.try_hdfs_cache ? bool try_hdfs_cache = scan_range_params.has_try_hdfs_cache() ?
scan_range_params.try_hdfs_cache : false; scan_range_params.try_hdfs_cache() : false;
bool is_remote = bool is_remote =
scan_range_params.__isset.is_remote ? scan_range_params.is_remote : false; scan_range_params.has_is_remote() ? scan_range_params.is_remote() : false;
cb({addr, hdfs_file_split, try_hdfs_cache, is_remote}); cb({addr, hdfs_file_split, try_hdfs_cache, is_remote});
} }
} }
@@ -583,7 +583,7 @@ int Result::CountAssignmentsIf(const AssignmentFilter& filter) const {
int64_t Result::CountAssignedBytesIf(const AssignmentFilter& filter) const { int64_t Result::CountAssignedBytesIf(const AssignmentFilter& filter) const {
int64_t assigned_bytes = 0; int64_t assigned_bytes = 0;
AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) { AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) {
if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length; if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length();
}; };
ProcessAssignments(cb); ProcessAssignments(cb);
return assigned_bytes; return assigned_bytes;
@@ -603,7 +603,7 @@ void Result::CountAssignedBytesPerBackend(
AssignmentCallback cb = [&num_assignments_per_backend]( AssignmentCallback cb = [&num_assignments_per_backend](
const AssignmentInfo& assignment) { const AssignmentInfo& assignment) {
(*num_assignments_per_backend)[assignment.addr.hostname] += (*num_assignments_per_backend)[assignment.addr.hostname] +=
assignment.hdfs_file_split.length; assignment.hdfs_file_split.length();
}; };
ProcessAssignments(cb); ProcessAssignments(cb);
} }

View File

@@ -34,6 +34,7 @@
namespace impala { namespace impala {
class ClusterMembershipMgr; class ClusterMembershipMgr;
class HdfsFileSplitPB;
class Scheduler; class Scheduler;
class TTopicDelta; class TTopicDelta;
@@ -364,7 +365,7 @@ class Result {
/// Parameter type for callbacks, which are used to filter scheduling results. /// Parameter type for callbacks, which are used to filter scheduling results.
struct AssignmentInfo { struct AssignmentInfo {
const TNetworkAddress& addr; const TNetworkAddress& addr;
const THdfsFileSplit& hdfs_file_split; const HdfsFileSplitPB& hdfs_file_split;
bool is_cached; bool is_cached;
bool is_remote; bool is_remote;
}; };

View File

@@ -19,6 +19,7 @@
#include <random> #include <random>
#include "common/logging.h" #include "common/logging.h"
#include "gen-cpp/control_service.pb.h"
#include "scheduling/cluster-membership-mgr.h" #include "scheduling/cluster-membership-mgr.h"
#include "scheduling/scheduler.h" #include "scheduling/scheduler.h"
#include "scheduling/scheduler-test-util.h" #include "scheduling/scheduler-test-util.h"
@@ -727,36 +728,37 @@ TEST_F(SchedulerTest, TestExecAtCoordWithoutLocalBackend) {
// of the algorithm. // of the algorithm.
TEST_F(SchedulerTest, TestMultipleFinstances) { TEST_F(SchedulerTest, TestMultipleFinstances) {
const int NUM_RANGES = 16; const int NUM_RANGES = 16;
std::vector<TScanRangeParams> fs_ranges(NUM_RANGES); std::vector<ScanRangeParamsPB> fs_ranges(NUM_RANGES);
std::vector<TScanRangeParams> kudu_ranges(NUM_RANGES); std::vector<ScanRangeParamsPB> kudu_ranges(NUM_RANGES);
// Create ranges with lengths 1, 2, ..., etc. // Create ranges with lengths 1, 2, ..., etc.
for (int i = 0; i < NUM_RANGES; ++i) { for (int i = 0; i < NUM_RANGES; ++i) {
fs_ranges[i].scan_range.__set_hdfs_file_split(THdfsFileSplit()); *fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split() = HdfsFileSplitPB();
fs_ranges[i].scan_range.hdfs_file_split.length = i + 1; fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split()->set_length(i + 1);
kudu_ranges[i].scan_range.__set_kudu_scan_token("fake token"); kudu_ranges[i].mutable_scan_range()->set_kudu_scan_token("fake token");
} }
// Test handling of the single instance case - all ranges go to the same instance. // Test handling of the single instance case - all ranges go to the same instance.
vector<vector<TScanRangeParams>> fs_one_instance = vector<vector<ScanRangeParamsPB>> fs_one_instance =
Scheduler::AssignRangesToInstances(1, &fs_ranges); Scheduler::AssignRangesToInstances(1, &fs_ranges);
ASSERT_EQ(1, fs_one_instance.size()); ASSERT_EQ(1, fs_one_instance.size());
EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size()); EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
vector<vector<TScanRangeParams>> kudu_one_instance = vector<vector<ScanRangeParamsPB>> kudu_one_instance =
Scheduler::AssignRangesToInstances(1, &kudu_ranges); Scheduler::AssignRangesToInstances(1, &kudu_ranges);
ASSERT_EQ(1, kudu_one_instance.size()); ASSERT_EQ(1, kudu_one_instance.size());
EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size()); EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
// Ensure that each executor gets one range regardless of input order. // Ensure that each executor gets one range regardless of input order.
for (int attempt = 0; attempt < 20; ++attempt) { for (int attempt = 0; attempt < 20; ++attempt) {
std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_); std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
vector<vector<TScanRangeParams>> range_per_instance = vector<vector<ScanRangeParamsPB>> range_per_instance =
Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges); Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
EXPECT_EQ(NUM_RANGES, range_per_instance.size()); EXPECT_EQ(NUM_RANGES, range_per_instance.size());
// Confirm each range is present and each instance got exactly one range. // Confirm each range is present and each instance got exactly one range.
vector<int> range_length_count(NUM_RANGES); vector<int> range_length_count(NUM_RANGES);
for (const auto& instance_ranges : range_per_instance) { for (const auto& instance_ranges : range_per_instance) {
ASSERT_EQ(1, instance_ranges.size()); ASSERT_EQ(1, instance_ranges.size());
++range_length_count[instance_ranges[0].scan_range.hdfs_file_split.length - 1]; ++range_length_count[instance_ranges[0].scan_range().hdfs_file_split().length()
- 1];
} }
for (int i = 0; i < NUM_RANGES; ++i) { for (int i = 0; i < NUM_RANGES; ++i) {
EXPECT_EQ(1, range_length_count[i]) << i; EXPECT_EQ(1, range_length_count[i]) << i;
@@ -767,7 +769,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
// across the instances regardless of input order. // across the instances regardless of input order.
for (int attempt = 0; attempt < 20; ++attempt) { for (int attempt = 0; attempt < 20; ++attempt) {
std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_); std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
vector<vector<TScanRangeParams>> range_per_instance = vector<vector<ScanRangeParamsPB>> range_per_instance =
Scheduler::AssignRangesToInstances(4, &fs_ranges); Scheduler::AssignRangesToInstances(4, &fs_ranges);
EXPECT_EQ(4, range_per_instance.size()); EXPECT_EQ(4, range_per_instance.size());
// Ensure we got a range of each length in the output. // Ensure we got a range of each length in the output.
@@ -776,8 +778,8 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
EXPECT_EQ(4, instance_ranges.size()); EXPECT_EQ(4, instance_ranges.size());
int64_t instance_bytes = 0; int64_t instance_bytes = 0;
for (const auto& range : instance_ranges) { for (const auto& range : instance_ranges) {
instance_bytes += range.scan_range.hdfs_file_split.length; instance_bytes += range.scan_range().hdfs_file_split().length();
++range_length_count[range.scan_range.hdfs_file_split.length - 1]; ++range_length_count[range.scan_range().hdfs_file_split().length() - 1];
} }
// Expect each instance to get sum([1, 2, ..., 16]) / 4 bytes when things are // Expect each instance to get sum([1, 2, ..., 16]) / 4 bytes when things are
// distributed evenly. // distributed evenly.
@@ -793,13 +795,13 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
// range, so we just need to check the # of ranges. // range, so we just need to check the # of ranges.
for (int attempt = 0; attempt < 20; ++attempt) { for (int attempt = 0; attempt < 20; ++attempt) {
std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_); std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
vector<vector<TScanRangeParams>> range_per_instance = vector<vector<ScanRangeParamsPB>> range_per_instance =
Scheduler::AssignRangesToInstances(4, &kudu_ranges); Scheduler::AssignRangesToInstances(4, &kudu_ranges);
EXPECT_EQ(4, range_per_instance.size()); EXPECT_EQ(4, range_per_instance.size());
for (const auto& instance_ranges : range_per_instance) { for (const auto& instance_ranges : range_per_instance) {
EXPECT_EQ(4, instance_ranges.size()); EXPECT_EQ(4, instance_ranges.size());
for (const auto& range : instance_ranges) { for (const auto& range : instance_ranges) {
EXPECT_TRUE(range.scan_range.__isset.kudu_scan_token); EXPECT_TRUE(range.scan_range().has_kudu_scan_token());
} }
} }
} }

View File

@@ -38,6 +38,7 @@
#include "scheduling/executor-group.h" #include "scheduling/executor-group.h"
#include "scheduling/hash-ring.h" #include "scheduling/hash-ring.h"
#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp" #include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
#include "util/compression-util.h"
#include "util/debug-util.h" #include "util/debug-util.h"
#include "util/flat_buffer.h" #include "util/flat_buffer.h"
#include "util/hash-util.h" #include "util/hash-util.h"
@@ -45,6 +46,7 @@
#include "util/network-util.h" #include "util/network-util.h"
#include "util/pretty-printer.h" #include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h" #include "util/runtime-profile-counters.h"
#include "util/uid-util.h"
#include "common/names.h" #include "common/names.h"
@@ -226,14 +228,15 @@ void Scheduler::ComputeFragmentExecParams(
// populate src_params->destinations // populate src_params->destinations
src_params->destinations.resize(dest_params->instance_exec_params.size()); src_params->destinations.resize(dest_params->instance_exec_params.size());
for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) { for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) {
TPlanFragmentDestination& dest = src_params->destinations[i]; PlanFragmentDestinationPB& dest = src_params->destinations[i];
dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id); TUniqueIdToUniqueIdPB(dest_params->instance_exec_params[i].instance_id,
dest.mutable_fragment_instance_id());
const TNetworkAddress& host = dest_params->instance_exec_params[i].host; const TNetworkAddress& host = dest_params->instance_exec_params[i].host;
dest.__set_thrift_backend(host); *dest.mutable_thrift_backend() = FromTNetworkAddress(host);
const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, host); const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, host);
DCHECK(desc.has_krpc_address()); DCHECK(desc.has_krpc_address());
DCHECK(IsResolvedAddress(desc.krpc_address())); DCHECK(IsResolvedAddress(desc.krpc_address()));
dest.__set_krpc_backend(FromNetworkAddressPB(desc.krpc_address())); *dest.mutable_krpc_backend() = desc.krpc_address();
} }
// enumerate senders consecutively; // enumerate senders consecutively;
@@ -314,9 +317,9 @@ void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
/// Returns a numeric weight that is proportional to the estimated processing time for /// Returns a numeric weight that is proportional to the estimated processing time for
/// the scan range represented by 'params'. Weights from different scan node /// the scan range represented by 'params'. Weights from different scan node
/// implementations, e.g. FS vs Kudu, are not comparable. /// implementations, e.g. FS vs Kudu, are not comparable.
static int64_t ScanRangeWeight(const TScanRangeParams& params) { static int64_t ScanRangeWeight(const ScanRangeParamsPB& params) {
if (params.scan_range.__isset.hdfs_file_split) { if (params.scan_range().has_hdfs_file_split()) {
return params.scan_range.hdfs_file_split.length; return params.scan_range().hdfs_file_split().length();
} else { } else {
// Give equal weight to each Kudu and Hbase split. // Give equal weight to each Kudu and Hbase split.
// TODO: implement more accurate logic for Kudu and Hbase // TODO: implement more accurate logic for Kudu and Hbase
@@ -429,7 +432,7 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
// The inner vectors are the output of AssignRangesToInstances(). // The inner vectors are the output of AssignRangesToInstances().
// The vector may be ragged - i.e. different nodes have different numbers // The vector may be ragged - i.e. different nodes have different numbers
// of instances. // of instances.
vector<vector<vector<TScanRangeParams>>> per_scan_per_instance_ranges; vector<vector<vector<ScanRangeParamsPB>>> per_scan_per_instance_ranges;
for (TPlanNodeId scan_node_id : scan_node_ids) { for (TPlanNodeId scan_node_id : scan_node_ids) {
// Ensure empty list is created if no scan ranges are scheduled on this host. // Ensure empty list is created if no scan ranges are scheduled on this host.
per_scan_per_instance_ranges.emplace_back(); per_scan_per_instance_ranges.emplace_back();
@@ -478,8 +481,8 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
} }
} }
vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances( vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
int max_num_instances, vector<TScanRangeParams>* ranges) { int max_num_instances, vector<ScanRangeParamsPB>* ranges) {
// We need to assign scan ranges to instances. We would like the assignment to be // We need to assign scan ranges to instances. We would like the assignment to be
// as even as possible, so that each instance does about the same amount of work. // as even as possible, so that each instance does about the same amount of work.
// Use longest-processing time (LPT) algorithm, which is a good approximation of the // Use longest-processing time (LPT) algorithm, which is a good approximation of the
@@ -488,7 +491,7 @@ vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances(
// to each instance. // to each instance.
DCHECK_GT(max_num_instances, 0); DCHECK_GT(max_num_instances, 0);
int num_instances = min(max_num_instances, static_cast<int>(ranges->size())); int num_instances = min(max_num_instances, static_cast<int>(ranges->size()));
vector<vector<TScanRangeParams>> per_instance_ranges(num_instances); vector<vector<ScanRangeParamsPB>> per_instance_ranges(num_instances);
if (num_instances < 2) { if (num_instances < 2) {
// Short-circuit the assignment algorithm for the single instance case. // Short-circuit the assignment algorithm for the single instance case.
per_instance_ranges[0] = *ranges; per_instance_ranges[0] = *ranges;
@@ -502,10 +505,10 @@ vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances(
instance_heap.emplace_back(InstanceAssignment{0, i}); instance_heap.emplace_back(InstanceAssignment{0, i});
} }
std::sort(ranges->begin(), ranges->end(), std::sort(ranges->begin(), ranges->end(),
[](const TScanRangeParams& a, const TScanRangeParams& b) { [](const ScanRangeParamsPB& a, const ScanRangeParamsPB& b) {
return ScanRangeWeight(a) > ScanRangeWeight(b); return ScanRangeWeight(a) > ScanRangeWeight(b);
}); });
for (TScanRangeParams& range : *ranges) { for (ScanRangeParamsPB& range : *ranges) {
per_instance_ranges[instance_heap[0].instance_idx].push_back(range); per_instance_ranges[instance_heap[0].instance_idx].push_back(range);
instance_heap[0].weight += ScanRangeWeight(range); instance_heap[0].weight += ScanRangeWeight(range);
pop_heap(instance_heap.begin(), instance_heap.end()); pop_heap(instance_heap.begin(), instance_heap.end());
@@ -554,9 +557,10 @@ void Scheduler::CreateCollocatedJoinBuildInstances(
parent_exec_params.krpc_host, per_fragment_instance_idx++, *fragment_params); parent_exec_params.krpc_host, per_fragment_instance_idx++, *fragment_params);
instance_exec_params->back().num_join_build_outputs = 0; instance_exec_params->back().num_join_build_outputs = 0;
} }
TJoinBuildInput build_input; JoinBuildInputPB build_input;
build_input.__set_join_node_id(sink.dest_node_id); build_input.set_join_node_id(sink.dest_node_id);
build_input.__set_input_finstance_id(instance_exec_params->back().instance_id); TUniqueIdToUniqueIdPB(instance_exec_params->back().instance_id,
build_input.mutable_input_finstance_id());
parent_exec_params.join_build_inputs.emplace_back(build_input); parent_exec_params.join_build_inputs.emplace_back(build_input);
VLOG(3) << "Linked join build for node id=" << sink.dest_node_id VLOG(3) << "Linked join build for node id=" << sink.dest_node_id
<< " build finstance=" << PrintId(instance_exec_params->back().instance_id) << " build finstance=" << PrintId(instance_exec_params->back().instance_id)
@@ -1048,6 +1052,31 @@ void Scheduler::AssignmentCtx::SelectExecutorOnHost(
} }
} }
void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_range_pb) {
if (tscan_range.__isset.hdfs_file_split) {
HdfsFileSplitPB* hdfs_file_split = scan_range_pb->mutable_hdfs_file_split();
hdfs_file_split->set_relative_path(tscan_range.hdfs_file_split.relative_path);
hdfs_file_split->set_offset(tscan_range.hdfs_file_split.offset);
hdfs_file_split->set_length(tscan_range.hdfs_file_split.length);
hdfs_file_split->set_partition_id(tscan_range.hdfs_file_split.partition_id);
hdfs_file_split->set_file_length(tscan_range.hdfs_file_split.file_length);
hdfs_file_split->set_file_compression(
THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
hdfs_file_split->set_partition_path_hash(
tscan_range.hdfs_file_split.partition_path_hash);
}
if (tscan_range.__isset.hbase_key_range) {
HBaseKeyRangePB* hbase_key_range = scan_range_pb->mutable_hbase_key_range();
hbase_key_range->set_startkey(tscan_range.hbase_key_range.startKey);
hbase_key_range->set_stopkey(tscan_range.hbase_key_range.stopKey);
}
if (tscan_range.__isset.kudu_scan_token) {
scan_range_pb->set_kudu_scan_token(tscan_range.kudu_scan_token);
}
}
void Scheduler::AssignmentCtx::RecordScanRangeAssignment( void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
const BackendDescriptorPB& executor, PlanNodeId node_id, const BackendDescriptorPB& executor, PlanNodeId node_id,
const vector<TNetworkAddress>& host_list, const vector<TNetworkAddress>& host_list,
@@ -1104,14 +1133,15 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
PerNodeScanRanges* scan_ranges = FindOrInsert( PerNodeScanRanges* scan_ranges = FindOrInsert(
assignment, FromNetworkAddressPB(executor.address()), PerNodeScanRanges()); assignment, FromNetworkAddressPB(executor.address()), PerNodeScanRanges());
vector<TScanRangeParams>* scan_range_params_list = vector<ScanRangeParamsPB>* scan_range_params_list =
FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>()); FindOrInsert(scan_ranges, node_id, vector<ScanRangeParamsPB>());
// Add scan range. // Add scan range.
TScanRangeParams scan_range_params; ScanRangeParamsPB scan_range_params;
scan_range_params.scan_range = scan_range_locations.scan_range; TScanRangeToScanRangePB(
scan_range_params.__set_volume_id(volume_id); scan_range_locations.scan_range, scan_range_params.mutable_scan_range());
scan_range_params.__set_try_hdfs_cache(try_hdfs_cache); scan_range_params.set_volume_id(volume_id);
scan_range_params.__set_is_remote(remote_read); scan_range_params.set_try_hdfs_cache(try_hdfs_cache);
scan_range_params.set_is_remote(remote_read);
scan_range_params_list->push_back(scan_range_params); scan_range_params_list->push_back(scan_range_params);
if (VLOG_FILE_IS_ON) { if (VLOG_FILE_IS_ON) {
@@ -1133,8 +1163,8 @@ void Scheduler::AssignmentCtx::PrintAssignment(
VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first); VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
for (const PerNodeScanRanges::value_type& per_node_scan_ranges : entry.second) { for (const PerNodeScanRanges::value_type& per_node_scan_ranges : entry.second) {
stringstream str; stringstream str;
for (const TScanRangeParams& params : per_node_scan_ranges.second) { for (const ScanRangeParamsPB& params : per_node_scan_ranges.second) {
str << ThriftDebugString(params) << " "; str << params.DebugString() << " ";
} }
VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str(); VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
} }

View File

@@ -180,7 +180,7 @@ class Scheduler {
/// Pick an executor in round-robin fashion from multiple executors on a single host. /// Pick an executor in round-robin fashion from multiple executors on a single host.
void SelectExecutorOnHost(const IpAddr& executor_ip, BackendDescriptorPB* executor); void SelectExecutorOnHost(const IpAddr& executor_ip, BackendDescriptorPB* executor);
/// Build a new TScanRangeParams object and append it to the assignment list for the /// Build a new ScanRangeParamsPB object and append it to the assignment list for the
/// tuple (executor, node_id) in 'assignment'. Also, update assignment_heap_ and /// tuple (executor, node_id) in 'assignment'. Also, update assignment_heap_ and
/// assignment_byte_counters_, increase the counters 'total_assignments_' and /// assignment_byte_counters_, increase the counters 'total_assignments_' and
/// 'total_local_assignments_'. 'scan_range_locations' contains information about the /// 'total_local_assignments_'. 'scan_range_locations' contains information about the
@@ -388,8 +388,8 @@ class Scheduler {
/// positive. Only returns non-empty vectors: if there are not enough ranges /// positive. Only returns non-empty vectors: if there are not enough ranges
/// to create 'max_num_instances', fewer instances are assigned ranges. /// to create 'max_num_instances', fewer instances are assigned ranges.
/// May reorder ranges in 'ranges'. /// May reorder ranges in 'ranges'.
static std::vector<std::vector<TScanRangeParams>> AssignRangesToInstances( static std::vector<std::vector<ScanRangeParamsPB>> AssignRangesToInstances(
int max_num_instances, std::vector<TScanRangeParams>* ranges); int max_num_instances, std::vector<ScanRangeParamsPB>* ranges);
/// For each instance of fragment_params's input fragment, create a collocated /// For each instance of fragment_params's input fragment, create a collocated
/// instance for fragment_params's fragment. /// instance for fragment_params's fragment.

View File

@@ -201,8 +201,9 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
query_ctx.request_pool = "fe-eval-exprs"; query_ctx.request_pool = "fe-eval-exprs";
RuntimeState state(query_ctx, ExecEnv::GetInstance()); RuntimeState state(query_ctx, ExecEnv::GetInstance());
TPlanFragmentCtx fragment_ctx; TPlanFragment fragment;
FragmentState fragment_state(state.query_state(), fragment_ctx); PlanFragmentCtxPB fragment_ctx;
FragmentState fragment_state(state.query_state(), fragment, fragment_ctx);
// Make sure to close the runtime state no matter how this scope is exited. // Make sure to close the runtime state no matter how this scope is exited.
const auto close_runtime_state = MakeScopeExitTrigger([&state, &fragment_state]() { const auto close_runtime_state = MakeScopeExitTrigger([&state, &fragment_state]() {
fragment_state.ReleaseResources(); fragment_state.ReleaseResources();

View File

@@ -42,6 +42,7 @@ add_library(Util
codec.cc codec.cc
collection-metrics.cc collection-metrics.cc
common-metrics.cc common-metrics.cc
compression-util.cc
compress.cc compress.cc
cpu-info.cc cpu-info.cc
cyclic-barrier.cc cyclic-barrier.cc

View File

@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/compression-util.h"
#include "common/logging.h"
namespace impala {
// Maps a Thrift THdfsCompression value to the CompressionTypePB enumerator of the same
// name. A value that matches none of the known codecs trips the DCHECK below and
// otherwise falls back to CompressionTypePB::NONE.
CompressionTypePB THdfsCompressionToProto(const THdfsCompression::type& compression) {
  switch (compression) {
    // The case labels are deliberately qualified with THdfsCompression:: so that they
    // have the same enum type as the value being switched on. An unqualified label such
    // as 'NONE' would instead name the protobuf CompressionTypePB enumerator, which only
    // matches the intended Thrift value as long as both enums keep identical numbering.
    case THdfsCompression::NONE: return CompressionTypePB::NONE;
    case THdfsCompression::DEFAULT: return CompressionTypePB::DEFAULT;
    case THdfsCompression::GZIP: return CompressionTypePB::GZIP;
    case THdfsCompression::DEFLATE: return CompressionTypePB::DEFLATE;
    case THdfsCompression::BZIP2: return CompressionTypePB::BZIP2;
    case THdfsCompression::SNAPPY: return CompressionTypePB::SNAPPY;
    case THdfsCompression::SNAPPY_BLOCKED: return CompressionTypePB::SNAPPY_BLOCKED;
    case THdfsCompression::LZO: return CompressionTypePB::LZO;
    case THdfsCompression::LZ4: return CompressionTypePB::LZ4;
    case THdfsCompression::ZLIB: return CompressionTypePB::ZLIB;
    case THdfsCompression::ZSTD: return CompressionTypePB::ZSTD;
    case THdfsCompression::BROTLI: return CompressionTypePB::BROTLI;
    case THdfsCompression::LZ4_BLOCKED: return CompressionTypePB::LZ4_BLOCKED;
  }
  // Only reachable if 'compression' holds a value outside the enumerators listed above.
  DCHECK(false) << "Invalid compression type: " << compression;
  return CompressionTypePB::NONE;
}
// Maps a protobuf CompressionTypePB value to the THdfsCompression enumerator of the same
// name. A value that matches none of the known codecs trips the DCHECK below and
// otherwise falls back to THdfsCompression::NONE.
THdfsCompression::type CompressionTypePBToThrift(const CompressionTypePB& compression) {
  switch (compression) {
    case CompressionTypePB::NONE: return THdfsCompression::NONE;
    case CompressionTypePB::DEFAULT: return THdfsCompression::DEFAULT;
    case CompressionTypePB::GZIP: return THdfsCompression::GZIP;
    case CompressionTypePB::DEFLATE: return THdfsCompression::DEFLATE;
    case CompressionTypePB::BZIP2: return THdfsCompression::BZIP2;
    case CompressionTypePB::SNAPPY: return THdfsCompression::SNAPPY;
    case CompressionTypePB::SNAPPY_BLOCKED: return THdfsCompression::SNAPPY_BLOCKED;
    case CompressionTypePB::LZO: return THdfsCompression::LZO;
    case CompressionTypePB::LZ4: return THdfsCompression::LZ4;
    case CompressionTypePB::ZLIB: return THdfsCompression::ZLIB;
    case CompressionTypePB::ZSTD: return THdfsCompression::ZSTD;
    case CompressionTypePB::BROTLI: return THdfsCompression::BROTLI;
    case CompressionTypePB::LZ4_BLOCKED: return THdfsCompression::LZ4_BLOCKED;
  }
  // Only reachable if 'compression' holds a value outside the enumerators listed above.
  DCHECK(false) << "Invalid compression type: " << compression;
  return THdfsCompression::NONE;
}
} // namespace impala

View File

@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "gen-cpp/CatalogObjects_types.h"
#include "gen-cpp/common.pb.h"
namespace impala {
// Convert THdfsCompression to the equivalent protobuf enum, i.e. the CompressionTypePB
// value with the same name. An unrecognized input hits a DCHECK in the implementation
// and is otherwise mapped to CompressionTypePB::NONE.
CompressionTypePB THdfsCompressionToProto(const THdfsCompression::type& compression);
// Convert CompressionTypePB to the equivalent thrift enum, i.e. the THdfsCompression
// value with the same name. An unrecognized input hits a DCHECK in the implementation
// and is otherwise mapped to THdfsCompression::NONE.
THdfsCompression::type CompressionTypePBToThrift(const CompressionTypePB& compression);
} // namespace impala

View File

@@ -227,6 +227,14 @@ const V& FindWithDefault(const boost::unordered_map<K, V>& m, const K& key,
return it->second; return it->second;
} }
/// Overload of FindWithDefault() for google::protobuf::Map. Returns a reference to the
/// value stored under 'key' in 'm', or to 'default_val' when 'm' has no entry for 'key'.
/// The result aliases either the map entry or 'default_val', so it must not be used
/// after whichever of the two it refers to has gone away.
template <typename K, typename V>
const V& FindWithDefault(const google::protobuf::Map<K, V>& m, const K& key,
    const V& default_val) {
  const auto entry = m.find(key);
  return entry == m.end() ? default_val : entry->second;
}
/// Merges (by summing) the values from two maps of values. The values must be /// Merges (by summing) the values from two maps of values. The values must be
/// native types or support operator +=. /// native types or support operator +=.
template<typename MAP_TYPE> template<typename MAP_TYPE>

View File

@@ -48,6 +48,12 @@ inline void TUniqueIdToUniqueIdPB(
unique_id_pb->set_hi(t_unique_id.hi); unique_id_pb->set_hi(t_unique_id.hi);
} }
/// Inverse of TUniqueIdToUniqueIdPB(): copies the 'hi' and 'lo' halves of the protobuf
/// id 'unique_id_pb' into the Thrift id pointed to by 't_unique_id'.
inline void UniqueIdPBToTUniqueId(
    const UniqueIdPB& unique_id_pb, TUniqueId* t_unique_id) {
  const auto hi = unique_id_pb.hi();
  const auto lo = unique_id_pb.lo();
  t_unique_id->__set_hi(hi);
  t_unique_id->__set_lo(lo);
}
/// Query id: uuid with bottom 4 bytes set to 0 /// Query id: uuid with bottom 4 bytes set to 0
/// Fragment instance id: query id with instance index stored in the bottom 4 bytes /// Fragment instance id: query id with instance index stored in the bottom 4 bytes

View File

@@ -22,7 +22,7 @@ add_custom_target(proto-deps)
set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/) set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
foreach(pb_src common row_batch) foreach(pb_src common row_batch planner)
string(TOUPPER ${pb_src} _PB_SRC_UPPER) string(TOUPPER ${pb_src} _PB_SRC_UPPER)
set(_PROTO_SRCS ${_PB_SRC_UPPER}_PROTO_SRCS) set(_PROTO_SRCS ${_PB_SRC_UPPER}_PROTO_SRCS)
set(_PROTO_HDRS ${_PB_SRC_UPPER}_PROTO_HDRS) set(_PROTO_HDRS ${_PB_SRC_UPPER}_PROTO_HDRS)

View File

@@ -39,11 +39,22 @@ message UniqueIdPB {
required fixed64 lo = 2; required fixed64 lo = 2;
} }
// The compression codec. Currently used in row batch's header to // The compression codec. Currently used to indicate the compression used in
// indicate the type of compression applied to the row batch. // row batches and HDFS files. Corresponds to THdfsCompression.
enum CompressionType { enum CompressionTypePB {
NONE = 0; // No compression. NONE = 0;
LZ4 = 1; DEFAULT = 1;
GZIP = 2;
DEFLATE = 3;
BZIP2 = 4;
SNAPPY = 5;
SNAPPY_BLOCKED = 6;
LZO = 7;
LZ4 = 8;
ZLIB = 9;
ZSTD = 10;
BROTLI = 11;
LZ4_BLOCKED = 12;
} }
// This is a union over all possible return types. // This is a union over all possible return types.

View File

@@ -21,6 +21,7 @@ syntax="proto2";
package impala; package impala;
import "common.proto"; import "common.proto";
import "planner.proto";
import "kudu/rpc/rpc_header.proto"; import "kudu/rpc/rpc_header.proto";
@@ -239,6 +240,70 @@ message RemoteShutdownResultPB {
optional ShutdownStatusPB shutdown_status = 2; optional ShutdownStatusPB shutdown_status = 2;
} }
// Specification of one output destination of a plan fragment
message PlanFragmentDestinationPB {
// The globally unique fragment instance id.
optional UniqueIdPB fragment_instance_id = 1;
// Hostname + port of the Thrift based ImpalaInternalService on the destination.
optional NetworkAddressPB thrift_backend = 2;
// IP address + port of the KRPC based ImpalaInternalService on the destination.
optional NetworkAddressPB krpc_backend = 3;
}
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {
// Ordinal number of corresponding fragment in the query.
optional int32 fragment_idx = 1;
// Output destinations, one per output partition. The partitioning of the output is
// specified by TPlanFragment.output_sink.output_partition in the corresponding
// TPlanFragment. The number of output partitions is destinations.size().
repeated PlanFragmentDestinationPB destinations = 2;
}
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
optional ScanRangePB scan_range = 1;
optional int32 volume_id = 2 [default = -1];
optional bool try_hdfs_cache = 3 [default = false];
optional bool is_remote = 4;
}
// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support repeated map
// values.
message ScanRangesPB {
repeated ScanRangeParamsPB scan_ranges = 1;
}
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
// The join node id that will consume this join build.
optional int32 join_node_id = 1;
// Fragment instance id of the input fragment instance.
optional UniqueIdPB input_finstance_id = 2;
}
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the same
// fragment_idx.
message PlanFragmentInstanceCtxPB {
// Ordinal number of corresponding fragment in the query.
optional int32 fragment_idx = 1;
// Map from plan node id to initial scan ranges for each scan node in
// TPlanFragment.plan_tree
map<int32, ScanRangesPB> per_node_scan_ranges = 2;
// List of input join build finstances for joins in this finstance.
repeated JoinBuildInputPB join_build_inputs = 3;
}
// ExecQueryFInstances // ExecQueryFInstances
message ExecQueryFInstancesRequestPB { message ExecQueryFInstancesRequestPB {
// This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
@@ -268,6 +333,14 @@ message ExecQueryFInstancesRequestPB {
// The backend memory limit (in bytes) as set by the admission controller. Used by the // The backend memory limit (in bytes) as set by the admission controller. Used by the
// query mem tracker to enforce the memory limit. // query mem tracker to enforce the memory limit.
optional int64 per_backend_mem_limit = 6; optional int64 per_backend_mem_limit = 6;
// General execution parameters for different fragments. Corresponds to 'fragments' in
// the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentCtxPB fragment_ctxs = 7;
// Execution parameters for specific fragment instances. Corresponds to
// 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
} }
message ExecQueryFInstancesResponsePB { message ExecQueryFInstancesResponsePB {

View File

@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
syntax="proto2";
package impala;
import "common.proto";
// Specification of a subsection of a single HDFS file. Corresponds to THdfsFileSplit and
// should be kept in sync with it.
// Every field here is declared 'optional', so the schema itself does not enforce that
// any of them is present.
message HdfsFileSplitPB {
// File name (not the full path). The path is assumed to be relative to the
// 'location' of the THdfsPartition referenced by partition_id.
optional string relative_path = 1;
// Starting offset of the split within the file (presumably in bytes - TODO confirm).
optional int64 offset = 2;
// Length of split (presumably in bytes - TODO confirm).
optional int64 length = 3;
// ID of partition within the THdfsTable associated with this scan node.
optional int64 partition_id = 4;
// Total size of the hdfs file.
optional int64 file_length = 5;
// Compression type of the hdfs file.
optional CompressionTypePB file_compression = 6;
// Last modified time of the file (units/epoch are not stated here - TODO confirm).
optional int64 mtime = 7;
// Whether this file is erasure-coded.
optional bool is_erasure_coded = 8;
// Hash of the partition's path. This must be hashed with a hash algorithm that is
// consistent across different processes and machines. This is currently using
// Java's String.hashCode(), which is consistent. For testing purposes, this can use
// any consistent hash.
optional int32 partition_path_hash = 9;
}
// Key range for single THBaseScanNode. Corresponds to THBaseKeyRange and should be kept
// in sync with it.
// The range is half-open: startKey is inclusive, stopKey is exclusive.
// NOTE(review): the field names are camelCase, unlike the snake_case used by the other
// messages in this file - presumably to mirror the Thrift struct; confirm before renaming.
// NOTE(review): proto2 'string' is specified as UTF-8/7-bit ASCII text. If keys can hold
// arbitrary binary data, 'bytes' may be the more appropriate type - confirm.
message HBaseKeyRangePB {
// Inclusive lower bound of the key range.
optional string startKey = 1;
// Exclusive upper bound of the key range.
optional string stopKey = 2;
}
// Specification of an individual data range which is held in its entirety by a storage
// server. Corresponds to TScanRange and should be kept in sync with it.
message ScanRangePB {
// One of these must be set for every ScanRangePB.
// NOTE(review): these are independent 'optional' fields rather than a 'oneof', so the
// schema does not enforce this - presumably producers/consumers check it; verify.
// Set when the range is a split of an HDFS file.
optional HdfsFileSplitPB hdfs_file_split = 1;
// Set when the range is an HBase key range.
optional HBaseKeyRangePB hbase_key_range = 2;
// Set when the range is a Kudu scan token (format of the token is not described here).
optional string kudu_scan_token = 3;
}

View File

@@ -37,5 +37,5 @@ message RowBatchHeaderPB {
optional int64 uncompressed_size = 3; optional int64 uncompressed_size = 3;
// The compression codec (if any) used for compressing the row batch. // The compression codec (if any) used for compressing the row batch.
optional CompressionType compression_type = 4; optional CompressionTypePB compression_type = 4;
} }

View File

@@ -590,54 +590,16 @@ struct TQueryCtx {
25: optional i64 transaction_id 25: optional i64 transaction_id
} }
// Specification of one output destination of a plan fragment
struct TPlanFragmentDestination {
// the globally unique fragment instance id
1: required Types.TUniqueId fragment_instance_id
// hostname + port of the Thrift based ImpalaInternalService on the destination
2: required Types.TNetworkAddress thrift_backend
// IP address + port of the KRPC based ImpalaInternalService on the destination
3: optional Types.TNetworkAddress krpc_backend
}
// Context to collect information, which is shared among all instances of that plan
// fragment.
struct TPlanFragmentCtx {
1: required Planner.TPlanFragment fragment
// Output destinations, one per output partition.
// The partitioning of the output is specified by
// TPlanFragment.output_sink.output_partition.
// The number of output partitions is destinations.size().
2: list<TPlanFragmentDestination> destinations
}
// A scan range plus the parameters needed to execute that scan.
struct TScanRangeParams {
1: required PlanNodes.TScanRange scan_range
2: optional i32 volume_id = -1
3: optional bool try_hdfs_cache = false
4: optional bool is_remote
}
// Descriptor that indicates that a runtime filter is produced by a plan node. // Descriptor that indicates that a runtime filter is produced by a plan node.
struct TRuntimeFilterSource { struct TRuntimeFilterSource {
1: required Types.TPlanNodeId src_node_id 1: required Types.TPlanNodeId src_node_id
2: required i32 filter_id 2: required i32 filter_id
} }
// Information about the input fragment instance of a join node. // The Thrift portion of the execution parameters of a single fragment instance. Every
struct TJoinBuildInput { // fragment instance will also have a corresponding PlanFragmentInstanceCtxPB with the
// The join node id that will consume this join build. // same fragment_idx.
1: required Types.TPlanNodeId join_node_id // TODO: convert the rest of this struct to protobuf
// Fragment instance id of the input fragment instance.
2: required Types.TUniqueId input_finstance_id
}
// Execution parameters of a single fragment instance.
struct TPlanFragmentInstanceCtx { struct TPlanFragmentInstanceCtx {
// TPlanFragment.idx // TPlanFragment.idx
1: required Types.TFragmentIdx fragment_idx 1: required Types.TFragmentIdx fragment_idx
@@ -656,12 +618,9 @@ struct TPlanFragmentInstanceCtx {
// Range: [0, <# of instances of parent fragment> - 1] // Range: [0, <# of instances of parent fragment> - 1]
3: required i32 per_fragment_instance_idx 3: required i32 per_fragment_instance_idx
// Initial scan ranges for each scan node in TPlanFragment.plan_tree
4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
// Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree; // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
// needed to create a DataStreamRecvr // needed to create a DataStreamRecvr
// TODO for per-query exec rpc: move these to TPlanFragmentCtx // TODO for per-query exec rpc: move these to PlanFragmentCtxPB
5: required map<Types.TPlanNodeId, i32> per_exch_num_senders 5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
// Id of this instance in its role as a sender. // Id of this instance in its role as a sender.
@@ -672,9 +631,6 @@ struct TPlanFragmentInstanceCtx {
// List of runtime filters produced by nodes in the finstance. // List of runtime filters produced by nodes in the finstance.
8: optional list<TRuntimeFilterSource> filters_produced 8: optional list<TRuntimeFilterSource> filters_produced
// List of input join build finstances for joins in this finstance.
9: optional list<TJoinBuildInput> join_build_inputs
// If this is a join build fragment, the number of fragment instances that consume the // If this is a join build fragment, the number of fragment instances that consume the
// join build. -1 = invalid. // join build. -1 = invalid.
10: optional i32 num_join_build_outputs 10: optional i32 num_join_build_outputs
@@ -694,9 +650,9 @@ enum ImpalaInternalServiceVersion {
// serialize it ourselves and send it with ExecQueryFInstances as a sidecar. // serialize it ourselves and send it with ExecQueryFInstances as a sidecar.
// TODO: investigate if it's worth converting this fully to protobuf // TODO: investigate if it's worth converting this fully to protobuf
struct TExecPlanFragmentInfo { struct TExecPlanFragmentInfo {
1: optional list<TPlanFragmentCtx> fragment_ctxs 1: optional list<Planner.TPlanFragment> fragments
// the order corresponds to the order of fragments in fragment_ctxs // the order corresponds to the order of fragments in 'fragments'
2: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs 2: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
} }

View File

@@ -170,7 +170,8 @@ struct TRuntimeFilterDesc {
// - T<subclass>: all other operational parameters that are the same across // - T<subclass>: all other operational parameters that are the same across
// all plan fragments // all plan fragments
// Specification of subsection of a single hdfs file. // Specification of a subsection of a single HDFS file. Corresponds to HdfsFileSplitPB and
// should be kept in sync with it.
struct THdfsFileSplit { struct THdfsFileSplit {
// File name (not the full path). The path is assumed to be relative to the // File name (not the full path). The path is assumed to be relative to the
// 'location' of the THdfsPartition referenced by partition_id. // 'location' of the THdfsPartition referenced by partition_id.
@@ -204,7 +205,8 @@ struct THdfsFileSplit {
9: required i32 partition_path_hash 9: required i32 partition_path_hash
} }
// key range for single THBaseScanNode // Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and should be kept
// in sync with it.
// TODO: does 'binary' have an advantage over string? strings can // TODO: does 'binary' have an advantage over string? strings can
// already store binary data // already store binary data
struct THBaseKeyRange { struct THBaseKeyRange {
@@ -240,7 +242,7 @@ struct TFileSplitGeneratorSpec {
} }
// Specification of an individual data range which is held in its entirety // Specification of an individual data range which is held in its entirety
// by a storage server. // by a storage server. Corresponds to ScanRangePB and should be kept in sync with it.
struct TScanRange { struct TScanRange {
// one of these must be set for every TScanRange // one of these must be set for every TScanRange
1: optional THdfsFileSplit hdfs_file_split 1: optional THdfsFileSplit hdfs_file_split