IMPALA-3001 / 3008: Runtime filter polish

This commit polishes a few parts of the runtime filter feature:

1. ENABLE_RUNTIME_FILTER_PROPAGATION has been replaced with
RUNTIME_FILTER_MODE which takes as values LOCAL, GLOBAL and OFF.

2. The filter routing table is only printed in mode GLOBAL.

3. The filter routing table is now printed with TablePrinter, and
includes whether a filter is broadcast, or if it's a partition-only
filter.

4. Parquet per-row filtering can be disabled using
DISABLE_PARQUET_ROW_FILTERING.

5. De-serialisation of the Thrift Bloom filter is moved out of the
spinlock scope in Coordinator::UpdateFilter().

Change-Id: I9257aa079c3793c1c4b3e2be51e25fc207298c32
Reviewed-on: http://gerrit.cloudera.org:8080/2194
Reviewed-by: Henry Robinson <henry@cloudera.com>
Tested-by: Henry Robinson <henry@cloudera.com>
This commit is contained in:
Henry Robinson
2016-02-16 12:22:43 -08:00
committed by Harrison Sheinblatt
parent 8135ef6eaa
commit 2cc586372c
14 changed files with 160 additions and 60 deletions

View File

@@ -81,10 +81,14 @@ Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
BOOST_FOREACH(const TRuntimeFilterDesc& tfilter, tnode.runtime_filters) {
// If filter propagation not enabled, only consider building broadcast joins (that may
// be consumed by this fragment).
if (!state->query_options().enable_runtime_filter_propagation &&
if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
!tfilter.is_broadcast_join) {
continue;
}
if (state->query_options().disable_row_runtime_filtering &&
!tfilter.is_bound_by_partition_columns) {
continue;
}
filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true));
ExprContext* ctx;
RETURN_IF_ERROR(Expr::CreateExprTree(pool_, tfilter.src_expr, &ctx));

View File

@@ -434,8 +434,8 @@ class HdfsParquetScanner::BaseScalarColumnReader :
RuntimeState* state = parent_->scan_node_->runtime_state();
hash_seed_ = state->fragment_hash_seed();
// Check to see if filter expr is bound by this slot. Any filter is only valid for the
// top-level tuple.
// Check to see if any filter expr is bound by this slot. Any filter is only valid for
// the top-level tuple.
HdfsScanNode* scan_node = parent_->scan_node_;
if (slot_desc != NULL && slot_desc_->parent() == scan_node->tuple_desc()) {
for (int i = 0; i < parent_->context_->filter_ctxs().size(); ++i) {

View File

@@ -136,13 +136,19 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
Expr::CreateExprTrees(pool_, iter->second, &conjuncts_map_[iter->first]));
}
const TQueryOptions& query_options = runtime_state()->query_options();
BOOST_FOREACH(const TRuntimeFilterDesc& filter, tnode.runtime_filters) {
if (query_options.disable_row_runtime_filtering &&
!filter.is_bound_by_partition_columns) {
continue;
}
FilterContext filter_ctx;
RETURN_IF_ERROR(Expr::CreateExprTree(pool_, filter.target_expr, &filter_ctx.expr));
filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false);
RuntimeProfile* profile = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(),
Substitute("Filter $0", filter.filter_id)));
Substitute("Filter $0", filter.filter_id)));
runtime_profile_->AddChild(profile);
filter_ctx.stats = state->obj_pool()->Add(
new FilterStats(profile, filter.is_bound_by_partition_columns));
@@ -184,7 +190,8 @@ bool HdfsScanNode::WaitForPartitionFilters(int32_t time_ms) {
if (!ctx.filter->WaitForArrival(time_ms)) {
all_filters_arrived = false;
} else {
arrived_filter_ids.push_back(Substitute("$0", ctx.filter->filter_desc().filter_id));
arrived_filter_ids.push_back(
Substitute("$0", ctx.filter->filter_desc().filter_id));
}
}
}

View File

@@ -93,10 +93,14 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
BOOST_FOREACH(const TRuntimeFilterDesc& filter, tnode.runtime_filters) {
// If filter propagation not enabled, only consider building broadcast joins (that may
// be consumed by this fragment).
if (!state->query_options().enable_runtime_filter_propagation &&
if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
!filter.is_broadcast_join) {
continue;
}
if (state->query_options().disable_row_runtime_filtering &&
!filter.is_bound_by_partition_columns) {
continue;
}
FilterContext filter_ctx;
filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
RETURN_IF_ERROR(Expr::CreateExprTree(pool_, filter.src_expr, &filter_ctx.expr));

View File

@@ -16,6 +16,7 @@
#include <limits>
#include <map>
#include <memory>
#include <thrift/protocol/TDebugProtocol.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/accumulators/accumulators.hpp>
@@ -60,6 +61,7 @@
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "util/summary-util.h"
#include "util/table-printer.h"
#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Frontend_types.h"
@@ -78,6 +80,7 @@ using boost::algorithm::join;
using boost::algorithm::token_compress_on;
using boost::algorithm::split;
using boost::filesystem::path;
using std::unique_ptr;
DECLARE_int32(be_port);
DECLARE_string(hostname);
@@ -479,6 +482,9 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
int fragment_instance_idx = 0;
bool has_coordinator_fragment =
request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
bool filter_routing_enabled =
schedule->query_options().__isset.runtime_filter_mode &&
schedule->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL;
// Start one fragment instance per fragment per host (number of hosts running each
// fragment may not be constant).
for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
@@ -487,27 +493,29 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
int num_hosts = params->hosts.size();
DCHECK_GT(num_hosts, 0);
// Build the filter routing table by iterating over all plan nodes in this fragment
// and collecting the filters that they either produce or consume.
BOOST_FOREACH(const TPlanNode& plan_node,
request.fragments[fragment_idx].plan.nodes) {
if (plan_node.__isset.hash_join_node && plan_node.__isset.runtime_filters) {
BOOST_FOREACH(const TRuntimeFilterDesc& filter, plan_node.runtime_filters) {
Filter* f = &(filter_routing_table_[filter.filter_id]);
f->src = plan_node.node_id;
// TODO: Remove hack when FE provides this information.
// TODO: consider only sending filter requests to ~3 backends for broadcast
// filters, to reduce load during aggregation.
bool is_broadcast = (plan_node.label_detail.find("BROADCAST") != string::npos);
f->pending_count = is_broadcast ? 1 : num_hosts;
}
} else if (plan_node.__isset.hdfs_scan_node && plan_node.__isset.runtime_filters) {
BOOST_FOREACH(const TRuntimeFilterDesc& filter, plan_node.runtime_filters) {
Filter* f = &(filter_routing_table_[filter.filter_id]);
f->dst = plan_node.node_id;
for (int i = fragment_instance_idx; i < fragment_instance_idx + num_hosts;
++i) {
f->fragment_instance_idxs.push_back(i);
if (filter_routing_enabled) {
// Build the filter routing table by iterating over all plan nodes in this fragment
// and collecting the filters that they either produce or consume.
BOOST_FOREACH(const TPlanNode& plan_node,
request.fragments[fragment_idx].plan.nodes) {
if (plan_node.__isset.hash_join_node && plan_node.__isset.runtime_filters) {
BOOST_FOREACH(const TRuntimeFilterDesc& filter, plan_node.runtime_filters) {
FilterState* f = &(filter_routing_table_[filter.filter_id]);
f->desc = filter;
f->src = plan_node.node_id;
// TODO: consider only sending filter requests to ~3 backends for broadcast
// filters, to reduce load during aggregation.
f->pending_count = filter.is_broadcast_join ? 1 : num_hosts;
}
} else if (
plan_node.__isset.hdfs_scan_node && plan_node.__isset.runtime_filters) {
BOOST_FOREACH(const TRuntimeFilterDesc& filter, plan_node.runtime_filters) {
FilterState* f = &(filter_routing_table_[filter.filter_id]);
f->dst = plan_node.node_id;
for (int i = fragment_instance_idx; i < fragment_instance_idx + num_hosts;
++i) {
f->fragment_instance_idxs.push_back(i);
}
}
}
}
@@ -534,11 +542,13 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
exec_complete_barrier_->Wait();
query_events_->MarkEvent(
Substitute("All $0 remote fragments started", fragment_instance_idx));
query_profile_->AddInfoString(
"Number of filters", Substitute("$0", filter_routing_table_.size()));
query_profile_->AddInfoString("Filter routing table", FilterDebugString());
if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
if (filter_routing_enabled) {
query_profile_->AddInfoString(
"Number of filters", Substitute("$0", filter_routing_table_.size()));
query_profile_->AddInfoString("Filter routing table", FilterDebugString());
if (VLOG_IS_ON(2)) VLOG_QUERY << FilterDebugString();
}
Status status = Status::OK();
const TMetricDef& def =
@@ -562,17 +572,42 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
}
string Coordinator::FilterDebugString() {
stringstream ss;
TablePrinter table_printer;
table_printer.AddColumn("ID", false);
table_printer.AddColumn("Src. Node", false);
table_printer.AddColumn("Dst. Node", false);
table_printer.AddColumn("Destinations", false);
table_printer.AddColumn("Pending", false);
table_printer.AddColumn("Broadcast", false);
table_printer.AddColumn("Partition filter", false);
table_printer.AddColumn("First arrived", false);
table_printer.AddColumn("Completed", false);
lock_guard<SpinLock> l(filter_lock_);
ss << endl << "------ Filter routing table ------ " << endl;
BOOST_FOREACH(const FilterRoutingTable::value_type& v, filter_routing_table_) {
ss << "id: " << v.first
<< "\tsrc: " << v.second.src
<< "\tdst: " << v.second.dst
<< "\tnum. destinations: " << v.second.fragment_instance_idxs.size()
<< "\tnum. pending: " << v.second.pending_count << endl;
vector<string> row;
const FilterState& state = v.second;
row.push_back(lexical_cast<string>(v.first));
row.push_back(lexical_cast<string>(state.src));
row.push_back(lexical_cast<string>(state.dst));
row.push_back(lexical_cast<string>(state.fragment_instance_idxs.size()));
row.push_back(lexical_cast<string>(state.pending_count));
row.push_back(state.desc.is_broadcast_join ? "true" : "false");
row.push_back(state.desc.is_bound_by_partition_columns ? "true" : "false");
if (state.first_arrival_time == 0L) {
row.push_back("N/A");
} else {
row.push_back(PrettyPrinter::Print(state.first_arrival_time, TUnit::TIME_NS));
}
if (state.completion_time == 0L) {
row.push_back("N/A");
} else {
row.push_back(PrettyPrinter::Print(state.completion_time, TUnit::TIME_NS));
}
table_printer.AddRow(row);
}
return ss.str();
// Add a line break, as in all contexts this is called we need to start a new line to
// print it correctly.
return Substitute("\n$0", table_printer.ToString());
}
Status Coordinator::GetStatus() {
@@ -944,7 +979,9 @@ Status Coordinator::Wait() {
ReportQuerySummary();
}
query_profile_->AddInfoString("Final filter table", FilterDebugString());
if (filter_routing_table_.size() > 0) {
query_profile_->AddInfoString("Final filter table", FilterDebugString());
}
return return_status;
}
@@ -1847,6 +1884,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
// Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
vector<int32_t> fragment_instance_idxs;
unique_ptr<BloomFilter> bloom_filter(new BloomFilter(params.bloom_filter, NULL, NULL));
{
lock_guard<SpinLock> l(filter_lock_);
FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
@@ -1857,6 +1895,9 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
// Receiving unnecessary updates for a broadcast.
if (it->second.pending_count == 0) return;
if (it->second.first_arrival_time == 0L) {
it->second.first_arrival_time = query_events_->ElapsedTime();
}
--it->second.pending_count;
if (filters_received_->value() == 0) {
@@ -1865,16 +1906,16 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
filters_received_->Add(1);
if (it->second.bloom_filter == NULL) {
it->second.bloom_filter =
obj_pool()->Add(new BloomFilter(params.bloom_filter, NULL, NULL));
obj_pool()->Add(bloom_filter.release());
} else {
// TODO: Implement BloomFilter::Or(const ThriftBloomFilter&)
BloomFilter bloom_filter(params.bloom_filter, NULL, NULL);
it->second.bloom_filter->Or(bloom_filter);
it->second.bloom_filter->Or(*bloom_filter);
}
if (it->second.pending_count > 0) return;
// No more filters are pending on this filter ID. Create a distribution payload and
// offer it to the queue.
it->second.completion_time = query_events_->ElapsedTime();
fragment_instance_idxs = it->second.fragment_instance_idxs;
it->second.bloom_filter->ToThrift(&rpc_params->bloom_filter);
}

View File

@@ -361,7 +361,9 @@ class Coordinator {
/// returned, successfully or not. Initialised during StartRemoteFragments().
boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
struct Filter {
struct FilterState {
TRuntimeFilterDesc desc;
TPlanNodeId src;
TPlanNodeId dst;
@@ -375,14 +377,20 @@ class Coordinator {
/// destination plan fragment instances. Owned by the coordinator's object pool.
BloomFilter* bloom_filter;
Filter() : bloom_filter(NULL) { }
/// Time at which first local filter arrived.
int64_t first_arrival_time;
/// Time at which all local filters arrived.
int64_t completion_time;
FilterState() : bloom_filter(NULL), first_arrival_time(0L), completion_time(0L) { }
};
/// Protects filter_routing_table_.
SpinLock filter_lock_;
/// Map from filter ID to filter.
typedef boost::unordered_map<int32_t, Filter> FilterRoutingTable;
typedef boost::unordered_map<int32_t, FilterState> FilterRoutingTable;
FilterRoutingTable filter_routing_table_;
RuntimeProfile::Counter* filters_received_;

View File

@@ -87,6 +87,8 @@ void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params
void RuntimeFilterBank::UpdateFilterFromLocal(uint32_t filter_id,
BloomFilter* bloom_filter) {
DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
<< "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
TUpdateFilterParams params;
bool is_broadcast = false;
{
@@ -98,7 +100,7 @@ void RuntimeFilterBank::UpdateFilterFromLocal(uint32_t filter_id,
is_broadcast = it->second->filter_desc().is_broadcast_join;
}
if (state_->query_options().enable_runtime_filter_propagation) {
if (state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
bloom_filter->ToThrift(&params.bloom_filter);
params.filter_id = filter_id;
params.query_id = query_ctx_.query_id;

View File

@@ -313,9 +313,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
case TImpalaQueryOptions::DISABLE_STREAMING_PREAGGREGATIONS:
query_options->__set_disable_streaming_preaggregations(atoi(value.c_str()));
break;
case TImpalaQueryOptions::ENABLE_RUNTIME_FILTER_PROPAGATION:
query_options->__set_enable_runtime_filter_propagation(
iequals(value, "true") || iequals(value, "1"));
case TImpalaQueryOptions::RUNTIME_FILTER_MODE:
if (iequals(value, "off") || iequals(value, "0")) {
query_options->__set_runtime_filter_mode(TRuntimeFilterMode::OFF);
} else if (iequals(value, "local") || iequals(value, "1")) {
query_options->__set_runtime_filter_mode(TRuntimeFilterMode::LOCAL);
} else if (iequals(value, "global") || iequals(value, "2")) {
query_options->__set_runtime_filter_mode(TRuntimeFilterMode::GLOBAL);
} else {
return Status(Substitute("Invalid runtime filter mode '$0'. Valid modes are"
" OFF(0), LOCAL(1) or GLOBAL(2).", value));
}
break;
case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: {
int32 size = atoi(value.c_str());
@@ -327,6 +335,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_runtime_filter_wait_time_ms(time_ms);
break;
}
case TImpalaQueryOptions::DISABLE_ROW_RUNTIME_FILTERING:
query_options->__set_disable_row_runtime_filtering(
iequals(value, "true") || iequals(value, "1"));
break;
default:
// We hit this DCHECK(false) if we forgot to add the corresponding entry here
// when we add a new query option.

View File

@@ -32,7 +32,7 @@ class TQueryOptions;
// the DCHECK.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS + 1);\
TImpalaQueryOptions::DISABLE_ROW_RUNTIME_FILTERING + 1);\
QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -69,9 +69,10 @@ class TQueryOptions;
QUERY_OPT_FN(random_replica, RANDOM_REPLICA)\
QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS)\
QUERY_OPT_FN(enable_runtime_filter_propagation, ENABLE_RUNTIME_FILTER_PROPAGATION);\
QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE)\
QUERY_OPT_FN(runtime_bloom_filter_size, RUNTIME_BLOOM_FILTER_SIZE);\
QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS);
QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS)\
QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING);
/// Converts a TQueryOptions struct into a map of key, value pairs.
void TQueryOptionsToMap(const TQueryOptions& query_options,

View File

@@ -148,13 +148,16 @@ struct TQueryOptions {
36: optional bool disable_streaming_preaggregations = 0
// If true, runtime filter propagation is enabled
37: optional bool enable_runtime_filter_propagation = 0
37: optional Types.TRuntimeFilterMode runtime_filter_mode = 1
// Size in bytes of runtime bloom filters
38: optional i32 runtime_bloom_filter_size = 0
// Time in ms to wait until partition filters are delivered
39: optional i32 runtime_filter_wait_time_ms = 0
// If true, per-row runtime filtering is disabled
40: optional bool disable_row_runtime_filtering = false
}
// Impala currently has two types of sessions: Beeswax and HiveServer2

View File

@@ -187,15 +187,17 @@ enum TImpalaQueryOptions {
// If true, the planner will not generate plans with streaming preaggregations.
DISABLE_STREAMING_PREAGGREGATIONS,
// If true, enable runtime filter propagation.
ENABLE_RUNTIME_FILTER_PROPAGATION,
RUNTIME_FILTER_MODE,
// Size (in bytes) of a runtime Bloom Filter. Will be rounded up to nearest power of
// two.
RUNTIME_BLOOM_FILTER_SIZE,
// Time (in ms) to wait in scans for partition filters to arrive.
RUNTIME_FILTER_WAIT_TIME_MS
RUNTIME_FILTER_WAIT_TIME_MS,
// If true, disable application of runtime filters to individual rows.
DISABLE_ROW_RUNTIME_FILTERING
}
// The summary of an insert.

View File

@@ -21,6 +21,8 @@ typedef i32 TTupleId
typedef i32 TSlotId
typedef i32 TTableId
// TODO: Consider moving unrelated enums to better locations.
enum TPrimitiveType {
INVALID_TYPE,
NULL_TYPE,
@@ -105,6 +107,18 @@ enum TExplainLevel {
VERBOSE
}
enum TRuntimeFilterMode {
// No filters are computed in the FE or the BE.
OFF,
// Only broadcast filters are computed in the BE, and are only published to the local
// fragment.
LOCAL,
// All fiters are computed in the BE, and are published globally.
GLOBAL
}
// A TNetworkAddress is the standard host, port representation of a
// network address. The hostname field must be resolvable to an IPv4
// address.
@@ -199,4 +213,3 @@ struct TFunction {
9: optional TScalarFunction scalar_fn
10: optional TAggregateFunction aggregate_fn
}

View File

@@ -19,6 +19,7 @@ import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.PrintUtils;
import com.cloudera.impala.common.RuntimeEnv;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TRuntimeFilterMode;
import com.cloudera.impala.thrift.TQueryCtx;
import com.cloudera.impala.thrift.TQueryExecRequest;
import com.cloudera.impala.thrift.TTableName;
@@ -75,7 +76,8 @@ public class Planner {
// Only one scanner thread for small queries
ctx_.getQueryOptions().setNum_scanner_threads(1);
}
} else {
} else if (
ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
// Always compute filters, even if the BE won't always use all of them.
RuntimeFilterGenerator.generateRuntimeFilters(ctx_.getRootAnalyzer(),
singleNodePlan);

View File

@@ -17,6 +17,7 @@ package com.cloudera.impala.planner;
import org.junit.Test;
import com.cloudera.impala.thrift.TQueryOptions;
import com.cloudera.impala.thrift.TRuntimeFilterMode;
// All planner tests, except for S3 specific tests should go here.
public class PlannerTest extends PlannerTestBase {
@@ -218,7 +219,7 @@ public class PlannerTest extends PlannerTestBase {
@Test
public void testRuntimeFilterPropagation() {
TQueryOptions options = new TQueryOptions();
options.setEnable_runtime_filter_propagation(true);
options.setRuntime_filter_mode(TRuntimeFilterMode.GLOBAL);
runPlannerTestFile("runtime-filter-propagation", options);
}
}