IMPALA-3430: Runtime filter: Extend runtime filter to support Min/Max values for HDFS scans

This patch enables min/max filtering for non-correlated subqueries
that return a single value. In this case, the filters are built from
the results of the subqueries, and the filtering target is the scan
node qualified by one of the subqueries. Shown below is one such
query, which normally compiles into a nested loop join. The filter
limits the values of column store_sales.ss_sales_price to the range
[-infinity, min(ss_wholesale_cost)].

  select count(*) from store_sales
    where ss_sales_price <=
      (select min(ss_wholesale_cost) from store_sales);

In the FE, the presence of such a scalar subquery is recorded in a
flag in InlineViewRef during analysis and later transferred to the
AggregationNode during planning.

In the BE, the min/max filtering infrastructure is integrated with the
nested loop join as follows:

 1. NljBuilderConfig is populated with filter descriptors from the
    nested loop join plan node via NljBuilder::CreateEmbeddedBuilder()
    (similar to the hash join), or in NljBuilderConfig::Init() when the
    sink config is created (the separate-builder case);
 2. NljBuilder is populated with filter contexts built from the filter
    descriptors in NljBuilderConfig. Filter contexts are the interface
    to the actual min/max filters;
 3. New insertion methods InsertFor<op>(), where <op> is LE, LT, GE or
    GT, are added to the MinMaxFilter class hierarchy. They implement
    the join predicate target <op> src_expr (see the sketch after this
    list);
 4. FilterContext::InsertPerCompareOp() calls one of the new
    insertion methods above based on the comparison op saved in the
    filter descriptor;
 5. NljBuilder::InsertRuntimeFilters() calls InsertPerCompareOp() on
    each filter context for every build row.
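
A minimal sketch of the single-range semantics behind the new methods,
shown for a numeric type. The class name Int64RangeSketch and the
int64_t specialization are illustrative assumptions; the patch itself
implements this per type through the NUMERIC_MIN_MAX_FILTER_FUNCS macro
in util/min-max-filter.cc. Each InsertFor<op>(v) widens the filter to
the largest range that can still satisfy "target <op> v":

  // Sketch only, not code from the patch.
  #include <algorithm>
  #include <cstdint>
  #include <limits>

  struct Int64RangeSketch {
    // An empty ("always false") filter: min_ > max_.
    int64_t min_ = std::numeric_limits<int64_t>::max();
    int64_t max_ = std::numeric_limits<int64_t>::lowest();

    // target <= v  ->  [lowest, max(max_, v)]
    void InsertForLE(int64_t v) {
      min_ = std::numeric_limits<int64_t>::lowest();
      max_ = std::max(max_, v);
    }
    // target < v   ->  [lowest, max(max_, v - 1)]
    void InsertForLT(int64_t v) {
      min_ = std::numeric_limits<int64_t>::lowest();
      if (v > std::numeric_limits<int64_t>::lowest()) --v;
      max_ = std::max(max_, v);
    }
    // target >= v  ->  [min(min_, v), highest]
    void InsertForGE(int64_t v) {
      max_ = std::numeric_limits<int64_t>::max();
      min_ = std::min(min_, v);
    }
    // target > v   ->  [min(min_, v + 1), highest]
    void InsertForGT(int64_t v) {
      max_ = std::numeric_limits<int64_t>::max();
      if (v < std::numeric_limits<int64_t>::max()) ++v;
      min_ = std::min(min_, v);
    }
  };

For the example query above, the single build row carries
min(ss_wholesale_cost), and InsertForLE() on that value yields the
range [-infinity, min(ss_wholesale_cost)] consumed by the store_sales
scan.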

By default, the feature is turned on only for sorted or partitioned
join columns.

Testing:
 1. Add single range insertion tests in min-max-filter-test.cc;
 2. Add positive and negative plan tests in
    overlap_min_max_filters.test;
 3. Add tests in overlap_min_max_filters_on_partition_columns.test;
 4. Add tests in overlap_min_max_filters_on_sorted_columns.test;
 5. Run core tests.

TODO in follow-up patches:
 1. Extend the min/max filter for inequality subqueries to other use
    cases (IMPALA-10869).

Change-Id: I7c2bb5baad622051d1002c9c162c672d428e5446
Reviewed-on: http://gerrit.cloudera.org:8080/17706
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Qifan Chen
Date: 2021-07-20 15:27:12 -04:00
Committed by: Impala Public Jenkins
Parent: 27179b691c
Commit: cd902d8c22
38 changed files with 1586 additions and 166 deletions


@@ -146,6 +146,7 @@ class DataSink {
MemTracker* mem_tracker() const { return mem_tracker_.get(); }
RuntimeProfile* profile() const { return profile_; }
const std::string& name() const { return name_; }
const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
return output_expr_evals_;
}


@@ -94,6 +94,33 @@ void FilterContext::Insert(TupleRow* row) const noexcept {
}
}
void FilterContext::InsertPerCompareOp(TupleRow* row) const noexcept {
if (filter->getCompareOp() == extdatasource::TComparisonOp::type::EQ) {
Insert(row);
} else {
DCHECK(filter->is_min_max_filter());
if (local_min_max_filter == nullptr || local_min_max_filter->AlwaysTrue()) return;
void* val = expr_eval->GetValue(row);
switch (filter->getCompareOp()) {
case extdatasource::TComparisonOp::type::LE:
local_min_max_filter->InsertForLE(val);
break;
case extdatasource::TComparisonOp::type::LT:
local_min_max_filter->InsertForLT(val);
break;
case extdatasource::TComparisonOp::type::GE:
local_min_max_filter->InsertForGE(val);
break;
case extdatasource::TComparisonOp::type::GT:
local_min_max_filter->InsertForGT(val);
break;
default:
DCHECK(false)
<< "Unsupported comparison op in FilterContext::InsertPerCompareOp()";
}
}
}
void FilterContext::MaterializeValues() const {
if (filter->is_min_max_filter() && local_min_max_filter != nullptr) {
local_min_max_filter->MaterializeValues();


@@ -120,6 +120,14 @@ struct FilterContext {
/// or 'local_min_max_filter' as appropriate.
void Insert(TupleRow* row) const noexcept;
/// Implements different flavors of insertion based on filter type and comparison
/// op in filter desc.
/// 1). When the op is EQ, regardless of filter type, call this->Insert(TupleRow* row);
/// 2). When the op is LE, LT, GE or GT and the filter type is min/max, call
/// MinMaxFilter::InsertFor<op>(TupleRow* row);
/// 3). DCHECK(false) otherwise.
void InsertPerCompareOp(TupleRow* row) const noexcept;
/// Materialize filter values by copying any values stored by filters into memory owned
/// by the filter. Filters may assume that the memory for Insert()-ed values stays valid
/// until this is called.


@@ -17,7 +17,9 @@
#include "exec/join-builder.h"
#include "service/hs2-util.h"
#include "util/debug-util.h"
#include "util/min-max-filter.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
@@ -124,4 +126,71 @@ void JoinBuilder::HandoffToProbesAndWait(RuntimeState* build_side_state) {
<< " cancelled=" << build_side_state->is_cancelled();
}
}
void JoinBuilder::PublishRuntimeFilters(const std::vector<FilterContext>& filter_ctxs,
RuntimeState* runtime_state, float minmax_filter_threshold, int64_t num_build_rows) {
VLOG(3) << name() << " publishing "
<< filter_ctxs.size() << " filters.";
int32_t num_enabled_filters = 0;
for (const FilterContext& ctx : filter_ctxs) {
BloomFilter* bloom_filter = nullptr;
if (ctx.local_bloom_filter != nullptr) {
bloom_filter = ctx.local_bloom_filter;
++num_enabled_filters;
} else if (ctx.local_min_max_filter != nullptr) {
/// Apply the column min/max stats (if applicable) to shut down the min/max
/// filter early by setting always true flag for the filter. Do this only if
/// the min/max filter is too close in area to the column stats of all target
/// scan columns.
const TRuntimeFilterDesc& filter_desc = ctx.filter->filter_desc();
VLOG(3) << "Check out the usefulness of the local minmax filter:"
<< " id=" << ctx.filter->id()
<< ", filter details=" << ctx.local_min_max_filter->DebugString()
<< ", column stats:"
<< " low=" << PrintTColumnValue(filter_desc.targets[0].low_value)
<< ", high=" << PrintTColumnValue(filter_desc.targets[0].high_value)
<< ", threshold=" << minmax_filter_threshold
<< ", #targets=" << filter_desc.targets.size();
bool all_overlap = true;
for (const auto& target_desc : filter_desc.targets) {
if (!FilterContext::ShouldRejectFilterBasedOnColumnStats(
target_desc, ctx.local_min_max_filter, minmax_filter_threshold)) {
all_overlap = false;
break;
}
}
if (all_overlap) {
ctx.local_min_max_filter->SetAlwaysTrue();
VLOG(3) << "The local minmax filter is set to always true:"
<< " id=" << ctx.filter->id();
}
if (!ctx.local_min_max_filter->AlwaysTrue()) {
++num_enabled_filters;
}
}
runtime_state->filter_bank()->UpdateFilterFromLocal(
ctx.filter->id(), bloom_filter, ctx.local_min_max_filter);
if ( ctx.local_min_max_filter != nullptr ) {
VLOG(3) << name() << " published min/max filter: "
<< " id=" << ctx.filter->id()
<< ", details=" << ctx.local_min_max_filter->DebugString();
}
}
if (filter_ctxs.size() > 0) {
string info_string;
if (num_enabled_filters == filter_ctxs.size()) {
info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filter_ctxs.size(),
filter_ctxs.size() == 1 ? "" : "s");
} else {
info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
num_enabled_filters, filter_ctxs.size(), filter_ctxs.size() == 1 ? "" : "s",
filter_ctxs.size() - num_enabled_filters);
}
profile()->AddInfoString("Runtime filters", info_string);
}
}
} // namespace impala


@@ -20,6 +20,8 @@
#include <mutex>
#include "exec/data-sink.h"
#include "exec/filter-context.h"
#include "runtime/runtime-state.h"
#include "util/condition-variable.h"
namespace impala {
@@ -140,6 +142,10 @@ class JoinBuilder : public DataSink {
int num_probe_threads() const { return num_probe_threads_; }
static string ConstructBuilderName(const char* name, int join_node_id) {
return strings::Substitute("$0 Join Builder (join_node_id=$1)", name, join_node_id);
}
protected:
/// ID of the join node that this builder is associated with.
const int join_node_id_;
@@ -205,5 +211,13 @@ class JoinBuilder : public DataSink {
/// TODO: IMPALA-9255: reconsider this so that the build-side thread can exit instead
/// of being blocked indefinitely.
void HandoffToProbesAndWait(RuntimeState* build_side_state);
/// Publish the runtime filters as described in 'filter_ctxs' to the fragment-local
/// RuntimeFilterBank in 'runtime_state'. 'minmax_filter_threshold' specifies the
/// threshold to determine the usefulness of a min/max filter. 'num_build_rows' is used
/// to determine whether the computed filters have an unacceptably high false-positive
/// rate.
void PublishRuntimeFilters(const std::vector<FilterContext>& filter_ctxs,
RuntimeState* runtime_state, float minmax_filter_threshold, int64_t num_build_rows);
};
}


@@ -16,12 +16,18 @@
// under the License.
#include "exec/nested-loop-join-builder.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include <utility>
#include "runtime/fragment-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "service/hs2-util.h"
#include "util/min-max-filter.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
@@ -37,44 +43,138 @@ DataSink* NljBuilderConfig::CreateSink(RuntimeState* state) const {
Status NljBuilderConfig::Init(
const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state));
const TJoinBuildSink& join_build_sink = tsink.join_build_sink;
const vector<TRuntimeFilterDesc>& filter_descs = join_build_sink.runtime_filters;
RETURN_IF_ERROR(InitExprsAndFilters(filter_descs, state));
return Status::OK();
}
NljBuilder* NljBuilder::CreateEmbeddedBuilder(
const RowDescriptor* row_desc, RuntimeState* state, int join_node_id) {
Status NljBuilderConfig::DoInitExprsAndFilters(
const vector<TRuntimeFilterDesc>& filter_descs,
const vector<TRuntimeFilterSource>& filters_produced, FragmentState* state) {
// Skip over filters that are not produced by the instances of the builder, i.e.
// broadcast filters where this instance was not selected as a filter producer.
for (const TRuntimeFilterDesc& filter_desc : filter_descs) {
DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL
|| filter_desc.is_broadcast_join || state->query_options().num_nodes == 1);
DCHECK(!state->query_options().disable_row_runtime_filtering
|| filter_desc.applied_on_partition_columns);
auto it = std::find_if(filters_produced.begin(), filters_produced.end(),
[this, &filter_desc](const TRuntimeFilterSource f) {
return f.src_node_id == join_node_id_ && f.filter_id == filter_desc.filter_id;
});
if (it == filters_produced.end()) continue;
filter_descs_.push_back(filter_desc);
ScalarExpr* filter_expr = nullptr;
RETURN_IF_ERROR(
ScalarExpr::Create(filter_desc.src_expr, *input_row_desc_, state, &filter_expr));
filter_exprs_.push_back(filter_expr);
}
return Status::OK();
}
Status NljBuilderConfig::InitExprsAndFilters(
const vector<TRuntimeFilterDesc>& filter_descs, FragmentState* state) {
const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs =
state->instance_ctxs();
// Skip over filters that are not produced by the instances of the builder, i.e.
// broadcast filters where this instance was not selected as a filter producer.
// We can pick any instance since the filters produced should be the same for all
// instances.
if (instance_ctxs.size() > 0) {
const TPlanFragmentInstanceCtx& instance_ctx = *instance_ctxs[0];
const vector<TRuntimeFilterSource>& filters_produced = instance_ctx.filters_produced;
return DoInitExprsAndFilters(filter_descs, filters_produced, state);
}
return Status::OK();
}
Status NljBuilderConfig::InitExprsAndFilters(
const vector<TRuntimeFilterDesc>& filter_descs, RuntimeState* state) {
QueryState* queryState = state->query_state();
FragmentState* fragmentState = queryState->findFragmentState(state->fragment().idx);
DCHECK(fragmentState != nullptr);
// Skip over filters that are not produced by the instances of the builder, i.e.
// broadcast filters where this instance was not selected as a filter producer.
const TPlanFragmentInstanceCtx& instance_ctx = state->instance_ctx();
// We can pick any instance since the filters produced should be the same for all
// instances.
const vector<TRuntimeFilterSource>& filters_produced = instance_ctx.filters_produced;
return DoInitExprsAndFilters(filter_descs, filters_produced, fragmentState);
}
void NljBuilderConfig::Close() {
ScalarExpr::Close(filter_exprs_);
DataSinkConfig::Close();
}
Status NljBuilder::CreateEmbeddedBuilder(const RowDescriptor* row_desc,
RuntimeState* state, int join_node_id,
const std::vector<TRuntimeFilterDesc>& filters, NljBuilder** nlj_builder) {
ObjectPool* pool = state->obj_pool();
NljBuilderConfig* sink_config = pool->Add(new NljBuilderConfig());
sink_config->join_node_id_ = join_node_id;
sink_config->tsink_ = pool->Add(new TDataSink());
sink_config->input_row_desc_ = row_desc;
return pool->Add(new NljBuilder(*sink_config, state));
RETURN_IF_ERROR(sink_config->InitExprsAndFilters(filters, state));
*nlj_builder = pool->Add(new NljBuilder(*sink_config, state));
return Status::OK();
}
NljBuilder::NljBuilder(
TDataSinkId sink_id, const NljBuilderConfig& sink_config, RuntimeState* state)
: JoinBuilder(sink_id, sink_config, "Nested Loop Join Builder", state),
build_batch_cache_(row_desc_, state->batch_size()) {}
NljBuilder::NljBuilder(const NljBuilderConfig& sink_config, RuntimeState* state)
: JoinBuilder(-1, sink_config, "Nested Loop Join Builder", state),
build_batch_cache_(row_desc_, state->batch_size()) {}
NljBuilder* NljBuilder::CreateSeparateBuilder(
TDataSinkId sink_id, const NljBuilderConfig& sink_config, RuntimeState* state) {
return state->obj_pool()->Add(new NljBuilder(sink_id, sink_config, state));
}
void NljBuilder::InitFilterContexts(
const NljBuilderConfig& sink_config, RuntimeState* state) {
for (const TRuntimeFilterDesc& filter_desc : sink_config.filter_descs_) {
filter_ctxs_.emplace_back();
filter_ctxs_.back().filter = state->filter_bank()->RegisterProducer(filter_desc);
}
}
NljBuilder::NljBuilder(
TDataSinkId sink_id, const NljBuilderConfig& sink_config, RuntimeState* state)
: JoinBuilder(sink_id, sink_config,
ConstructBuilderName("Nested Loop", sink_config.join_node_id_), state),
build_batch_cache_(row_desc_, state->batch_size()),
filter_exprs_(sink_config.filter_exprs_),
minmax_filter_threshold_(0.0),
runtime_state_(state) {
InitFilterContexts(sink_config, state);
}
NljBuilder::NljBuilder(const NljBuilderConfig& sink_config, RuntimeState* state)
: JoinBuilder(-1, sink_config,
ConstructBuilderName("Nested Loop", sink_config.join_node_id_), state),
build_batch_cache_(row_desc_, state->batch_size()),
filter_exprs_(sink_config.filter_exprs_),
minmax_filter_threshold_(0.0),
runtime_state_(state) {
InitFilterContexts(sink_config, state);
}
NljBuilder::~NljBuilder() {}
Status NljBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
num_build_rows_ = ADD_COUNTER(profile(), "BuildRows", TUnit::UNIT);
DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
for (int i = 0; i < filter_exprs_.size(); ++i) {
RETURN_IF_ERROR(
ScalarExprEvaluator::Create(*filter_exprs_[i], state, state->obj_pool(),
expr_perm_pool_.get(), expr_results_pool_.get(), &filter_ctxs_[i].expr_eval));
}
return Status::OK();
}
Status NljBuilder::Open(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::Open(state));
AllocateRuntimeFilters();
return Status::OK();
}
@@ -113,7 +213,12 @@ Status NljBuilder::FlushFinal(RuntimeState* state) {
DCHECK(copied_build_batches_.total_num_rows() == 0 ||
input_build_batches_.total_num_rows() == 0);
PublishRuntimeFilters(
copied_build_batches_.total_num_rows() + input_build_batches_.total_num_rows());
if (is_separate_build_) HandoffToProbesAndWait(state);
return Status::OK();
}
@@ -129,6 +234,11 @@ void NljBuilder::Close(RuntimeState* state) {
build_batch_cache_.Clear();
input_build_batches_.Reset();
copied_build_batches_.Reset();
for (const FilterContext& ctx : filter_ctxs_) {
if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
}
DataSink::Close(state);
closed_ = true;
}
@@ -151,3 +261,26 @@ Status NljBuilder::DeepCopyBuildBatches(RuntimeState* state) {
input_build_batches_.Reset();
return Status::OK();
}
void NljBuilder::AllocateRuntimeFilters() {
for (int i = 0; i < filter_ctxs_.size(); ++i) {
DCHECK(filter_ctxs_[i].filter->is_min_max_filter());
filter_ctxs_[i].local_min_max_filter =
runtime_state_->filter_bank()->AllocateScratchMinMaxFilter(
filter_ctxs_[i].filter->id(), filter_ctxs_[i].expr_eval->root().type());
}
minmax_filter_threshold_ =
(float)(runtime_state_->query_options().minmax_filter_threshold);
}
void NljBuilder::InsertRuntimeFilters(
FilterContext filter_ctxs[], TupleRow* build_row) noexcept {
// For the only interpreted path we can directly use the filter_ctxs_ member variable.
DCHECK_EQ(filter_ctxs_.data(), filter_ctxs);
for (const FilterContext& ctx : filter_ctxs_) ctx.InsertPerCompareOp(build_row);
}
void NljBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
JoinBuilder::PublishRuntimeFilters(
filter_ctxs_, runtime_state_, minmax_filter_threshold_, num_build_rows);
}


@@ -20,6 +20,7 @@
#include "common/atomic.h"
#include "exec/blocking-join-node.h"
#include "exec/filter-context.h"
#include "exec/join-builder.h"
#include "exec/row-batch-cache.h"
#include "exec/row-batch-list.h"
@@ -33,9 +34,32 @@ class NljBuilderConfig : public JoinBuilderConfig {
~NljBuilderConfig() override {}
/// Initializes the filter expressions and creates a copy of the filter
/// descriptors that will be generated by this sink. The first version
/// uses the descriptors in the fragment instance corresponding to 'state'
/// to retain the relevant filters. The second version uses the first fragment
/// instance in the fragment to do so.
Status InitExprsAndFilters(
const vector<TRuntimeFilterDesc>& filter_descs, RuntimeState* state);
Status InitExprsAndFilters(
const vector<TRuntimeFilterDesc>& filter_descs, FragmentState* state);
/// Expressions for evaluating input rows for insertion into runtime filters.
/// Only includes exprs for filters produced by this builder.
std::vector<ScalarExpr*> filter_exprs_;
/// The runtime filter descriptors of filters produced by this builder.
vector<TRuntimeFilterDesc> filter_descs_;
void Close() override;
protected:
Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
FragmentState* state) override;
/// Perform the major work for InitExprsAndFilters().
Status DoInitExprsAndFilters(const vector<TRuntimeFilterDesc>& filter_descs,
const vector<TRuntimeFilterSource>& filters_produced, FragmentState* state);
};
/// Builder for the NestedLoopJoinNode that accumulates the build-side rows for the join.
@@ -52,8 +76,9 @@ class NljBuilderConfig : public JoinBuilderConfig {
class NljBuilder : public JoinBuilder {
public:
/// To be used by the NestedLoopJoinNode to create an instance of this sink.
static NljBuilder* CreateEmbeddedBuilder(
const RowDescriptor* row_desc, RuntimeState* state, int join_node_id);
static Status CreateEmbeddedBuilder(const RowDescriptor* row_desc,
RuntimeState* state, int join_node_id,
const std::vector<TRuntimeFilterDesc>& filters, NljBuilder** nlj_builder);
// Factory method for separate builder.
static NljBuilder* CreateSeparateBuilder(
TDataSinkId sink_id, const NljBuilderConfig& sink_config, RuntimeState* state);
@@ -81,7 +106,15 @@ class NljBuilder : public JoinBuilder {
/// be owned by the batch (or a later batch), or DeepCopyBuildBatches() must be called
/// before the referenced resources are released.
/// Exposed so that NestedLoopJoinNode can bypass the DataSink interface for efficiency.
inline void AddBuildBatch(RowBatch* batch) { input_build_batches_.AddRowBatch(batch); }
inline void AddBuildBatch(RowBatch* batch) {
input_build_batches_.AddRowBatch(batch);
if (filter_ctxs_.size() > 0) {
FOREACH_ROW(batch, 0, build_batch_iter) {
TupleRow* build_row = build_batch_iter.Get();
InsertRuntimeFilters(filter_ctxs_.data(), build_row);
}
}
}
/// Return a pointer to the final list of build batches.
/// Only valid to call after FlushFinal() has been called. The returned build batches
@@ -98,6 +131,19 @@ class NljBuilder : public JoinBuilder {
inline RowBatchList* input_build_batches() { return &input_build_batches_; }
inline RowBatchList* copied_build_batches() { return &copied_build_batches_; }
/// For each filter in filters_, allocate a bloom_filter from the fragment-local
/// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
/// phase.
void AllocateRuntimeFilters();
/// Iterates over the runtime filters and inserts each row into each filter.
void InsertRuntimeFilters(FilterContext filter_ctxs[], TupleRow* build_row) noexcept;
/// Publish the runtime filters to the fragment-local RuntimeFilterBank.
/// 'num_build_rows' is used to determine whether the computed filters have an
/// unacceptably high false-positive rate.
void PublishRuntimeFilters(int64_t num_build_rows);
private:
// Constructor for builder embedded in NestedLoopJoinNode.
NljBuilder(const NljBuilderConfig& sink_config, RuntimeState* state);
@@ -105,6 +151,11 @@ class NljBuilder : public JoinBuilder {
NljBuilder(
TDataSinkId sink_id, const NljBuilderConfig& sink_config, RuntimeState* state);
/// Init the filter contexts in this builder with the filter descriptors in the
/// 'sink_config' which are built through the call to
/// NljBuilderConfig::InitExprsAndFilters().
void InitFilterContexts(const NljBuilderConfig& sink_config, RuntimeState* state);
/// Deep copy all build batches in 'input_build_batches_' to 'copied_build_batches_'.
/// Resets all the source batches and clears 'input_build_batches_'.
/// If the memory limit is exceeded while copying batches, returns a
@@ -123,6 +174,18 @@ class NljBuilder : public JoinBuilder {
/// List of build batches that were deep copied from 'input_build_batches_' and are
/// backed by each row batch's pool.
RowBatchList copied_build_batches_;
/// Expressions for evaluating input rows for insertion into runtime filters.
/// Only includes exprs for filters produced by this builder.
const std::vector<ScalarExpr*>& filter_exprs_;
/// List of filters to build. One-to-one correspondence with exprs in 'filter_exprs_'.
std::vector<FilterContext> filter_ctxs_;
/// Cached copy of the min/max filter threshold value.
float minmax_filter_threshold_;
RuntimeState* const runtime_state_;
};
}


@@ -121,11 +121,13 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
if (!UseSeparateBuild(state->query_options())) {
builder_ = NljBuilder::CreateEmbeddedBuilder(&build_row_desc(), state, id_);
RETURN_IF_ERROR(NljBuilder::CreateEmbeddedBuilder(
&build_row_desc(), state, id_, plan_node().tnode_->runtime_filters, &builder_));
RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
runtime_profile()->PrependChild(builder_->profile());
}


@@ -55,10 +55,6 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
using namespace impala;
using strings::Substitute;
static string ConstructBuilderName(int join_node_id) {
return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id);
}
DataSink* PhjBuilderConfig::CreateSink(RuntimeState* state) const {
// We have one fragment per sink, so we can use the fragment index as the sink ID.
TDataSinkId sink_id = state->fragment().idx;
@@ -162,8 +158,8 @@ const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
PhjBuilder::PhjBuilder(
TDataSinkId sink_id, const PhjBuilderConfig& sink_config, RuntimeState* state)
: JoinBuilder(
sink_id, sink_config, ConstructBuilderName(sink_config.join_node_id_), state),
: JoinBuilder(sink_id, sink_config,
ConstructBuilderName("Hash", sink_config.join_node_id_), state),
runtime_state_(state),
hash_seed_(sink_config.hash_seed_),
resource_profile_(sink_config.resource_profile_),
@@ -200,7 +196,8 @@ PhjBuilder::PhjBuilder(
PhjBuilder::PhjBuilder(const PhjBuilderConfig& sink_config,
BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
int64_t max_row_buffer_size, RuntimeState* state)
: JoinBuilder(-1, sink_config, ConstructBuilderName(sink_config.join_node_id_), state),
: JoinBuilder(
-1, sink_config, ConstructBuilderName("Hash", sink_config.join_node_id_), state),
runtime_state_(state),
hash_seed_(sink_config.hash_seed_),
resource_profile_(nullptr),
@@ -965,69 +962,8 @@ void PhjBuilder::InsertRuntimeFilters(
}
void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
VLOG(3) << "Join builder (join_node_id_=" << join_node_id_ << ") publishing "
<< filter_ctxs_.size() << " filters.";
int32_t num_enabled_filters = 0;
for (const FilterContext& ctx : filter_ctxs_) {
BloomFilter* bloom_filter = nullptr;
if (ctx.local_bloom_filter != nullptr) {
bloom_filter = ctx.local_bloom_filter;
++num_enabled_filters;
} else if (ctx.local_min_max_filter != nullptr) {
/// Apply the column min/max stats (if applicable) to shut down the min/max
/// filter early by setting always true flag for the filter. Do this only if
/// the min/max filter is too close in area to the column stats of all target
/// scan columns.
const TRuntimeFilterDesc& filter_desc = ctx.filter->filter_desc();
VLOG(3) << "Check out the usefulness of the local minmax filter:"
<< " id=" << ctx.filter->id()
<< ", fillter details=" << ctx.local_min_max_filter->DebugString()
<< ", column stats:"
<< " low=" << PrintTColumnValue(filter_desc.targets[0].low_value)
<< ", high=" << PrintTColumnValue(filter_desc.targets[0].high_value)
<< ", threshold=" << minmax_filter_threshold_
<< ", #targets=" << filter_desc.targets.size();
bool all_overlap = true;
for (const auto& target_desc : filter_desc.targets) {
if (!FilterContext::ShouldRejectFilterBasedOnColumnStats(
target_desc, ctx.local_min_max_filter, minmax_filter_threshold_)) {
all_overlap = false;
break;
}
}
if (all_overlap) {
ctx.local_min_max_filter->SetAlwaysTrue();
VLOG(3) << "The local minmax filter is set to always true:"
<< " id=" << ctx.filter->id();
}
if (!ctx.local_min_max_filter->AlwaysTrue()) {
++num_enabled_filters;
}
}
runtime_state_->filter_bank()->UpdateFilterFromLocal(
ctx.filter->id(), bloom_filter, ctx.local_min_max_filter);
if ( ctx.local_min_max_filter != nullptr ) {
VLOG(3) << "HJBuilder published min/max filter: "
<< " id=" << ctx.filter->id()
<< ", details=" << ctx.local_min_max_filter->DebugString();
}
}
if (filter_ctxs_.size() > 0) {
string info_string;
if (num_enabled_filters == filter_ctxs_.size()) {
info_string = Substitute("$0 of $0 Runtime Filter$1 Published", filter_ctxs_.size(),
filter_ctxs_.size() == 1 ? "" : "s");
} else {
info_string = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
num_enabled_filters, filter_ctxs_.size(), filter_ctxs_.size() == 1 ? "" : "s",
filter_ctxs_.size() - num_enabled_filters);
}
profile()->AddInfoString("Runtime filters", info_string);
}
JoinBuilder::PublishRuntimeFilters(
filter_ctxs_, runtime_state_, minmax_filter_threshold_, num_build_rows);
}
Status PhjBuilder::BeginSpilledProbe(BufferPool::ClientHandle* probe_client,


@@ -381,10 +381,11 @@ void Coordinator::InitFilterRoutingTable() {
if (!plan_node.__isset.runtime_filters) continue;
for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
// Currently hash joins are the only filter sources. Otherwise it must be
// a filter consumer.
if (plan_node.__isset.join_node &&
plan_node.join_node.__isset.hash_join_node) {
// Currently either hash or nested loop joins are the only filter sources.
// Otherwise it must be a filter consumer.
if (plan_node.__isset.join_node
&& (plan_node.join_node.__isset.hash_join_node
|| plan_node.join_node.__isset.nested_loop_join_node)) {
AddFilterSource(
fragment_params, num_instances, num_backends, filter, plan_node.node_id);
} else if (plan_node.__isset.hdfs_scan_node || plan_node.__isset.kudu_scan_node) {


@@ -284,6 +284,13 @@ class QueryState {
/// The default BATCH_SIZE.
static const int DEFAULT_BATCH_SIZE = 1024;
/// Given a fragment index 'fragmentIdx', return its corresponding FragmentState from
/// the map 'fragment_state_map_'. Return nullptr if the index is not in the map.
FragmentState* findFragmentState(TFragmentIdx fragmentIdx) {
auto it = fragment_state_map_.find(fragmentIdx);
return (it != fragment_state_map_.end()) ? it->second : nullptr;
}
private:
friend class QueryExecMgr;


@@ -19,6 +19,7 @@
#include <mutex>
#include "gen-cpp/ExternalDataSource_types.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-filter-bank.h"
#include "util/bloom-filter.h"
@@ -64,6 +65,10 @@ class RuntimeFilter {
return filter_desc().type == TRuntimeFilterType::MIN_MAX;
}
extdatasource::TComparisonOp::type getCompareOp() const {
return filter_desc().compareOp;
}
BloomFilter* get_bloom_filter() const { return bloom_filter_.Load(); }
MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }


@@ -131,5 +131,40 @@ TEST(StringValueTest, TestConvertToUInt64) {
EXPECT_EQ(StringValue("\1\2\3\4\5\6\7\7\7").ToUInt64(), 0x102030405060707);
}
// Test finding the least smaller strings.
TEST(StringValueTest, TestLeastSmallerString) {
string oneKbNullStr(1024, 0x00);
string a1023NullStr(1023, 0x00);
EXPECT_EQ(StringValue(oneKbNullStr).LeastSmallerString(), a1023NullStr);
EXPECT_EQ(
StringValue(string("\x12\xef", 2)).LeastSmallerString(), string("\x12\xee"));
EXPECT_EQ(
StringValue(string("\x12\x00", 2)).LeastSmallerString(), string("\x12"));
// "0x00" is the smallest string.
string oneNullStr("\00", 1);
EXPECT_EQ(StringValue(oneNullStr).LeastSmallerString(), "");
}
// Test finding the least larger strings.
TEST(StringValueTest, TestLeastLargerString) {
string nullStr("\x00", 1);
EXPECT_EQ(StringValue(nullStr).LeastLargerString(), string("\x01", 1));
string a10230xFFStr(1023, 0xff);
string oneKbStr(1023, 0xff);
oneKbStr.append(1, 0x00);
EXPECT_EQ(StringValue(a10230xFFStr).LeastLargerString(), oneKbStr);
EXPECT_EQ(
StringValue(string("\x12\xef", 2)).LeastLargerString(), string("\x12\xf0"));
EXPECT_EQ(StringValue(string("\x12\xff", 2)).LeastLargerString(),
string("\x13"));
string emptyStr("", 0);
EXPECT_EQ(StringValue(emptyStr).LeastLargerString(), string("\00", 1));
}
}

View File

@@ -44,4 +44,57 @@ uint64_t StringValue::ToUInt64() const {
| static_cast<uint64_t>(bytes[4]) << 24 | static_cast<uint64_t>(bytes[5]) << 16
| static_cast<uint64_t>(bytes[6]) << 8 | static_cast<uint64_t>(bytes[7]);
}
string StringValue::LeastSmallerString() const {
if (len == 1 && ptr[0] == 0x00) return "";
int i = len - 1;
while (i >= 0 && ptr[i] == 0x00) i--;
if (UNLIKELY(i == -1)) {
// All characters are 0x00. Return a string with len-1 0x00 chars.
return string(len - 1, 0x00);
}
// i is pointing at a character != 0x00.
if (i < len - 1) {
// return 'this' without the last trailing 0x00
return string(ptr, len - 1);
}
DCHECK_EQ(i, len - 1);
// Copy characters of this in [0, i] to 'result' and perform a '-1' operation on the
// ith char.
string result;
result.reserve(i + 1);
// copy all i characters in [0, i-1] to 'result'
result.append(ptr, i);
// append a char which is ptr[i]-1
result.append(1, (uint8_t)(ptr[i]) - 1);
return result;
}
string StringValue::LeastLargerString() const {
if (len == 0) return string("\00", 1);
int i = len - 1;
while (i >= 0 && ptr[i] == (int8_t)0xff) i--;
if (UNLIKELY(i == -1)) {
// All characters are 0xff.
// Return a string with these many 0xff chars plus one 0x00 char
string result;
result.reserve(len + 1);
result.append(len, 0xff);
result.append(1, 0x00);
return result;
}
// i is pointing at a character != 0xff.
// Copy characters of this in [0, i] to 'result' and perform a '+1' operation on the
// ith char.
string result;
result.reserve(i + 1);
// copy all i characters in [0, i-1] to 'result'
result.append(ptr, i);
// append a char which is ptr[i]+1
result.append(1, (uint8_t)(ptr[i]) + 1);
return result;
}
}


@@ -125,6 +125,13 @@ struct __attribute__((__packed__)) StringValue {
/// Returns number of characters in a char array (ignores trailing spaces)
inline static int64_t UnpaddedCharLength(const char* cptr, int64_t len);
// Return the least smaller string 'result' such that 'result' < 'this'.
// The smallest string is "\x00". If no such string exists, return an empty string.
std::string LeastSmallerString() const;
// Return the least larger string 'result' such that 'this' < 'result'.
std::string LeastLargerString() const;
/// For C++/IR interop, we need to be able to look up types by name.
static const char* LLVM_CLASS_NAME;
};


@@ -24,6 +24,7 @@
#include <boost/date_time/gregorian/greg_date.hpp>
#include <boost/date_time/posix_time/posix_time_config.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/date_time/special_defs.hpp>
#include <boost/date_time/time_duration.hpp>
@@ -198,6 +199,18 @@ class TimestampValue {
&& date.day_number() <= MAX_DAY_NUMBER;
}
static inline TimestampValue GetMinValue() {
return TimestampValue(
boost::gregorian::date(boost::date_time::min_date_time),
boost::posix_time::time_duration(0, 0, 0, 0));
}
static inline TimestampValue GetMaxValue() {
return TimestampValue(
boost::gregorian::date(boost::date_time::max_date_time),
boost::posix_time::nanoseconds(NANOS_PER_DAY - 1));
}
/// Verifies that the time is not negative and is less than a whole day.
static inline bool IsValidTime(const boost::posix_time::time_duration& time) {
return !time.is_negative()


@@ -15,22 +15,73 @@
// specific language governing permissions and limitations
// under the License.
#include "util/min-max-filter.h"
#include "runtime/string-value.inline.h"
#include "util/min-max-filter.h"
using std::string;
namespace impala {
#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE) \
void NAME##MinMaxFilter::Insert(const void* val) { \
if (LIKELY(val)) { \
const TYPE* value = reinterpret_cast<const TYPE*>(val); \
if (UNLIKELY(*value < min_)) min_ = *value; \
if (UNLIKELY(*value > max_)) max_ = *value; \
} \
} \
template <typename T>
void AddOne(T& v) {
v++;
}
template <>
void AddOne(bool& v) {
v = true;
}
template <typename T>
void SubtractOne(T& v) {
v--;
}
template <>
void SubtractOne(bool& v) {
v = false;
}
#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE) \
void NAME##MinMaxFilter::Insert(const void* val) { \
if (LIKELY(val)) { \
const TYPE* value = reinterpret_cast<const TYPE*>(val); \
if (UNLIKELY(*value < min_)) min_ = *value; \
if (UNLIKELY(*value > max_)) max_ = *value; \
} \
} \
void NAME##MinMaxFilter::InsertForLE(const void* val) { \
if (LIKELY(val)) { \
const TYPE* value = reinterpret_cast<const TYPE*>(val); \
min_ = std::numeric_limits<TYPE>::lowest(); \
if (UNLIKELY(*value > max_)) max_ = *value; \
} \
} \
void NAME##MinMaxFilter::InsertForLT(const void* val) { \
if (LIKELY(val)) { \
TYPE value = *reinterpret_cast<const TYPE*>(val); \
min_ = std::numeric_limits<TYPE>::lowest(); \
if (value > std::numeric_limits<TYPE>::lowest()) { \
SubtractOne(value); \
} \
if (UNLIKELY(value > max_)) max_ = value; \
} \
} \
void NAME##MinMaxFilter::InsertForGE(const void* val) { \
if (LIKELY(val)) { \
max_ = std::numeric_limits<TYPE>::max(); \
const TYPE* value = reinterpret_cast<const TYPE*>(val); \
if (UNLIKELY(*value < min_)) min_ = *value; \
} \
} \
void NAME##MinMaxFilter::InsertForGT(const void* val) { \
if (LIKELY(val)) { \
max_ = std::numeric_limits<TYPE>::max(); \
TYPE value = *reinterpret_cast<const TYPE*>(val); \
if (value < std::numeric_limits<TYPE>::max()) AddOne(value); \
if (UNLIKELY(value < min_)) min_ = value; \
} \
} \
bool NAME##MinMaxFilter::AlwaysTrue() const { return always_true_; }
NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool);
@@ -60,7 +111,76 @@ void StringMinMaxFilter::Insert(const void* val) {
}
}
bool StringMinMaxFilter::AlwaysTrue() const { return always_true_; }
void StringMinMaxFilter::UpdateMax(const StringValue& value) {
if (UNLIKELY(always_false_)) {
min_ = MIN_BOUND_STRING;
max_ = value;
always_false_ = false;
} else {
if (UNLIKELY(max_ < value)) {
max_ = value;
}
}
// Transfer the ownership of memory used in value to max_buffer. Truncation is
// possible if max_ is longer than MAX_BOUND_LENGTH bytes.
MaterializeMaxValue();
}
void StringMinMaxFilter::InsertForLE(const void* val) {
if (LIKELY(val)) {
const StringValue* value = reinterpret_cast<const StringValue*>(val);
UpdateMax(*value);
}
}
void StringMinMaxFilter::InsertForLT(const void* val) {
if (LIKELY(val)) {
std::string result =
reinterpret_cast<const StringValue*>(val)->LeastSmallerString();
if (result.size() > 0) {
UpdateMax(StringValue(result));
} else {
always_true_ = true;
}
}
}
void StringMinMaxFilter::UpdateMin(const StringValue& value) {
if (UNLIKELY(always_false_)) {
min_ = value;
max_ = MAX_BOUND_STRING;
always_false_ = false;
} else {
if (UNLIKELY(value < min_)) {
min_ = value;
}
}
// Transfer the ownership of memory used in value to min_buffer. Truncation is
// possible if min_ is longer than MAX_BOUND_LENGTH bytes.
MaterializeMinValue();
}
void StringMinMaxFilter::InsertForGE(const void* val) {
if (LIKELY(val)) {
const StringValue* value = reinterpret_cast<const StringValue*>(val);
UpdateMin(*value);
}
}
void StringMinMaxFilter::InsertForGT(const void* val) {
if (LIKELY(val)) {
std::string result = reinterpret_cast<const StringValue*>(val)->LeastLargerString();
if (result.size() <= MAX_BOUND_LENGTH) {
UpdateMin(StringValue(result));
} else {
always_true_ = true;
}
}
}
bool StringMinMaxFilter::AlwaysTrue() const {
return always_true_;
}
#define DATE_TIME_MIN_MAX_FILTER_FUNCS(NAME, TYPE) \
void NAME##MinMaxFilter::Insert(const void* val) { \
@@ -84,6 +204,112 @@ bool StringMinMaxFilter::AlwaysTrue() const { return always_true_; }
DATE_TIME_MIN_MAX_FILTER_FUNCS(Timestamp, TimestampValue);
DATE_TIME_MIN_MAX_FILTER_FUNCS(Date, DateValue);
void DateMinMaxFilter::UpdateMax(const DateValue& value) {
if (UNLIKELY(always_false_)) {
min_ = DateValue::MIN_DATE;
max_ = value;
always_false_ = false;
} else if (UNLIKELY(value > max_)) {
max_ = value;
}
}
void DateMinMaxFilter::InsertForLE(const void* val) {
if (LIKELY(val)) {
const DateValue* value = reinterpret_cast<const DateValue*>(val);
UpdateMax(*value);
}
}
void DateMinMaxFilter::InsertForLT(const void* val) {
if (LIKELY(val)) {
DateValue value = *reinterpret_cast<const DateValue*>(val);
if (value > DateValue::MIN_DATE) value = value.SubtractDays(1);
UpdateMax(value);
}
}
void DateMinMaxFilter::UpdateMin(const DateValue& value) {
if (UNLIKELY(always_false_)) {
min_ = value;
max_ = DateValue::MAX_DATE;
always_false_ = false;
} else if (UNLIKELY(value < min_)) {
min_ = value;
}
}
void DateMinMaxFilter::InsertForGE(const void* val) {
if (LIKELY(val)) {
const DateValue* value = reinterpret_cast<const DateValue*>(val);
UpdateMin(*value);
}
}
void DateMinMaxFilter::InsertForGT(const void* val) {
if (LIKELY(val)) {
DateValue value = *reinterpret_cast<const DateValue*>(val);
if (value < DateValue::MAX_DATE) value = value.AddDays(1);
UpdateMin(value);
}
}
void TimestampMinMaxFilter::UpdateMax(const TimestampValue& value) {
if (UNLIKELY(always_false_)) {
min_ = TimestampValue::GetMinValue();
max_ = value;
always_false_ = false;
} else if (UNLIKELY(value > max_)) {
max_ = value;
}
}
void TimestampMinMaxFilter::InsertForLE(const void* val) {
if (LIKELY(val)) {
const TimestampValue* value = reinterpret_cast<const TimestampValue*>(val);
UpdateMax(*value);
}
}
void TimestampMinMaxFilter::InsertForLT(const void* val) {
if (LIKELY(val)) {
TimestampValue value = *reinterpret_cast<const TimestampValue*>(val);
if (TimestampValue::GetMinValue() < value) {
// subtract one nanosecond.
value = value.Subtract(boost::posix_time::time_duration(0, 0, 0, 1));
}
UpdateMax(value);
}
}
void TimestampMinMaxFilter::UpdateMin(const TimestampValue& value) {
if (UNLIKELY(always_false_)) {
min_ = value;
max_ = TimestampValue::GetMaxValue();
always_false_ = false;
} else if (UNLIKELY(value < min_)) {
min_ = value;
}
}
void TimestampMinMaxFilter::InsertForGE(const void* val) {
if (LIKELY(val)) {
const TimestampValue* value = reinterpret_cast<const TimestampValue*>(val);
UpdateMin(*value);
}
}
void TimestampMinMaxFilter::InsertForGT(const void* val) {
if (LIKELY(val)) {
TimestampValue value = *reinterpret_cast<const TimestampValue*>(val);
if (value < TimestampValue::GetMaxValue()) {
// Add one nanosecond.
value = value.Add(boost::posix_time::time_duration(0, 0, 0, 1));
}
UpdateMin(value);
}
}
#define INSERT_DECIMAL_MINMAX(SIZE) \
do { \
if (LIKELY(val)) { \
@@ -128,6 +354,126 @@ void DecimalMinMaxFilter::Insert16(const void* val) {
}
}
bool DecimalMinMaxFilter::AlwaysTrue() const { return always_true_; }
#define UPDATE_MAX_DECIMAL_MINMAX(TYPE, SIZE) \
void DecimalMinMaxFilter::UpdateMax(const Decimal##SIZE##Value& value) { \
if (UNLIKELY(always_false_)) { \
min##SIZE##_.set_value(std::numeric_limits<TYPE>::min()); \
max##SIZE##_ = value; \
always_false_ = false; \
} else { \
if (UNLIKELY(value > max##SIZE##_)) { \
max##SIZE##_ = value; \
} \
} \
}
UPDATE_MAX_DECIMAL_MINMAX(int32_t, 4);
UPDATE_MAX_DECIMAL_MINMAX(int64_t, 8);
UPDATE_MAX_DECIMAL_MINMAX(__int128_t, 16);
#define INSERT_DECIMAL_MINMAX_FOR_LE(TYPE, SIZE) \
do { \
if (LIKELY(val)) { \
const Decimal##SIZE##Value* value = \
reinterpret_cast<const Decimal##SIZE##Value*>(val); \
UpdateMax(*value); \
} \
} while (false)
#define INSERT_DECIMAL_MINMAX_FOR_LT(TYPE, SIZE) \
do { \
if (LIKELY(val)) { \
TYPE value = reinterpret_cast<const Decimal##SIZE##Value*>(val)->value(); \
if (value > std::numeric_limits<TYPE>::min()) value--; \
UpdateMax(Decimal##SIZE##Value(value)); \
} \
} while (false)
void DecimalMinMaxFilter::Insert4ForLE(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_LE(int32_t, 4);
}
void DecimalMinMaxFilter::Insert8ForLE(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_LE(int64_t, 8);
}
void DecimalMinMaxFilter::Insert16ForLE(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_LE(__int128_t, 16);
}
void DecimalMinMaxFilter::Insert4ForLT(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_LT(int32_t, 4);
}
void DecimalMinMaxFilter::Insert8ForLT(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_LT(int64_t, 8);
}
void DecimalMinMaxFilter::Insert16ForLT(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_LT(__int128_t, 16);
}
#define UPDATE_MIN_DECIMAL_MINMAX(TYPE, SIZE) \
void DecimalMinMaxFilter::UpdateMin(const Decimal##SIZE##Value& value) { \
if (UNLIKELY(always_false_)) { \
min##SIZE##_ = value; \
max##SIZE##_.set_value(std::numeric_limits<TYPE>::max()); \
always_false_ = false; \
} else { \
if (UNLIKELY(value < min##SIZE##_)) { \
min##SIZE##_ = value; \
} \
} \
}
UPDATE_MIN_DECIMAL_MINMAX(int32_t, 4);
UPDATE_MIN_DECIMAL_MINMAX(int64_t, 8);
UPDATE_MIN_DECIMAL_MINMAX(__int128_t, 16);
#define INSERT_DECIMAL_MINMAX_FOR_GE(TYPE, SIZE) \
do { \
if (LIKELY(val)) { \
const Decimal##SIZE##Value* value = \
reinterpret_cast<const Decimal##SIZE##Value*>(val); \
UpdateMin(*value); \
} \
} while (false)
#define INSERT_DECIMAL_MINMAX_FOR_GT(TYPE, SIZE) \
do { \
if (LIKELY(val)) { \
TYPE value = reinterpret_cast<const Decimal##SIZE##Value*>(val)->value(); \
if (value < std::numeric_limits<TYPE>::max()) value++; \
UpdateMin(Decimal##SIZE##Value(value)); \
} \
} while (false)
void DecimalMinMaxFilter::Insert4ForGE(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_GE(int32_t, 4);
}
void DecimalMinMaxFilter::Insert8ForGE(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_GE(int64_t, 8);
}
void DecimalMinMaxFilter::Insert16ForGE(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_GE(__int128_t, 16);
}
void DecimalMinMaxFilter::Insert4ForGT(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_GT(int32_t, 4);
}
void DecimalMinMaxFilter::Insert8ForGT(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_GT(int64_t, 8);
}
void DecimalMinMaxFilter::Insert16ForGT(const void* val) {
INSERT_DECIMAL_MINMAX_FOR_GT(__int128_t, 16);
}
bool DecimalMinMaxFilter::AlwaysTrue() const {
return always_true_;
}
} // namespace impala


@@ -89,9 +89,33 @@ TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
EXPECT_FALSE(f1->AlwaysTrue());
EXPECT_FALSE(f1->AlwaysFalse());
MinMaxFilter* f3= MinMaxFilter::Create(bool_column_type, &obj_pool, &mem_tracker);
f3->InsertForLE(&b2 /*false, update max*/);
EXPECT_EQ(*reinterpret_cast<const bool*>(f3->GetMin()), false);
EXPECT_EQ(*reinterpret_cast<const bool*>(f3->GetMax()), false);
MinMaxFilter* f4= MinMaxFilter::Create(bool_column_type, &obj_pool, &mem_tracker);
f4->InsertForLT(&b1 /*true, update max with false*/);
EXPECT_EQ(*reinterpret_cast<const bool*>(f4->GetMin()), false);
EXPECT_EQ(*reinterpret_cast<const bool*>(f4->GetMax()), false);
MinMaxFilter* f5= MinMaxFilter::Create(bool_column_type, &obj_pool, &mem_tracker);
f5->InsertForGE(&b2 /*false, update min */);
EXPECT_EQ(*reinterpret_cast<const bool*>(f5->GetMin()), false);
EXPECT_EQ(*reinterpret_cast<const bool*>(f5->GetMax()), true);
MinMaxFilter* f6= MinMaxFilter::Create(bool_column_type, &obj_pool, &mem_tracker);
f6->InsertForGT(&b1 /*true, update min with true */);
EXPECT_EQ(*reinterpret_cast<const bool*>(f6->GetMin()), true);
EXPECT_EQ(*reinterpret_cast<const bool*>(f6->GetMax()), true);
filter->Close();
f1->Close();
f2->Close();
f3->Close();
f4->Close();
f5->Close();
f6->Close();
}
void CheckIntVals(MinMaxFilter* filter, int32_t min, int32_t max) {
@@ -192,11 +216,38 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
MinMaxFilter::Or(pFilter2, &pFilter3, int_type);
EXPECT_EQ(pFilter3.always_true(), true);
// Test single range inserts for LE and LT
MinMaxFilter* int_filter_L = MinMaxFilter::Create(int_type, &obj_pool, &mem_tracker);
int_filter_L->InsertForLE(&i1);
MinMaxFilterPB pFilterL;
int_filter_L->ToProtobuf(&pFilterL);
EXPECT_EQ(pFilterL.min().int_val(), std::numeric_limits<int32_t>::lowest());
EXPECT_EQ(pFilterL.max().int_val(), i1);
int_filter_L->InsertForLT(&i2);
int_filter_L->ToProtobuf(&pFilterL);
EXPECT_EQ(pFilterL.min().int_val(), std::numeric_limits<int32_t>::lowest());
EXPECT_EQ(pFilterL.max().int_val(), i2-1);
// Test single range inserts for GE and GT
MinMaxFilter* int_filter_R = MinMaxFilter::Create(int_type, &obj_pool, &mem_tracker);
int_filter_R->InsertForGE(&i2);
MinMaxFilterPB pFilterR;
int_filter_R->ToProtobuf(&pFilterR);
EXPECT_EQ(pFilterR.min().int_val(), i2);
EXPECT_EQ(pFilterR.max().int_val(), std::numeric_limits<int32_t>::max());
int_filter_R->InsertForGT(&i1);
int_filter_R->ToProtobuf(&pFilterR);
EXPECT_EQ(pFilterR.min().int_val(), i1+1);
EXPECT_EQ(pFilterR.max().int_val(), std::numeric_limits<int32_t>::max());
int_filter->Close();
empty_filter->Close();
int_filter2->Close();
f1->Close();
f2->Close();
int_filter_L->Close();
int_filter_R->Close();
}
// Make a string that is compared less than 'str'
@@ -221,7 +272,8 @@ string make_greater_than(const string& str) {
return result;
}
void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max) {
void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max,
bool check_overlap = true) {
StringValue actual_min = *reinterpret_cast<const StringValue*>(filter->GetMin());
StringValue actual_max = *reinterpret_cast<const StringValue*>(filter->GetMax());
StringValue expected_min(min);
@@ -231,6 +283,8 @@ void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max)
EXPECT_FALSE(filter->AlwaysTrue());
EXPECT_FALSE(filter->AlwaysFalse());
if (!check_overlap) return;
// Check overlaps. The range is [min, max] in filter.
ColumnType string_type(PrimitiveType::TYPE_STRING);
@@ -463,6 +517,70 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
f2->Close();
always_false->Close();
f3->Close();
// Test single range insertion
string s("abcdefg");
StringValue stringVal(s);
f1 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f1->InsertForLE(&stringVal);
CheckStringVals(f1, StringMinMaxFilter::min_string, s, false);
f1->Close();
f2 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f2->InsertForGE(&stringVal);
CheckStringVals(f2, s, StringMinMaxFilter::max_string, false);
f2->Close();
f3 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f3->InsertForLT(&stringVal);
string less("abcdeff", 7);
CheckStringVals(f3, StringMinMaxFilter::min_string, less, false);
f3->Close();
f1 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f1->InsertForGT(&stringVal);
string more("abcdefh", 7);
CheckStringVals(f1, more, StringMinMaxFilter::max_string, false);
f1->Close();
// Insertions leading to out of boundary situations should turn off the
// filters.
f1 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f1->InsertForLT(&StringMinMaxFilter::MIN_BOUND_STRING);
EXPECT_TRUE(f1->AlwaysTrue());
f1->Close();
// The least larger string for MAX_BOUND_STRING does not exist since MAX_BOUND_STRING
// is the largest string.
f1 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f1->InsertForGT(&StringMinMaxFilter::MAX_BOUND_STRING);
EXPECT_TRUE(f1->AlwaysTrue());
f1->Close();
// Inserting a string of 3 nulls with LT changes the max value to a string with two
// 0x00 chars.
string twoNulls(2, 0x00);
string threeNulls(3, 0x00);
StringValue twoNullVal(twoNulls);
StringValue threeNullVal(threeNulls);
string two0xffChars(2, 0xff);
string three0xffChars(3, 0xff);
StringValue two0xFFVal(two0xffChars);
StringValue three0xFFVal(three0xffChars);
f1 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f1->InsertForLT(&threeNullVal);
CheckStringVals(f1, StringMinMaxFilter::min_string, twoNulls, false);
f1->Close();
// Inserting a string of 2 0xff chars with GT changes the min value to a string with
// two 0xff chars and one 0x00 char.
f1 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
f1->InsertForGT(&two0xFFVal);
CheckStringVals(f1, string("\xff\xff\x00", 3), StringMinMaxFilter::max_string, false);
f1->Close();
}
static TimestampValue ParseSimpleTimestamp(const char* s) {
@@ -537,12 +655,14 @@ void CheckOverlapForTimestamp(MinMaxFilter* filter, const ColumnType& col_type,
#define DATE_TIME_CHECK_VALS(NAME, TYPE) \
void Check##NAME##Vals(MinMaxFilter* filter, const ColumnType& col_type, \
const TYPE& min, const TYPE& max) { \
const TYPE& min, const TYPE& max, bool checkOverlap = true) { \
EXPECT_EQ(*reinterpret_cast<const TYPE*>(filter->GetMin()), min); \
EXPECT_EQ(*reinterpret_cast<const TYPE*>(filter->GetMax()), max); \
EXPECT_FALSE(filter->AlwaysFalse()); \
EXPECT_FALSE(filter->AlwaysTrue()); \
CheckOverlapFor##NAME(filter, col_type, min, max); \
if (checkOverlap) { \
CheckOverlapFor##NAME(filter, col_type, min, max); \
} \
}
DATE_TIME_CHECK_VALS(Timestamp, TimestampValue);
@@ -617,6 +737,71 @@ TEST(MinMaxFilterTest, TestDateMinMaxFilter) {
DATE_TIME_CHECK_FUNCS(Date, DateValue, date, DATE);
}
TEST(MinMaxFilterTest, TestDateMinMaxFilterSingleRange) {
ObjectPool obj_pool;
MemTracker mem_tracker;
ColumnType col_type(PrimitiveType::TYPE_DATE);
MinMaxFilter* filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
DateValue d1 = ParseSimpleDate("2001-04-30 05:00:00");
filter->InsertForLE(&d1);
CheckDateVals(filter, col_type, DateValue::MIN_DATE, d1);
filter->Close();
filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
DateValue d2 = ParseSimpleDate("2021-01-01 05:00:00");
filter->InsertForLT(&d2);
DateValue d3 = ParseSimpleDate("2020-12-31 05:00:00");
CheckDateVals(
filter, col_type, DateValue::MIN_DATE, d3, false /* do not check overlap*/);
filter->Close();
filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
filter->InsertForGE(&d1);
CheckDateVals(filter, col_type, d1, DateValue::MAX_DATE);
filter->Close();
filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
filter->InsertForGT(&d3);
CheckDateVals(
filter, col_type, d2, DateValue::MAX_DATE, false /* do not check overlap*/);
filter->Close();
}
TEST(MinMaxFilterTest, TestTimestampMinMaxFilterSingleRange) {
ObjectPool obj_pool;
MemTracker mem_tracker;
ColumnType col_type(PrimitiveType::TYPE_TIMESTAMP);
MinMaxFilter* filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
TimestampValue t1 = ParseSimpleTimestamp("2001-04-30 05:00:00");
filter->InsertForLE(&t1);
CheckTimestampVals(filter, col_type, TimestampValue::GetMinValue(), t1,
false /* do not check overlap*/);
filter->Close();
filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
TimestampValue t2 = ParseSimpleTimestamp("2021-01-01 05:00:00");
filter->InsertForLT(&t2);
TimestampValue t3 = t2.Subtract(boost::posix_time::time_duration(0, 0, 0, 1));
CheckTimestampVals(filter, col_type, TimestampValue::GetMinValue(), t3,
false /* do not check overlap*/);
filter->Close();
filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
filter->InsertForGE(&t1);
CheckTimestampVals(filter, col_type, t1, TimestampValue::GetMaxValue(),
false /* do not check overlap*/);
filter->Close();
filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker);
filter->InsertForGT(&t3);
CheckTimestampVals(filter, col_type, t2, TimestampValue::GetMaxValue(),
false /* do not check overlap*/);
filter->Close();
}
#define DECIMAL_ADD(SIZE, result_type) \
Decimal##SIZE##Value Decimal##SIZE##Add(const Decimal##SIZE##Value& value, \
int precision, int scale, double x, bool* overflow) { \
@@ -637,6 +822,7 @@ DECIMAL_ADD(16, __int128_t);
EXPECT_EQ(*reinterpret_cast<const Decimal##SIZE##Value*>(filter->GetMax()), max); \
EXPECT_FALSE(filter->AlwaysFalse()); \
EXPECT_FALSE(filter->AlwaysTrue()); \
if (!check_overlap) return; \
/* Check overlaps. The range is [min, max] in filter.*/ \
/* Check overlapping with [min-10, min-1], which should be false.*/ \
const int& precision = col_type.precision; \
@@ -664,17 +850,17 @@ DECIMAL_ADD(16, __int128_t);
} while (false)
void CheckDecimalVals(MinMaxFilter* filter, const ColumnType& col_type,
const Decimal4Value& min, const Decimal4Value& max) {
const Decimal4Value& min, const Decimal4Value& max, bool check_overlap = true) {
DECIMAL_CHECK(4);
}
void CheckDecimalVals(MinMaxFilter* filter, const ColumnType& col_type,
const Decimal8Value& min, const Decimal8Value& max) {
const Decimal8Value& min, const Decimal8Value& max, bool check_overlap = true) {
DECIMAL_CHECK(8);
}
void CheckDecimalVals(MinMaxFilter* filter, const ColumnType& col_type,
const Decimal16Value& min, const Decimal16Value& max) {
const Decimal16Value& min, const Decimal16Value& max, bool check_overlap = true) {
DECIMAL_CHECK(16);
}
@@ -741,6 +927,36 @@ void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type
EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.max()), d2##SIZE); \
} while (false)
#define DECIMAL_SINGLE_RANGE_INSERT_AND_CHECK( \
PRIMITIVE_TYPE, SIZE, PRECISION, SCALE, VALUE) \
do { \
d1##SIZE = \
Decimal##SIZE##Value::FromDouble(PRECISION, SCALE, VALUE, false, &overflow); \
\
((DecimalMinMaxFilter*)filter##SIZE)->SetAlwaysFalse(); \
filter##SIZE->InsertForLE(&d1##SIZE); \
Decimal##SIZE##Value min##SIZE = \
Decimal##SIZE##Value(std::numeric_limits<PRIMITIVE_TYPE>::lowest()); \
CheckDecimalVals(filter##SIZE, decimal##SIZE##_type, min##SIZE, d1##SIZE, false); \
\
((DecimalMinMaxFilter*)filter##SIZE)->SetAlwaysFalse(); \
filter##SIZE->InsertForLT(&d1##SIZE); \
Decimal##SIZE##Value d1_minus_one##SIZE = \
Decimal##SIZE##Value(d1##SIZE.value() - 1); \
CheckDecimalVals( \
filter##SIZE, decimal##SIZE##_type, min##SIZE, d1_minus_one##SIZE, false); \
\
((DecimalMinMaxFilter*)filter##SIZE)->SetAlwaysFalse(); \
filter##SIZE->InsertForGE(&d1##SIZE); \
Decimal##SIZE##Value max##SIZE = \
Decimal##SIZE##Value(std::numeric_limits<PRIMITIVE_TYPE>::max()); \
CheckDecimalVals(filter##SIZE, decimal##SIZE##_type, d1##SIZE, max##SIZE, false); \
\
((DecimalMinMaxFilter*)filter##SIZE)->SetAlwaysFalse(); \
filter##SIZE->InsertForGT(&d1_minus_one##SIZE); \
CheckDecimalVals(filter##SIZE, decimal##SIZE##_type, d1##SIZE, max##SIZE, false); \
} while (false)
// Tests that a DecimalMinMaxFilter returns the expected min/max after having values
// inserted into it, and that MinMaxFilter::Or works for decimal values.
TEST(MinMaxFilterTest, TestDecimalMinMaxFilter) {
@@ -788,6 +1004,13 @@ TEST(MinMaxFilterTest, TestDecimalMinMaxFilter) {
DECIMAL_CHECK_OR(8);
DECIMAL_CHECK_OR(16);
// Single range insert and check
DECIMAL_SINGLE_RANGE_INSERT_AND_CHECK(int32_t, 4, 9, 5, 2345.67891);
DECIMAL_SINGLE_RANGE_INSERT_AND_CHECK(int64_t, 8, 18, 9, 234567891.123456789);
// Testing of 16-byte DECIMALs is skipped since the FE disables min/max filtering
// against single ranges for such decimals.
// Close all filters
filter4->Close();
filter8->Close();
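A minimal standalone sketch of the single-range insert semantics exercised above, assuming a signed 64-bit column and a filter that starts out always-false (as the macro forces via SetAlwaysFalse()); the IntRange type and its methods are illustrative stand-ins, not the Impala MinMaxFilter API:

#include <cstdint>
#include <iostream>
#include <limits>

// Illustrative sketch only, not the Impala API: models the bound produced by
// each single-range insert for a join predicate of the form 'target <op> v'.
struct IntRange {
  int64_t min = std::numeric_limits<int64_t>::max();     // empty (always-false) range
  int64_t max = std::numeric_limits<int64_t>::lowest();
  void InsertForLE(int64_t v) {                          // target <= v  =>  [lowest, v]
    min = std::numeric_limits<int64_t>::lowest();
    max = v;
  }
  void InsertForLT(int64_t v) { InsertForLE(v - 1); }    // target <  v  =>  [lowest, v - 1]
  void InsertForGE(int64_t v) {                          // target >= v  =>  [v, max]
    min = v;
    max = std::numeric_limits<int64_t>::max();
  }
  void InsertForGT(int64_t v) { InsertForGE(v + 1); }    // target >  v  =>  [v + 1, max]
};

int main() {
  IntRange r;
  r.InsertForLT(10);                           // models 'target < 10'
  std::cout << r.min << " " << r.max << "\n";  // prints -9223372036854775808 9
  return 0;
}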

View File

@@ -394,6 +394,12 @@ APPROXIMATE_NUMERIC_MIN_MAX_FILTER_EVAL_OVERLAP(Double, double);
const char* StringMinMaxFilter::LLVM_CLASS_NAME = "class.impala::StringMinMaxFilter";
const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
const std::string StringMinMaxFilter::min_string("\0", 1);
const std::string StringMinMaxFilter::max_string(MAX_BOUND_LENGTH, (uint8_t)0xff);
const StringValue StringMinMaxFilter::MIN_BOUND_STRING(min_string);
const StringValue StringMinMaxFilter::MAX_BOUND_STRING(max_string);
StringMinMaxFilter::StringMinMaxFilter(
const MinMaxFilterPB& protobuf, MemTracker* mem_tracker)
: mem_pool_(mem_tracker), min_buffer_(&mem_pool_), max_buffer_(&mem_pool_) {
@@ -415,37 +421,41 @@ PrimitiveType StringMinMaxFilter::type() const {
return PrimitiveType::TYPE_STRING;
}
void StringMinMaxFilter::MaterializeMinValue() {
if (min_.len > MAX_BOUND_LENGTH) {
// Truncating 'value' gives a valid min bound as the result will be <= 'value'.
CopyToBuffer(&min_buffer_, &min_, MAX_BOUND_LENGTH);
} else {
CopyToBuffer(&min_buffer_, &min_, min_.len);
}
}
void StringMinMaxFilter::MaterializeMaxValue() {
if (max_.len > MAX_BOUND_LENGTH) {
CopyToBuffer(&max_buffer_, &max_, MAX_BOUND_LENGTH);
if (always_true_) return;
// After truncating 'value', to still have a valid max bound we add 1 to one char in
// the string, so that the result will be > 'value'. If the entire string is already
// the max char, then disable this filter by making it always_true.
int i = MAX_BOUND_LENGTH - 1;
while (i >= 0 && static_cast<int32_t>(max_buffer_.buffer()[i]) == -1) {
max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
--i;
}
if (i == -1) {
SetAlwaysTrue();
return;
}
max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
} else {
CopyToBuffer(&max_buffer_, &max_, max_.len);
}
}
void StringMinMaxFilter::MaterializeValues() {
if (always_true_ || always_false_) return;
if (min_buffer_.IsEmpty()) {
if (min_.len > MAX_BOUND_LENGTH) {
// Truncating 'value' gives a valid min bound as the result will be <= 'value'.
CopyToBuffer(&min_buffer_, &min_, MAX_BOUND_LENGTH);
} else {
CopyToBuffer(&min_buffer_, &min_, min_.len);
}
}
if (max_buffer_.IsEmpty()) {
if (max_.len > MAX_BOUND_LENGTH) {
CopyToBuffer(&max_buffer_, &max_, MAX_BOUND_LENGTH);
if (always_true_) return;
// After truncating 'value', to still have a valid max bound we add 1 to one char in
// the string, so that the result will be > 'value'. If the entire string is already
// the max char, then disable this filter by making it always_true.
int i = MAX_BOUND_LENGTH - 1;
while (i >= 0 && static_cast<int32_t>(max_buffer_.buffer()[i]) == -1) {
max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
--i;
}
if (i == -1) {
SetAlwaysTrue();
return;
}
max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
} else {
CopyToBuffer(&max_buffer_, &max_, max_.len);
}
}
if (min_buffer_.IsEmpty()) MaterializeMinValue();
if (max_buffer_.IsEmpty()) MaterializeMaxValue();
}
void StringMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
@@ -735,6 +745,74 @@ void DecimalMinMaxFilter::Insert(const void* val) {
}
}
void DecimalMinMaxFilter::InsertForLE(const void* val) {
if (val == nullptr) return;
switch (size_) {
case 4:
Insert4ForLE(val);
break;
case 8:
Insert8ForLE(val);
break;
case 16:
Insert16ForLE(val);
break;
default:
DCHECK(false) << "Unknown decimal size: " << size_;
}
}
void DecimalMinMaxFilter::InsertForGE(const void* val) {
if (val == nullptr) return;
switch (size_) {
case 4:
Insert4ForGE(val);
break;
case 8:
Insert8ForGE(val);
break;
case 16:
Insert16ForGE(val);
break;
default:
DCHECK(false) << "Unknown decimal size: " << size_;
}
}
void DecimalMinMaxFilter::InsertForLT(const void* val) {
if (val == nullptr) return;
switch (size_) {
case 4:
Insert4ForLT(val);
break;
case 8:
Insert8ForLT(val);
break;
case 16:
Insert16ForLT(val);
break;
default:
DCHECK(false) << "Unknown decimal size: " << size_;
}
}
void DecimalMinMaxFilter::InsertForGT(const void* val) {
if (val == nullptr) return;
switch (size_) {
case 4:
Insert4ForGT(val);
break;
case 8:
Insert8ForGT(val);
break;
case 16:
Insert16ForGT(val);
break;
default:
DCHECK(false) << "Unknown decimal size: " << size_;
}
}
#define DECIMAL_DEBUG_STRING(SIZE) \
do { \
out << "DecimalMinMaxFilter(min=" << min##SIZE##_ << ", max=" << max##SIZE##_ \

View File

@@ -36,8 +36,9 @@ class ObjectPool;
/// runtime filters.
///
/// Filters are constructed using MinMaxFilter::Create() which returns a MinMaxFilter of
/// the appropriate type. Values can then be added using Insert(), and the min and max can
/// be retrieved using GetMin()/GetMax().
/// the appropriate type. Values can then be added using Insert(), InsertForLE(),
/// InsertForLT(), InsertForGE() or InsertForGT(), and the min and max can be retrieved
/// using GetMin()/GetMax().
///
/// MinMaxFilters ignore NULL values, and so are only appropriate to use as a runtime
/// filter if the join predicate is '=' and not 'is not distinct from'.
@@ -72,6 +73,22 @@ class MinMaxFilter {
/// Add a new value, updating the current min/max.
virtual void Insert(const void* val) = 0;
/// Add a new value, updating the current min/max when always false is true, or
/// only the current max otherwise.
virtual void InsertForLE(const void* val) = 0;
/// Add a new value which is 'val'-1, updating the current min/max when always false is
/// true, or only the current max otherwise.
virtual void InsertForLT(const void* val) = 0;
/// Add a new value, updating the current min/max when always false is true, or
/// only the current min otherwise.
virtual void InsertForGE(const void* val) = 0;
/// Add a new value which is 'val'+1, updating the current min/max when always false is
/// true, or only the current min otherwise.
virtual void InsertForGT(const void* val) = 0;
/// If true, this filter doesn't allow any rows to pass.
virtual bool AlwaysFalse() const = 0;
@@ -183,6 +200,10 @@ class MinMaxFilter {
const TColumnValue& data_max) override; \
virtual PrimitiveType type() const override; \
virtual void Insert(const void* val) override; \
virtual void InsertForLE(const void* val) override; \
virtual void InsertForLT(const void* val) override; \
virtual void InsertForGE(const void* val) override; \
virtual void InsertForGT(const void* val) override; \
bool AlwaysTrue() const override; \
virtual bool AlwaysFalse() const override { \
return min_ == std::numeric_limits<TYPE>::max() \
@@ -223,14 +244,28 @@ class StringMinMaxFilter : public MinMaxFilter {
always_false_(true) {}
StringMinMaxFilter(const MinMaxFilterPB& protobuf, MemTracker* mem_tracker);
virtual ~StringMinMaxFilter() {}
static const std::string min_string; // a string of 1 byte of 0x0.
static const std::string max_string; // a string of MAX_BOUND_LENGTH bytes of 0xff.
static const StringValue MIN_BOUND_STRING;
static const StringValue MAX_BOUND_STRING;
virtual void Close() override { mem_pool_.FreeAll(); }
virtual const void* GetMin() const override { return &min_; }
virtual const void* GetMax() const override { return &max_; }
virtual PrimitiveType type() const override;
virtual void Insert(const void* val) override;
// These four versions of the insert methods materialize min_ and max_
// by making them point at min_buffer_/max_buffer_.
virtual void InsertForLE(const void* val) override;
virtual void InsertForLT(const void* val) override;
virtual void InsertForGE(const void* val) override;
virtual void InsertForGT(const void* val) override;
bool AlwaysTrue() const override;
virtual bool AlwaysFalse() const override { return always_false_; }
bool EvalOverlap(
@@ -254,6 +289,18 @@ class StringMinMaxFilter : public MinMaxFilter {
protected:
virtual void SetAlwaysTrue() override;
// Update min_ and max_ to [value, MAX_BOUND_STRING]
void UpdateMin(const StringValue& value);
// Update min_ and max_ to [MIN_BOUND_STRING, value]
void UpdateMax(const StringValue& value);
// Perform the real work to materialize the min value into min_buffer_.
void MaterializeMinValue();
// Perform the real work to materialize the max value into max_buffer_.
void MaterializeMaxValue();
private:
/// Copies the contents of 'value' into 'buffer', up to 'len', and reassigns 'value' to
/// point to 'buffer'. If an OOM is hit, disables the filter by setting 'always_true_'
@@ -291,6 +338,10 @@ class StringMinMaxFilter : public MinMaxFilter {
virtual const void* GetMax() const override { return &max_; } \
virtual PrimitiveType type() const override; \
virtual void Insert(const void* val) override; \
virtual void InsertForLE(const void* val) override; \
virtual void InsertForLT(const void* val) override; \
virtual void InsertForGE(const void* val) override; \
virtual void InsertForGT(const void* val) override; \
bool AlwaysTrue() const override; \
virtual bool AlwaysFalse() const override { return always_false_; } \
bool EvalOverlap( \
@@ -305,6 +356,10 @@ class StringMinMaxFilter : public MinMaxFilter {
static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out); \
static const char* LLVM_CLASS_NAME; \
\
private: \
void UpdateMin(const TYPE& val); \
void UpdateMax(const TYPE& val); \
\
private: \
TYPE min_; \
TYPE max_; \
@@ -363,9 +418,14 @@ class DecimalMinMaxFilter : public MinMaxFilter {
}
virtual void Insert(const void* val) override;
virtual void InsertForLE(const void* val) override;
virtual void InsertForLT(const void* val) override;
virtual void InsertForGE(const void* val) override;
virtual void InsertForGT(const void* val) override;
virtual PrimitiveType type() const override;
bool AlwaysTrue() const override;
virtual bool AlwaysFalse() const override { return always_false_; }
virtual void SetAlwaysFalse() { always_false_ = true; }
virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
virtual std::string DebugString() const override;
@@ -382,6 +442,30 @@ class DecimalMinMaxFilter : public MinMaxFilter {
void Insert8(const void* val);
void Insert16(const void* val);
void Insert4ForLE(const void* val);
void Insert8ForLE(const void* val);
void Insert16ForLE(const void* val);
void Insert4ForGE(const void* val);
void Insert8ForGE(const void* val);
void Insert16ForGE(const void* val);
void Insert4ForLT(const void* val);
void Insert8ForLT(const void* val);
void Insert16ForLT(const void* val);
void Insert4ForGT(const void* val);
void Insert8ForGT(const void* val);
void Insert16ForGT(const void* val);
void UpdateMin(const Decimal4Value&);
void UpdateMin(const Decimal8Value&);
void UpdateMin(const Decimal16Value&);
void UpdateMax(const Decimal4Value&);
void UpdateMax(const Decimal8Value&);
void UpdateMax(const Decimal16Value&);
/// Struct name in LLVM IR.
static const char* LLVM_CLASS_NAME;
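A minimal standalone sketch of the truncated upper-bound idea behind StringMinMaxFilter::MaterializeMaxValue() above: zero out trailing 0xff bytes of the truncated prefix, add one to the first smaller byte from the right, and give up (always-true) if every byte is already 0xff. TruncatedUpperBound is an illustrative helper, not the Impala implementation:

#include <iostream>
#include <optional>
#include <string>

// Illustrative sketch only, not the Impala API.
std::optional<std::string> TruncatedUpperBound(const std::string& value, size_t bound_len) {
  std::string bound = value.substr(0, bound_len);
  int i = static_cast<int>(bound.size()) - 1;
  while (i >= 0 && static_cast<unsigned char>(bound[i]) == 0xff) {
    bound[i] = '\0';  // the max byte wraps to 0
    --i;
  }
  if (i < 0) return std::nullopt;  // no finite bound exists; the filter must go always-true
  bound[i] = static_cast<char>(static_cast<unsigned char>(bound[i]) + 1);
  return bound;  // compares greater (byte-wise) than any string starting with the prefix
}

int main() {
  auto b = TruncatedUpperBound(std::string("ab\xff") + "tail beyond the bound", 3);
  if (b) std::cout << *b << "\n";  // prints "ac" followed by a 0x00 byte
  return 0;
}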

View File

@@ -146,7 +146,7 @@ struct TRuntimeFilterDesc {
// Filter unique id (within a query)
1: required i32 filter_id
// Expr on which the filter is built on a hash join.
// Expr on which the filter is built on a hash or nested loop join.
2: required Exprs.TExpr src_expr
// List of targets for this runtime filter
@@ -177,12 +177,15 @@ struct TRuntimeFilterDesc {
// The type of runtime filter to build.
10: required TRuntimeFilterType type
// The comparison operator between targets and src_expr
11: required ExternalDataSource.TComparisonOp compareOp
// The size of the filter based on the ndv estimate and the min/max limit specified in
// the query options. Should be greater than zero for bloom filters, zero otherwise.
11: optional i64 filter_size_bytes
12: optional i64 filter_size_bytes
// The ID of the plan node that produces this filter.
12: optional Types.TPlanNodeId src_node_id
13: optional Types.TPlanNodeId src_node_id
}
// The information contained in subclasses of ScanNode captured in two separate

View File

@@ -77,6 +77,14 @@ public class BinaryPredicate extends Predicate {
public boolean isEquivalence() { return this == EQ || this == NOT_DISTINCT; }
public boolean isSqlEquivalence() { return this == EQ; }
/**
* Test if the operator specifies a single range.
**/
public boolean isSingleRange() {
return this == EQ || this == LE || this == GE || this == LT || this == GT;
}
public Operator converse() {
switch (this) {
case EQ: return EQ;

View File

@@ -78,6 +78,10 @@ public class InlineViewRef extends TableRef {
// Whether this is an inline view generated for table masking.
private boolean isTableMaskingView_ = false;
// Whether this is an inline view generated for a non-correlated scalar subquery
// returning at most one value.
private boolean isNonCorrelatedScalarSubquery_ = false;
// END: Members that need to be reset()
/////////////////////////////////////////
@@ -446,4 +450,11 @@ public class InlineViewRef extends TableRef {
}
return sql.toString();
}
public void setIsNonCorrelatedScalarSubquery() {
isNonCorrelatedScalarSubquery_ = true;
}
public boolean isNonCorrelatedScalarSubquery() {
return isNonCorrelatedScalarSubquery_;
}
}

View File

@@ -92,6 +92,11 @@ public abstract class Predicate extends Expr {
&& ((BinaryPredicate) expr).getOp().isSqlEquivalence();
}
public static boolean isSingleRangePredicate(Expr expr) {
return (expr instanceof BinaryPredicate)
&& ((BinaryPredicate) expr).getOp().isSingleRange();
}
/**
* If predicate is of the form "<slotref> = <slotref>", returns both SlotRefs,
* otherwise returns null.

View File

@@ -799,6 +799,9 @@ public class StmtRewriter {
stmt.whereClause_ =
CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause_);
inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
// Indicate this inline view returns at most one value through a
// non-correlated scalar subquery.
if (isScalarSubquery) inlineView.setIsNonCorrelatedScalarSubquery();
// Indicate that the CROSS JOIN may add a new visible tuple to stmt's
// select list (if the latter contains an unqualified star item '*')
return true;

View File

@@ -259,6 +259,14 @@ public class ScalarType extends Type {
return scale_;
}
public int storageBytesForDecimal() {
if (type_ != PrimitiveType.DECIMAL) return -1;
if (precision_ <= 9) return 4;
if (precision_ <= 18) return 8;
if (precision_ <= 38) return 16;
return -1;
}
@Override
public PrimitiveType getPrimitiveType() { return type_; }
public int ordinal() { return type_.ordinal(); }

View File

@@ -87,6 +87,9 @@ public class AggregationNode extends PlanNode {
// Peak memory is at least 16k, which is an empirical value
protected final static long MIN_PLAIN_AGG_MEM = 16L * 1024L;
// The output is from a non-correlated scalar subquery returning at most one value.
protected boolean isNonCorrelatedScalarSubquery_ = false;
public AggregationNode(
PlanNodeId id, PlanNode input, MultiAggregateInfo multiAggInfo, AggPhase aggPhase) {
super(id, "AGGREGATE");
@@ -108,6 +111,7 @@ public class AggregationNode extends PlanNode {
aggInfos_ = src.aggInfos_;
needsFinalize_ = src.needsFinalize_;
useIntermediateTuple_ = src.useIntermediateTuple_;
isNonCorrelatedScalarSubquery_ = src.isNonCorrelatedScalarSubquery_;
}
@Override
@@ -629,4 +633,12 @@ public class AggregationNode extends PlanNode {
}
return builder.build();
}
public void setIsNonCorrelatedScalarSubquery(boolean val) {
isNonCorrelatedScalarSubquery_ = val;
}
public boolean isNonCorrelatedScalarSubquery() {
return isNonCorrelatedScalarSubquery_;
}
}

View File

@@ -940,6 +940,10 @@ public class DistributedPlanner {
mergeFragment.getPlanRoot(), node.getMultiAggInfo(), AggPhase.FIRST_MERGE);
mergeAggNode.init(ctx_.getRootAnalyzer());
mergeAggNode.setLimit(limit);
// Carry the isNonCorrelatedScalarSubquery_ flag to the merge node. This flag is
// applicable regardless of the partition scheme for the children since it is a
// logical property.
mergeAggNode.setIsNonCorrelatedScalarSubquery(node.isNonCorrelatedScalarSubquery());
// Merge of non-grouping agg only processes one tuple per Impala daemon - codegen
// will cost more than benefit.
if (!hasGrouping) {

View File

@@ -60,11 +60,12 @@ public class JoinBuildSink extends DataSink {
joinNode_ = joinNode;
Preconditions.checkNotNull(joinNode);
Preconditions.checkState(joinNode instanceof JoinNode);
if (!(joinNode instanceof HashJoinNode)) return;
for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) {
BinaryPredicate p = (BinaryPredicate) eqJoinConjunct;
// by convention the build exprs are the rhs of the join conjuncts
buildExprs_.add(p.getChild(1).clone());
if (joinNode instanceof HashJoinNode) {
for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) {
BinaryPredicate p = (BinaryPredicate) eqJoinConjunct;
// by convention the build exprs are the rhs of the join conjuncts
buildExprs_.add(p.getChild(1).clone());
}
}
runtimeFilters_.addAll(joinNode.getRuntimeFilters());
}

View File

@@ -38,6 +38,7 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TExecNodePhase;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TJoinDistributionMode;
import org.apache.impala.thrift.TJoinNode;
import org.apache.impala.thrift.TQueryOptions;
@@ -929,4 +930,11 @@ public abstract class JoinNode extends PlanNode {
*/
public abstract Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
TQueryOptions queryOptions);
/* Helper to return all predicates as a string. */
public String getAllPredicatesAsString(TExplainLevel level) {
return "Conjuncts=" + Expr.getExplainString(getConjuncts(), level)
+ ", EqJoinConjuncts=" + Expr.getExplainString(getEqJoinConjuncts(), level)
+ ", EqJoinConjuncts=" + Expr.getExplainString(getOtherJoinConjuncts(), level);
}
}

View File

@@ -116,6 +116,10 @@ public class NestedLoopJoinNode extends JoinNode {
output.append(detailPrefix + "predicates: ")
.append(Expr.getExplainString(conjuncts_, detailLevel) + "\n");
}
if (!runtimeFilters_.isEmpty()) {
output.append(detailPrefix + "runtime filters: ");
output.append(getRuntimeFilterExplainString(true, detailLevel));
}
}
return output.toString();
}

View File

@@ -47,6 +47,7 @@ import org.apache.impala.analysis.TupleIsNullPredicate;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.IdGenerator;
@@ -56,6 +57,7 @@ import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterDesc;
import org.apache.impala.thrift.TRuntimeFilterMode;
@@ -335,6 +337,7 @@ public final class RuntimeFilterGenerator {
tFilter.setNdv_estimate(ndvEstimate_);
tFilter.setHas_local_targets(hasLocalTargets_);
tFilter.setHas_remote_targets(hasRemoteTargets_);
tFilter.setCompareOp(exprCmpOp_.getThriftOp());
boolean appliedOnPartitionColumns = true;
for (int i = 0; i < targets_.size(); ++i) {
RuntimeFilterTarget target = targets_.get(i);
@@ -361,18 +364,61 @@ public final class RuntimeFilterGenerator {
Preconditions.checkNotNull(idGen);
Preconditions.checkNotNull(joinPredicate);
Preconditions.checkNotNull(filterSrcNode);
// Only consider binary equality predicates
if (type == TRuntimeFilterType.BLOOM
&& !Predicate.isEquivalencePredicate(joinPredicate)) {
return null;
// Only consider binary equality predicates under hash joins
if (type == TRuntimeFilterType.BLOOM) {
if (!Predicate.isEquivalencePredicate(joinPredicate)
|| filterSrcNode instanceof NestedLoopJoinNode) {
return null;
}
}
if (type == TRuntimeFilterType.MIN_MAX
&& !Predicate.isSqlEquivalencePredicate(joinPredicate)) {
return null;
}
BinaryPredicate normalizedJoinConjunct = SingleNodePlanner.getNormalizedEqPred(
BinaryPredicate normalizedJoinConjunct = null;
if (type == TRuntimeFilterType.MIN_MAX) {
if (filterSrcNode instanceof HashJoinNode) {
if (!Predicate.isSqlEquivalencePredicate(joinPredicate)) {
return null;
} else {
normalizedJoinConjunct = SingleNodePlanner.getNormalizedEqPred(joinPredicate,
filterSrcNode.getChild(0).getTupleIds(),
filterSrcNode.getChild(1).getTupleIds(), analyzer);
}
} else if (filterSrcNode instanceof NestedLoopJoinNode) {
if (Predicate.isSingleRangePredicate(joinPredicate)) {
PlanNode child1 = filterSrcNode.getChild(1);
if (child1 instanceof ExchangeNode) {
child1 = child1.getChild(0);
}
// When the immediate child, or the indirect child below an Exchange, is an
// AggregationNode that implements a non-correlated scalar subquery
// returning at most one value, a min/max filter can be generated.
if (!(child1 instanceof AggregationNode)
|| !((AggregationNode) (child1)).isNonCorrelatedScalarSubquery()) {
return null;
}
} else {
return null;
}
normalizedJoinConjunct = SingleNodePlanner.getNormalizedSingleRangePred(
joinPredicate, filterSrcNode.getChild(0).getTupleIds(),
filterSrcNode.getChild(1).getTupleIds(), analyzer);
if (normalizedJoinConjunct != null) {
// MinMaxFilter can't handle range predicates with decimals stored
// in __int128_t for the following reason:
// 1) Both numeric_limits<__int128_t>::min() and
// numeric_limits<__int128_t>::max() return 0.
if (normalizedJoinConjunct.getChild(0).getType().isDecimal()) {
ScalarType decimalType =
(ScalarType) (normalizedJoinConjunct.getChild(0).getType());
if (decimalType.storageBytesForDecimal() == 16) return null;
}
}
}
} else {
normalizedJoinConjunct = SingleNodePlanner.getNormalizedEqPred(
joinPredicate, filterSrcNode.getChild(0).getTupleIds(),
filterSrcNode.getChild(1).getTupleIds(), analyzer);
}
if (normalizedJoinConjunct == null) return null;
// Ensure that the target expr does not contain TupleIsNull predicates as these
@@ -623,6 +669,7 @@ public final class RuntimeFilterGenerator {
public String debugString() {
StringBuilder output = new StringBuilder();
return output.append("FilterID: " + id_ + " ")
.append("Type: " + type_ + " ")
.append("Source: " + src_.getId() + " ")
.append("SrcExpr: " + getSrcExpr().debugString() + " ")
.append("Target(s): ")
@@ -727,8 +774,8 @@ public final class RuntimeFilterGenerator {
* (scan) nodes. Filters that cannot be assigned to a scan node are discarded.
*/
private void generateFilters(PlannerContext ctx, PlanNode root) {
if (root instanceof HashJoinNode) {
HashJoinNode joinNode = (HashJoinNode) root;
if (root instanceof HashJoinNode || root instanceof NestedLoopJoinNode) {
JoinNode joinNode = (JoinNode) root;
List<Expr> joinConjuncts = new ArrayList<>();
if (!joinNode.getJoinOp().isLeftOuterJoin()
&& !joinNode.getJoinOp().isFullOuterJoin()

View File

@@ -1244,6 +1244,13 @@ public class SingleNodePlanner {
rootNode = addUnassignedConjuncts(
analyzer, inlineViewRef.getDesc().getId().asList(), rootNode);
}
// Transfer whether the inline view is for a non-correlated subquery returning at
// most one value to the root node when it is an AggregationNode.
if (rootNode instanceof AggregationNode) {
((AggregationNode) rootNode)
.setIsNonCorrelatedScalarSubquery(
inlineViewRef.isNonCorrelatedScalarSubquery());
}
return rootNode;
}
@@ -1923,6 +1930,28 @@ public class SingleNodePlanner {
return result;
}
/**
* Similar to getNormalizedEqPred(), except returns a normalized version of a binary
* single range predicate 'expr'. A single range predicate is defined as one with
* pred.getOp() being =, <, <=, > or >=.
*/
public static BinaryPredicate getNormalizedSingleRangePred(
Expr expr, List<TupleId> lhsTids, List<TupleId> rhsTids, Analyzer analyzer) {
if (!(expr instanceof BinaryPredicate)) return null;
BinaryPredicate pred = (BinaryPredicate) expr;
if (!pred.getOp().isSingleRange()) return null;
if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) return null;
Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids);
Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids);
if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) return null;
BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr);
result.analyzeNoThrow(analyzer);
return result;
}
/**
* Creates a new node to join outer with inner. Collects and assigns join conjunct
* as well as regular conjuncts. Calls init() on the new join node.

View File

@@ -4556,7 +4556,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
// Also check TableRefs.
testNumberOfMembers(TableRef.class, 26);
testNumberOfMembers(BaseTableRef.class, 0);
testNumberOfMembers(InlineViewRef.class, 9);
testNumberOfMembers(InlineViewRef.class, 10);
}
@SuppressWarnings("rawtypes")

View File

@@ -1332,7 +1332,7 @@ PLAN-ROOT SINK
07:HASH JOIN [LEFT SEMI JOIN]
| hash predicates: t.bigint_col = g.bigint_col
| other join predicates: a.int_col < min(int_col)
| runtime filters: RF000 <- g.bigint_col
| runtime filters: RF001 <- g.bigint_col
| row-size=184B cardinality=781
|
|--03:AGGREGATE [FINALIZE]
@@ -1347,18 +1347,18 @@ PLAN-ROOT SINK
|
06:HASH JOIN [INNER JOIN]
| hash predicates: a.id = t.id
| runtime filters: RF002 <- t.id
| runtime filters: RF003 <- t.id
| row-size=184B cardinality=3.91K
|
|--01:SCAN HDFS [functional.alltypes t]
| HDFS partitions=24/24 files=24 size=478.45KB
| predicates: t.bool_col = FALSE
| runtime filters: RF000 -> t.bigint_col
| runtime filters: RF001 -> t.bigint_col
| row-size=89B cardinality=3.65K
|
00:SCAN HDFS [functional.alltypesagg a]
HDFS partitions=11/11 files=11 size=814.73KB
runtime filters: RF002 -> a.id
runtime filters: RF003 -> a.id
row-size=95B cardinality=11.00K
====
# Multiple nesting levels with aggregation subqueries

View File

@@ -331,3 +331,142 @@ where a.l_orderkey = b.o_orderkey and b.o_custkey = -5;
---- RUNTIME_PROFILE
row_regex: .*1.+0 \(\d+\).+true.+MIN_MAX\s+AlwaysFalse\s+AlwaysFalse.*
====
---- QUERY
# Positive tests to check the explain output involving a non-correlated one-row
# subquery. Expect to see a min/max filter at the nested loop join for an inequality
# predicate between a column and the subquery.
# DECIMAL data type.
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from tpcds_parquet.store_sales
where ss_sales_price < (select min(ss_wholesale_cost) from tpcds_parquet.store_sales);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- min\(ss_wholesale_cost\).*
row_regex:.* RF000\[min_max\] -> ss_sales_price.*
====
---- QUERY
# INT data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where id < (select min(id) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- min\(id\).*
row_regex:.* RF000\[min_max\] -> id.*
====
---- QUERY
# Bool data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where bool_col <= (select max(bool_col) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- max\(bool_col\).*
row_regex:.* RF000\[min_max\] -> bool_col.*
====
---- QUERY
# Tiny INT data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where tinyint_col > (select max(tinyint_col) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- max\(tinyint_col\).*
row_regex:.* RF000\[min_max\] -> tinyint_col.*
====
---- QUERY
# FLOAT data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where float_col >= (select max(float_col) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- max\(float_col\).*
row_regex:.* RF000\[min_max\] -> float_col.*
====
---- QUERY
# Timestamp data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where timestamp_col < (select min(timestamp_col) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- min\(timestamp_col\).*
row_regex:.* RF000\[min_max\] -> timestamp_col.*
====
---- QUERY
# Date data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.date_tbl
where date_col < (select min(date_col) from functional_parquet.date_tbl);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- min\(date_col\).*
row_regex:.* RF000\[min_max\] -> date_col.*
====
---- QUERY
# String data type
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where string_col < (select min(string_col) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- min\(string_col\).*
row_regex:.* RF000\[min_max\] -> string_col.*
====
---- QUERY
# An equal predicate that gets translated into a hash join with bloom and min/max
# filters.
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes
where string_col = (select min(string_col) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF001\[min_max\] <- min\(string_col\).*
row_regex:.* RF001\[min_max\] -> string_col.*
====
---- QUERY
# Two subqueries that get translated into two min/max filters.
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.alltypes where
id > (select min(id) from functional_parquet.alltypes) and
id < (select max(id) from functional_parquet.alltypes);
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* RF000\[min_max\] <- max\(id\).*
row_regex:.* RF001\[min_max\] <- min\(id\).*
====
---- QUERY
# Negative tests to check the explain output involving a non-correlated one-row
# subquery. Expect to observe no min/max filters.
#
# Decimal data type with 16-byte storage (__int128_t) is not supported as
# numeric_limits<__int128_t>::min() and numeric_limits<__int128_t>::max() return 0.
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from functional_parquet.decimal_tbl
where d3 < (select min(d3) from functional_parquet.decimal_tbl);
---- RESULTS: VERIFY_IS_NOT_IN
row_regex:.*min_max.*
====
---- QUERY
# Uncorrelated non-aggregate subquery produces no min/max filters.
set minmax_filtering_level=page;
set minmax_filter_threshold=0.9;
set explain_level=3;
explain select count(*) from tpcds_parquet.store_sales
where ss_sales_price < (select ss_wholesale_cost from tpcds_parquet.store_sales);
---- RESULTS: VERIFY_IS_NOT_IN
row_regex:.*min_max.*
====

View File

@@ -59,3 +59,15 @@ where a.month= b.int_col;
aggregation(SUM, Files processed): 24
aggregation(SUM, Files rejected): 6
====
---- QUERY
# Set up a join query with a min/max filter on the partition column 'month' again.
# The filter built from the subquery is [0, 2].
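# Only the month=1 and month=2 partitions of 2009 and 2010 pass; assuming the usual
# functional_parquet.alltypes layout of 10 rows per day over 2009-2010, that is
# (31 + 28) * 2 * 10 = 1180 rows, with the remaining 20 of the 24 partition files rejected.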
set runtime_filter_wait_time_ms=$RUNTIME_FILTER_WAIT_TIME_MS;
select count(*) from functional_parquet.alltypes where month <=
(select max(int_col) from functional_parquet.alltypes where int_col <= 2);
---- RESULTS
1180
---- RUNTIME_PROFILE
aggregation(SUM, Files processed): 24
aggregation(SUM, Files rejected): 20
====

View File

@@ -332,7 +332,7 @@ row_regex:.* RF001\[min_max\] -. .\.ss_cdemo_sk.*
---- QUERY
###################################################
# Run a query that demonstrates the min/max filter
# helps reduces # of pages:
# helps reduce the number of pages:
# sum(NumRuntimeFilteredPages) = 28
###################################################
set minmax_filtering_level=page;
@@ -347,3 +347,24 @@ where i_manufact_id = 1 and i_current_price < 1.0;
---- RUNTIME_PROFILE
aggregation(SUM, NumRuntimeFilteredPages): 28
====
---- QUERY
###################################################
# Run a query with an inequality predicate involving
# a non-correlated subquery that demonstrates the
# min/max filter helps reduce the number of row
# groups and pages.
# sum(NumRuntimeFilteredRowGroups) = 2
# sum(NumRuntimeFilteredPages) > 200
###################################################
SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
set mt_dop=2;
select count(*) from lineitem_orderkey_partkey_only
where l_orderkey < (select min(l_partkey) from
lineitem_orderkey_partkey_only where l_orderkey < 3000);
---- RESULTS
92
---- RUNTIME_PROFILE
aggregation(SUM, NumRuntimeFilteredRowGroups): 2
aggregation(SUM, NumRuntimeFilteredPages)> 200
row_regex:.*NESTED LOOP JOIN.*
====