IMPALA-9725: incorrect spilling join results for wide keys

The control flow was broken if the join operator hit
the end of the expression values cache before the end
of the probe batch, immediately after processing a row
for a spilled partition. In NextProbeRow(), the problematic
code path was:
* The last row in the expression values cache was for a
  spilled partition, so skip_row=true and it falls out
  of the loop with 'current_probe_row_' pointing to that
  row.
* probe_batch_iterator->AtEnd() is false, because
  the expression values cache is smaller than the probe batch,
  so 'current_probe_row_' is not nulled out.

Thus we end up in a state where 'current_probe_row_' is
set, but 'hash_tbl_iterator_' is unset, as sketched below.
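
To make the broken path concrete, here is a small self-contained
sketch of the pre-fix loop shape. The constants, the bool vector
and NextProbeRowModel() are illustrative stand-ins, not the real
Impala code:

  // Condensed model of the pre-fix NextProbeRow() loop.
  #include <cstdio>
  #include <vector>

  constexpr int kProbeBatchSize = 8;       // stand-in for the 1024-row batch
  constexpr int kExprValuesCacheSize = 4;  // cache ends before the batch does

  // true == the row's build partition spilled, so the row is deferred (skip_row).
  const std::vector<bool> kRowIsForSpilledPartition =
      {false, false, false, true, false, false, false, false};

  // Pre-fix shape: stop early with 'true' on a row that needs a hash table
  // lookup, fall out with 'false' when the expression values cache is full.
  bool NextProbeRowModel(int* batch_pos, int* cache_pos, int* current_probe_row) {
    while (*cache_pos < kExprValuesCacheSize) {
      *current_probe_row = (*batch_pos)++;
      ++*cache_pos;
      if (kRowIsForSpilledPartition[*current_probe_row]) continue;  // skip_row
      return true;  // the caller probes the hash table for this row
    }
    // BUG (pre-fix): 'current_probe_row' was only reset when the *batch* was
    // exhausted, not when the cache ran out first.
    if (*batch_pos == kProbeBatchSize) *current_probe_row = -1;  // "nulled out"
    return false;
  }

  int main() {
    int batch_pos = 0, cache_pos = 0, current_probe_row = -1;
    while (NextProbeRowModel(&batch_pos, &cache_pos, &current_probe_row)) {
      // ... hash table probe of 'current_probe_row' would happen here ...
    }
    // Row 3 is for a spilled partition and was the last row that fit in the
    // cache, so it is left behind: prints "current_probe_row = 3, batch_pos = 4 of 8".
    std::printf("current_probe_row = %d, batch_pos = %d of %d\n",
                current_probe_row, batch_pos, kProbeBatchSize);
    return 0;
  }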

In the case of a left anti join, this was interpreted by
ProcessProbeRowLeftSemiJoins() as meaning that there was
no hash table match for 'current_probe_row_', and it
therefore returned the row.
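
A minimal sketch of how that stale state is then misread on the
left anti join path; ProcessProbeRowLeftSemiJoins() is the real
function, while the struct and helper below are stand-ins:

  // Minimal model of the misread state on the LEFT ANTI JOIN path.
  #include <cstdio>

  struct ProbeStateModel {
    int current_probe_row;          // -1 models 'current_probe_row_ == nullptr'
    bool hash_tbl_iterator_at_end;  // true models an unset/exhausted iterator
  };

  // LEFT ANTI JOIN: a current probe row with no hash table match is returned.
  bool EmitsRowForLeftAntiJoin(const ProbeStateModel& s) {
    return s.current_probe_row != -1 && s.hash_tbl_iterator_at_end;
  }

  int main() {
    // The state left behind by the broken path: the row was never looked up
    // because its partition spilled, yet it looks like an unmatched probe row.
    ProbeStateModel stale{/*current_probe_row=*/3, /*hash_tbl_iterator_at_end=*/true};
    if (EmitsRowForLeftAntiJoin(stale)) {
      std::printf("bug: row %d returned before its spilled partition was probed\n",
                  stale.current_probe_row);
    }
    return 0;
  }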

This bug could only occur under specific circumstances:
* The join key takes up > 256 bytes in the expression values
  cache (assuming the default batch size of 1024; see the
  arithmetic sketch after this list).
* The join spilled.
* The join operator returns probe rows that found no match in
  the right input, i.e. LEFT OUTER JOIN, LEFT ANTI JOIN,
  FULL OUTER JOIN.
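
For context on the 256-byte figure: a back-of-the-envelope
sketch, assuming the expression values cache is sized to a fixed
256KB memory budget (an assumption about the cache, not something
stated in this patch):

  // Rough check of when the cache ends before the probe batch does.
  #include <algorithm>
  #include <cstdio>
  #include <initializer_list>

  int main() {
    const int kBatchSize = 1024;               // default BATCH_SIZE
    const int kCacheBudgetBytes = 256 * 1024;  // assumed cache memory budget
    for (int key_bytes_per_row : {200, 256, 300}) {
      int cache_capacity = std::min(kBatchSize, kCacheBudgetBytes / key_bytes_per_row);
      std::printf("key width %3d bytes -> cache holds %4d rows%s\n",
                  key_bytes_per_row, cache_capacity,
                  cache_capacity < kBatchSize ? " (cache ends before the batch)" : "");
    }
    return 0;
  }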

The core of the fix is to null out 'current_probe_row_' when
falling out of the bottom of the loop in NextProbeRow(). Related
DCHECKs were fixed and some control flow was slightly
simplified.

Testing:
Added a test query on TPC-H that reproduces the problem reliably.

Ran exhaustive tests.

Change-Id: I9d7e5871c35a90e8cf24b8dded04775ee1eae9d8
Reviewed-on: http://gerrit.cloudera.org:8080/15904
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
(cherry picked from commit fcf08d1822)
Author:    Tim Armstrong
Date:      2020-05-11 10:31:31 -07:00
Committer: stiga-huang
Parent:    f5988190e6
Commit:    b71187dbf9

3 changed files with 68 additions and 10 deletions


@@ -331,10 +331,8 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
     DCHECK(status->ok());
     return true;
   }
-  if (probe_batch_iterator->AtEnd()) {
-    // No more probe row.
-    current_probe_row_ = NULL;
-  }
+  // We finished processing the cached values - there is no current probe row.
+  current_probe_row_ = nullptr;
   return false;
 }
@@ -383,12 +381,12 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
   // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
   int remaining_capacity = max_rows;
-  bool has_probe_rows = current_probe_row_ != NULL || !probe_batch_iterator.AtEnd();
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   // Keep processing more probe rows if there are more to process and the output batch
   // has room and we haven't hit any error yet.
-  while (has_probe_rows && remaining_capacity > 0 && status->ok()) {
+  while ((current_probe_row_ != nullptr || !probe_batch_iterator.AtEnd())
+      && remaining_capacity > 0 && status->ok()) {
     // Prefetch for the current hash_tbl_iterator_.
     if (prefetch_mode != TPrefetchMode::NONE) {
       hash_tbl_iterator_.PrefetchBucket<true>();
@@ -420,9 +418,10 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
       DCHECK(status->ok());
     } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
         status));
-    // Update whether there are more probe rows to process in the current batch.
-    has_probe_rows = current_probe_row_ != NULL;
-    if (!has_probe_rows) DCHECK(probe_batch_iterator.AtEnd());
+    // NextProbeRow() returns false either when it exhausts its input or hits
+    // an error. Otherwise we must have filled up the output batch.
+    DCHECK((ht_ctx->expr_values_cache()->AtEnd() && current_probe_row_ == nullptr)
+        || !status->ok() || remaining_capacity == 0);
     // Update where we are in the probe batch.
     probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
         probe_batch_->num_tuples_per_row();


@@ -572,7 +572,8 @@ Status PartitionedHashJoinNode::GetNext(
       case ProbeState::PROBING_IN_BATCH: {
         // Finish processing rows in the current probe batch.
         RETURN_IF_ERROR(ProcessProbeBatch(out_batch));
-        DCHECK(out_batch->AtCapacity() || probe_batch_pos_ == probe_batch_->num_rows());
+        DCHECK(out_batch->AtCapacity() || probe_batch_pos_ == probe_batch_->num_rows()
+            || ht_ctx_->expr_values_cache()->AtEnd());
         if (probe_batch_pos_ == probe_batch_->num_rows()
             && current_probe_row_ == nullptr) {
           probe_state_ = ProbeState::PROBING_END_BATCH;


@@ -308,3 +308,61 @@ order by l1.l_orderkey desc, l1.l_linenumber desc limit 10
---- CATCH
Repartitioning did not reduce the size of a spilled partition
====
---- QUERY
# IMPALA-9725: incorrect results for spilling joins with very wide join keys.
# This test is crafted to always return 0 rows, but with the previous bug
# it would sometimes return rows while spilling. The query needs to have a join
# key that is more than 256 bytes wide to trigger the bug.
set buffer_pool_limit=110m;
set runtime_filter_mode=0;
SELECT straight_join o_orderkey
FROM (
SELECT *
FROM orders
JOIN customer ON o_custkey = c_custkey
JOIN nation ON c_nationkey = n_nationkey
JOIN region ON n_regionkey = r_regionkey
WHERE o_orderkey < 500000) o1
LEFT ANTI JOIN /*+broadcast*/ (
SELECT *
FROM orders
JOIN customer ON o_custkey = c_custkey
JOIN nation ON c_nationkey = n_nationkey
JOIN region ON n_regionkey = r_regionkey
WHERE o_orderkey < 500000) o2 ON o1.o_orderkey = o2.o_orderkey
AND o1.o_custkey = o2.o_custkey
AND o1.o_orderstatus = o2.o_orderstatus
AND o1.o_totalprice = o2.o_totalprice
AND o1.o_orderdate = o2.o_orderdate
AND o1.o_orderpriority = o2.o_orderpriority
AND o1.o_clerk = o2.o_clerk
AND o1.o_shippriority = o2.o_shippriority
AND o1.o_comment = o2.o_comment
AND o1.c_custkey = o2.c_custkey
AND o1.c_name = o2.c_name
AND o1.c_address = o2.c_address
AND o1.c_nationkey = o2.c_nationkey
AND o1.c_phone = o2.c_phone
AND o1.c_acctbal = o2.c_acctbal
AND o1.c_mktsegment = o2.c_mktsegment
AND o1.n_nationkey = o2.n_nationkey
AND o1.n_name = o2.n_name
AND o1.n_regionkey = o2.n_regionkey
AND o1.n_comment = o2.n_comment
AND o1.r_name = o2.r_name
AND o1.r_comment = o2.r_comment
AND fnv_hash(o1.n_name) = fnv_hash(o2.n_name)
AND fnv_hash(o1.r_name) = fnv_hash(o2.r_name)
AND fnv_hash(o1.o_orderstatus) = fnv_hash(o2.o_orderstatus)
AND fnv_hash(o1.o_shippriority) = fnv_hash(o2.o_shippriority)
AND fnv_hash(o1.o_orderdate) = fnv_hash(o2.o_orderdate)
AND fnv_hash(o1.o_orderpriority) = fnv_hash(o2.o_orderpriority)
AND fnv_hash(o1.o_clerk) = fnv_hash(o2.o_clerk)
ORDER BY o_orderkey
---- RESULTS
---- TYPES
BIGINT
---- RUNTIME_PROFILE
# Verify that at least one of the joins was spilled.
row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
====