IMPALA-11740: Incorrect results for partitioned Iceberg V2 tables when runtime filters are applied

If an Iceberg V2 table is partitioned and contains delete files, then
queries that involve runtime filters on the partition columns return
an empty result set.

E.g.:

  select count(*)
  from store_sales, date_dim
  where d_date_sk = ss_sold_date_sk and d_moy=2 and d_year=1998;

In the above query store_sales is partitioned by ss_sold_date_sk,
which will be filtered by runtime filters created by the JOIN. If
store_sales has delete files then the above query returns an empty
result set.

The problem is that we invoke PartitionPassesFilters() on these
Iceberg tables. It is usually a no-op for Iceberg tables, as the
template tuple is NULL. But when the query references virtual columns,
a template tuple has been created in HdfsScanPlanNode::InitTemplateTuple.
For Iceberg tables this template tuple is incomplete, i.e. it doesn't
have the partition values set. This means the filters evaluate to
false and the files get filtered out, hence the query produces an
empty result set.
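
For illustration, a minimal sketch of the failure mode (hypothetical
types, not Impala's actual classes): a runtime filter holds the join
keys collected from the build side, and a partition slot that was
never populated can never match any of them, so every file is
rejected.

  #include <cstdint>
  #include <optional>
  #include <unordered_set>

  // Stand-in for the scanner's template tuple. For Iceberg tables the
  // partition value is never written into it, so the slot stays empty
  // even though the table is partitioned on this column.
  struct TemplateTuple {
    std::optional<int64_t> ss_sold_date_sk;
  };

  // Stand-in for a runtime filter: the join keys collected from the
  // build side (date_dim rows with d_moy=2 and d_year=1998).
  struct RuntimeFilter {
    std::unordered_set<int64_t> build_keys;
    bool Eval(const TemplateTuple& t) const {
      // An unset partition slot matches no build-side key, so the
      // filter evaluates to false for every file of the table.
      return t.ss_sold_date_sk.has_value()
          && build_keys.count(*t.ss_sold_date_sk) > 0;
    }
  };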

With this patch we no longer invoke PartitionPassesFilters() on
Iceberg tables; only the Iceberg-specific
IcebergPartitionPassesFilters() gets invoked. DCHECKs were also added
to ensure this.

Testing:
 * e2e tests added

Change-Id: I43f3e0a4df7c1ba6d8ea61410b570d8cf7b31ad3
Reviewed-on: http://gerrit.cloudera.org:8080/19274
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author:    Zoltan Borok-Nagy
Date:      2022-11-23 16:06:28 +01:00
Committer: Impala Public Jenkins
parent 16190b4f77
commit d3c3ae41c4
5 changed files with 82 additions and 18 deletions


@@ -739,14 +739,13 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(RuntimeState* state, HdfsFileD
   if (filter_ctxs_.size() == 0) return true;
   ScanRangeMetadata* metadata =
       static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
-  if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
-          filter_ctxs)) {
-    return false;
-  } else if (hdfs_table_->IsIcebergTable() && !IcebergPartitionPassesFilters(
-      metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state)) {
-    return false;
+  if (hdfs_table_->IsIcebergTable()) {
+    return IcebergPartitionPassesFilters(
+        metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs, file, state);
+  } else {
+    return PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
+        filter_ctxs);
   }
-  return true;
 }
 void HdfsScanNodeBase::SkipScanRange(io::ScanRange* scan_range) {
@@ -781,7 +780,8 @@ Status HdfsScanNodeBase::StartNextScanRange(const std::vector<FilterContext>& fi
   if (filter_ctxs.size() > 0) {
     int64_t partition_id =
         static_cast<ScanRangeMetadata*>((*scan_range)->meta_data())->partition_id;
-    if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
+    if (!hdfs_table()->IsIcebergTable() &&
+        !PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
       SkipScanRange(*scan_range);
       *scan_range = nullptr;
     }
@@ -967,6 +967,7 @@ void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
 bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
     const string& stats_name, const vector<FilterContext>& filter_ctxs) {
+  DCHECK(!hdfs_table()->IsIcebergTable());
   if (filter_ctxs.empty()) return true;
   if (FilterContext::CheckForAlwaysFalse(stats_name, filter_ctxs)) return false;
   DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
@@ -992,6 +993,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
 bool HdfsScanNodeBase::IcebergPartitionPassesFilters(int64_t partition_id,
     const string& stats_name, const vector<FilterContext>& filter_ctxs,
     HdfsFileDesc* file, RuntimeState* state) {
+  DCHECK(hdfs_table()->IsIcebergTable());
   file_metadata_utils_.SetFile(state, file);
   // Create the template tuple based on file metadata
   std::map<const SlotId, const SlotDescriptor*> slot_descs_written;


@@ -840,11 +840,13 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
   // Apply any runtime filters to static tuples containing the partition keys for this
   // partition. If any filter fails, we return immediately and stop processing this
   // scan range.
-  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
-      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-    eos_ = true;
-    DCHECK(parse_status_.ok());
-    return Status::OK();
+  if (!scan_node_->hdfs_table()->IsIcebergTable()) {
+    if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+        FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
   }
   assemble_rows_timer_.Start();
   Status status = AssembleRows(row_batch);


@@ -470,11 +470,13 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   // Apply any runtime filters to static tuples containing the partition keys for this
   // partition. If any filter fails, we return immediately and stop processing this
   // scan range.
-  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
-      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-    eos_ = true;
-    DCHECK(parse_status_.ok());
-    return Status::OK();
+  if (!scan_node_->hdfs_table()->IsIcebergTable()) {
+    if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+        FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
   }
   assemble_rows_timer_.Start();
   Status status;


@@ -327,3 +327,31 @@ select * from v where ii > 1003;
 ---- TYPES
 BIGINT, STRING
 ====
+---- QUERY
+select *
+from functional_parquet.iceberg_v2_partitioned_position_deletes_orc a,
+functional_parquet.iceberg_partitioned_orc_external b
+where a.action = b.action and b.id=3;
+---- RESULTS
+12,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+10,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+18,'Alan','click',2020-01-01 10:00:00,3,'Alan','click'
+---- TYPES
+INT, STRING, STRING, TIMESTAMP, INT, STRING, STRING
+====
+---- QUERY
+select a.input__file__name, a.*
+from iceberg_partitioned_orc_external a,
+iceberg_partitioned_orc_external b
+where a.id = b.id and a.action = b.action and b.user = 'Lisa'
+order by a.id;
+---- RESULTS
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',2,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',5,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',7,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',8,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',14,'Lisa','download'
+regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc/data/action=download/.*orc',16,'Lisa','download'
+---- TYPES
+STRING, INT, STRING, STRING
+====


@@ -530,3 +530,33 @@ select * from v where ii > 1003;
 ---- TYPES
 BIGINT, STRING
 ====
+---- QUERY
+SET TIMEZONE='Europe/Budapest';
+select *
+from functional_parquet.iceberg_v2_partitioned_position_deletes a,
+functional_parquet.iceberg_partitioned b
+where a.action = b.action and b.id=3;
+---- RESULTS
+12,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+10,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+18,'Alan','click',2020-01-01 10:00:00,3,'Alan','click',2020-01-01 10:00:00
+---- TYPES
+INT, STRING, STRING, TIMESTAMP, INT, STRING, STRING, TIMESTAMP
+====
+---- QUERY
+SET TIMEZONE='Europe/Budapest';
+select a.input__file__name, a.*
+from iceberg_partitioned a,
+iceberg_partitioned b
+where a.id = b.id and a.action = b.action and b.user = 'Lisa'
+order by a.id;
+---- RESULTS
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',2,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',5,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',7,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',8,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',14,'Lisa','download',2020-01-01 11:00:00
+regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time_hour=2020-01-01-10/action=download/.*parquet',16,'Lisa','download',2020-01-01 11:00:00
+---- TYPES
+STRING, INT, STRING, STRING, TIMESTAMP
+====