IMPALA-13674: Enable MERGE statement for Iceberg tables with equality deletes

This change fixes the delete expression calculation in
IcebergMergeImpl. When an Iceberg table contains equality deletes, the
merge implementation now includes the data sequence number in the
result expressions, because the underlying tuple descriptor also
includes it implicitly. Without this field, row evaluation fails due to
a mismatch between the number of evaluators and slot descriptors.
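
To illustrate the failure mode, here is a minimal, self-contained Java
sketch (not Impala code; the double-underscore names mirror Impala's
virtual columns): row evaluation pairs result-expression evaluators
with the output tuple's slot descriptors one-to-one, so the two lists
must have equal length.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the evaluator/slot mismatch; not Impala code.
public class EvaluatorSlotMismatch {
  public static void main(String[] args) {
    // Slots materialized for a table with equality deletes; the data
    // sequence number is included implicitly by the tuple descriptor.
    List<String> slots = List.of("id", "name", "INPUT__FILE__NAME",
        "FILE__POSITION", "ICEBERG__DATA__SEQUENCE__NUMBER");
    // Result expressions the merge produced before this fix.
    List<String> evaluators = new ArrayList<>(List.of("id", "name",
        "INPUT__FILE__NAME", "FILE__POSITION"));
    System.out.println(evaluators.size() == slots.size());  // false -> row evaluation fails
    // The fix: also emit an expression for the implicit slot.
    evaluators.add("ICEBERG__DATA__SEQUENCE__NUMBER");
    System.out.println(evaluators.size() == slots.size());  // true
  }
}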

Tests:
 - manually validated on an Iceberg table that contains equality deletes
 - e2e test added

Change-Id: I60e48e2731a59520373dbb75104d75aae39a94c1
Reviewed-on: http://gerrit.cloudera.org:8080/22423
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Peter Rozsa
Date: 2025-01-20 11:05:08 +01:00
Committed by: Impala Public Jenkins
parent 1164eb5626
commit 167ced7844

11 changed files with 128 additions and 63 deletions

View File

@@ -50,8 +50,8 @@ Status IcebergMergePlanNode::Init(const TPlanNode& tnode, FragmentState* state)
   RETURN_IF_ERROR(ScalarExpr::Create(
       tnode.merge_node.row_present, *row_descriptor_, state, pool, &row_present_));
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.position_meta_exprs,
-      *row_descriptor_, state, pool, &position_meta_exprs_));
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.delete_meta_exprs,
+      *row_descriptor_, state, pool, &delete_meta_exprs_));
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.partition_meta_exprs,
       *row_descriptor_, state, pool, &partition_meta_exprs_));
@@ -99,7 +99,7 @@ IcebergMergeNode::IcebergMergeNode(
     child_row_idx_(0),
     child_eos_(false),
     row_present_(pnode.row_present_),
-    position_meta_exprs_(pnode.position_meta_exprs_),
+    delete_meta_exprs_(pnode.delete_meta_exprs_),
     partition_meta_exprs_(pnode.partition_meta_exprs_) {
   DCHECK(pnode.merge_action_tuple_id_ != -1);
   DCHECK(pnode.target_tuple_id_ != -1);
@@ -132,8 +132,8 @@ Status IcebergMergeNode::Prepare(RuntimeState* state) {
       expr_perm_pool_.get(), expr_results_pool_.get(), &row_present_evaluator_));
   RETURN_IF_ERROR(
-      ScalarExprEvaluator::Create(position_meta_exprs_, state, state->obj_pool(),
-          expr_perm_pool_.get(), expr_results_pool_.get(), &position_meta_evaluators_));
+      ScalarExprEvaluator::Create(delete_meta_exprs_, state, state->obj_pool(),
+          expr_perm_pool_.get(), expr_results_pool_.get(), &delete_meta_evaluators_));
   RETURN_IF_ERROR(
       ScalarExprEvaluator::Create(partition_meta_exprs_, state, state->obj_pool(),
@@ -155,7 +155,7 @@ Status IcebergMergeNode::Open(RuntimeState* state) {
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
   RETURN_IF_ERROR(row_present_evaluator_->Open(state));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(position_meta_evaluators_, state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(delete_meta_evaluators_, state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, state));
   for (auto* merge_case : all_cases_) {
@@ -293,18 +293,18 @@ void IcebergMergeNode::Close(RuntimeState* state) {
     merge_case->Close(state);
   }
   row_present_evaluator_->Close(state);
-  ScalarExprEvaluator::Close(position_meta_evaluators_, state);
+  ScalarExprEvaluator::Close(delete_meta_evaluators_, state);
   ScalarExprEvaluator::Close(partition_meta_evaluators_, state);
   row_present_->Close();
-  ScalarExpr::Close(position_meta_exprs_);
+  ScalarExpr::Close(delete_meta_exprs_);
   ScalarExpr::Close(partition_meta_exprs_);
   ExecNode::Close(state);
 }
 
-const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PositionMetaEvals() {
-  return position_meta_evaluators_;
+const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::DeleteMetaEvals() {
+  return delete_meta_evaluators_;
 }
 
 const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PartitionMetaEvals() {
@@ -328,7 +328,7 @@ Status IcebergMergeCase::Prepare(RuntimeState* state, IcebergMergeNode& parent)
   combined_evaluators_.insert(
       combined_evaluators_.end(), output_evaluators_.begin(), output_evaluators_.end());
   combined_evaluators_.insert(combined_evaluators_.end(),
-      parent.PositionMetaEvals().begin(), parent.PositionMetaEvals().end());
+      parent.DeleteMetaEvals().begin(), parent.DeleteMetaEvals().end());
   combined_evaluators_.insert(combined_evaluators_.end(),
       parent.PartitionMetaEvals().begin(), parent.PartitionMetaEvals().end());
   return Status::OK();

View File

@@ -61,8 +61,8 @@ class IcebergMergePlanNode : public PlanNode {
   /// target tuple, the source tuple, or both.
   ScalarExpr* row_present_ = nullptr;
-  /// Exprs used to identify the position of each target record
-  std::vector<ScalarExpr*> position_meta_exprs_;
+  /// Exprs used to identify the position/delete information of each target record
+  std::vector<ScalarExpr*> delete_meta_exprs_;
   /// Exprs used to identify the partitioning properties of a record
   std::vector<ScalarExpr*> partition_meta_exprs_;
@@ -91,7 +91,7 @@ class IcebergMergeNode : public ExecNode {
   Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   void Close(RuntimeState* state) override;
-  const std::vector<ScalarExprEvaluator*>& PositionMetaEvals();
+  const std::vector<ScalarExprEvaluator*>& DeleteMetaEvals();
   const std::vector<ScalarExprEvaluator*>& PartitionMetaEvals();
 
  private:
@@ -119,9 +119,9 @@ class IcebergMergeNode : public ExecNode {
   int child_row_idx_;
   bool child_eos_;
   ScalarExpr* row_present_;
-  std::vector<ScalarExpr*> position_meta_exprs_;
+  std::vector<ScalarExpr*> delete_meta_exprs_;
   std::vector<ScalarExpr*> partition_meta_exprs_;
-  std::vector<ScalarExprEvaluator*> position_meta_evaluators_;
+  std::vector<ScalarExprEvaluator*> delete_meta_evaluators_;
   std::vector<ScalarExprEvaluator*> partition_meta_evaluators_;
   ScalarExprEvaluator* row_present_evaluator_;

View File

@@ -758,7 +758,7 @@ struct TIcebergMergeCase {
 struct TIcebergMergeNode {
   1: required list<TIcebergMergeCase> cases
   2: required Exprs.TExpr row_present
-  3: required list<Exprs.TExpr> position_meta_exprs
+  3: required list<Exprs.TExpr> delete_meta_exprs
   4: required list<Exprs.TExpr> partition_meta_exprs
   5: required Types.TTupleId merge_action_tuple_id
   6: required Types.TTupleId target_tuple_id

View File

@@ -80,7 +80,7 @@ public class IcebergMergeImpl implements MergeImpl {
   private TupleId targetTupleId_;
   private List<Expr> targetExpressions_;
-  private List<Expr> targetPositionMetaExpressions_;
+  private List<Expr> targetDeleteMetaExpressions_;
   private List<Expr> targetPartitionMetaExpressions_;
   private List<Expr> targetPartitionExpressions_;
   private MergeSorting targetSorting_;
@@ -94,7 +94,7 @@ public class IcebergMergeImpl implements MergeImpl {
     on_ = on;
     table_ = targetTableRef_.getTable();
     targetExpressions_ = Lists.newArrayList();
-    targetPositionMetaExpressions_ = Lists.newArrayList();
+    targetDeleteMetaExpressions_ = Lists.newArrayList();
     targetPartitionMetaExpressions_ = Lists.newArrayList();
     targetPartitionExpressions_ = Lists.newArrayList();
   }
@@ -127,12 +127,6 @@ public class IcebergMergeImpl implements MergeImpl {
           "Unsupported '%s': '%s' for Iceberg table: %s",
           TableProperties.MERGE_MODE, modifyWriteMode, icebergTable_.getFullName()));
     }
-    if (!icebergTable_.getContentFileStore().getEqualityDeleteFiles().isEmpty()) {
-      throw new AnalysisException(
-          "MERGE statement is not supported for Iceberg tables "
-              + "containing equality deletes.");
-      //TODO: IMPALA-13674
-    }
     for (Column column : icebergTable_.getColumns()) {
       Path slotPath =
           new Path(targetTableRef_.desc_, Collections.singletonList(column.getName()));
@@ -205,8 +199,8 @@ public class IcebergMergeImpl implements MergeImpl {
     targetExpressions_ = Expr.substituteList(targetExpressions_, smap, analyzer, true);
     targetPartitionMetaExpressions_ =
         Expr.substituteList(targetPartitionMetaExpressions_, smap, analyzer, true);
-    targetPositionMetaExpressions_ =
-        Expr.substituteList(targetPositionMetaExpressions_, smap, analyzer, true);
+    targetDeleteMetaExpressions_ =
+        Expr.substituteList(targetDeleteMetaExpressions_, smap, analyzer, true);
     mergeActionExpression_ = mergeActionExpression_.substitute(smap, analyzer, true);
     targetPartitionExpressions_ =
         Expr.substituteList(targetPartitionExpressions_, smap, analyzer, true);
@@ -218,7 +212,7 @@ public class IcebergMergeImpl implements MergeImpl {
   @Override
   public List<Expr> getResultExprs() {
     List<Expr> result = Lists.newArrayList(targetExpressions_);
-    result.addAll(targetPositionMetaExpressions_);
+    result.addAll(targetDeleteMetaExpressions_);
     result.addAll(targetPartitionMetaExpressions_);
     result.add(mergeActionExpression_);
     return result;
@@ -243,10 +237,10 @@ public class IcebergMergeImpl implements MergeImpl {
     // the sink and the merge node differs.
     List<MergeCase> copyOfCases =
         mergeStmt_.getCases().stream().map(MergeCase::clone).collect(Collectors.toList());
-    List<Expr> positionMetaExprs = Expr.cloneList(targetPositionMetaExpressions_);
+    List<Expr> deleteMetaExprs = Expr.cloneList(targetDeleteMetaExpressions_);
     List<Expr> partitionMetaExprs = Expr.cloneList(targetPartitionMetaExpressions_);
     IcebergMergeNode mergeNode = new IcebergMergeNode(ctx.getNextNodeId(), child,
-        copyOfCases, rowPresentExpression_.clone(), positionMetaExprs, partitionMetaExprs,
+        copyOfCases, rowPresentExpression_.clone(), deleteMetaExprs, partitionMetaExprs,
         mergeActionTuple_, targetTupleId_);
     mergeNode.init(analyzer);
     return mergeNode;
@@ -279,7 +273,7 @@ public class IcebergMergeImpl implements MergeImpl {
       deletePartitionKeys = targetPartitionMetaExpressions_;
     }
     return new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
-        deletePartitionKeys, targetPositionMetaExpressions_, deleteTableId_);
+        deletePartitionKeys, targetDeleteMetaExpressions_, deleteTableId_);
   }
 
   public TableSink createInsertSink() {
@@ -335,27 +329,34 @@ public class IcebergMergeImpl implements MergeImpl {
               VirtualColumn.ICEBERG_PARTITION_SERIALIZED.getName())));
     }
-    List<Expr> positionMetaExpressions;
-    if (mergeStmt_.hasOnlyInsertCases() && icebergTable_.getContentFileStore()
-        .getDataFilesWithDeletes().isEmpty()) {
-      positionMetaExpressions = Collections.emptyList();
-    } else {
-      // DELETE/UPDATE cases require position information to write delete files
-      positionMetaExpressions = ImmutableList.of(
-          new SlotRef(
-              ImmutableList.of(targetTableRef_.getUniqueAlias(),
-                  VirtualColumn.INPUT_FILE_NAME.getName())),
-          new SlotRef(
-              ImmutableList.of(targetTableRef_.getUniqueAlias(),
-                  VirtualColumn.FILE_POSITION.getName())));
+    List<Expr> deleteMetaExpressions = Lists.newArrayList();
+    boolean hasEqualityDeleteFiles = !icebergTable_.getContentFileStore()
+        .getEqualityDeleteFiles().isEmpty();
+    boolean hasPositionDeleteFiles = !icebergTable_.getContentFileStore()
+        .getPositionDeleteFiles().isEmpty();
+    if (!mergeStmt_.hasOnlyInsertCases() || hasPositionDeleteFiles) {
+      // DELETE/UPDATE cases require position information to write/read delete files
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.INPUT_FILE_NAME.getName())));
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.FILE_POSITION.getName())));
     }
+    if (hasEqualityDeleteFiles) {
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER.getName())));
+    }
     selectListItems.add(new SelectListItem(rowPresentExpression, ROW_PRESENT));
     selectListItems.addAll(
         targetSlotRefs.stream().map(expr -> new SelectListItem(expr, null))
             .collect(Collectors.toList()));
-    selectListItems.addAll(positionMetaExpressions.stream()
+    selectListItems.addAll(deleteMetaExpressions.stream()
         .map(expr -> new SelectListItem(expr, null))
         .collect(Collectors.toList()));
     selectListItems.addAll(partitionMetaExpressions.stream()
@@ -366,7 +367,7 @@ public class IcebergMergeImpl implements MergeImpl {
     rowPresentExpression_ = rowPresentExpression;
     targetPartitionMetaExpressions_ = partitionMetaExpressions;
-    targetPositionMetaExpressions_ = positionMetaExpressions;
+    targetDeleteMetaExpressions_ = deleteMetaExpressions;
     targetExpressions_ = targetSlotRefs;
     FromClause fromClause =
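
For reference, the replacement logic in the hunks above boils down to a
small decision table. A self-contained Java sketch (hypothetical helper,
not the patch itself) of which virtual columns the rewritten MERGE
source must expose:

import java.util.ArrayList;
import java.util.List;

// Restates the delete-metadata selection above; names are illustrative.
public class DeleteMetaColumnsSketch {
  static List<String> deleteMetaColumns(boolean onlyInsertCases,
      boolean hasPositionDeletes, boolean hasEqualityDeletes) {
    List<String> cols = new ArrayList<>();
    // DELETE/UPDATE cases, or pre-existing position deletes, need the file
    // name and row position to write and read position delete files.
    if (!onlyInsertCases || hasPositionDeletes) {
      cols.add("INPUT__FILE__NAME");
      cols.add("FILE__POSITION");
    }
    // Equality deletes make the data sequence number an implicit slot, so a
    // matching expression is required (the core of this fix).
    if (hasEqualityDeletes) {
      cols.add("ICEBERG__DATA__SEQUENCE__NUMBER");
    }
    return cols;
  }

  public static void main(String[] args) {
    // Insert-only MERGE into a table with only equality delete files:
    System.out.println(deleteMetaColumns(true, false, true));
    // prints: [ICEBERG__DATA__SEQUENCE__NUMBER]
  }
}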

View File

@@ -43,18 +43,18 @@ import org.apache.impala.thrift.TQueryOptions;
 public class IcebergMergeNode extends PlanNode {
   private final List<MergeCase> cases_;
   private final Expr rowPresent_;
-  private List<Expr> positionMetaExprs_;
+  private List<Expr> deleteMetaExprs_;
   private List<Expr> partitionMetaExprs_;
   private final TupleDescriptor mergeActionTuple_;
   private final TupleId targetTupleId_;
 
   public IcebergMergeNode(PlanNodeId id, PlanNode child, List<MergeCase> cases,
-      Expr rowPresent, List<Expr> positionMetaExprs, List<Expr> partitionMetaExprs,
+      Expr rowPresent, List<Expr> deleteMetaExprs, List<Expr> partitionMetaExprs,
       TupleDescriptor mergeActionTuple, TupleId targetTupleId) {
     super(id, "MERGE");
     this.cases_ = cases;
     this.rowPresent_ = rowPresent;
-    this.positionMetaExprs_ = positionMetaExprs;
+    this.deleteMetaExprs_ = deleteMetaExprs;
     this.partitionMetaExprs_ = partitionMetaExprs;
     this.mergeActionTuple_ = mergeActionTuple;
     this.targetTupleId_ = targetTupleId;
@@ -76,7 +76,7 @@ public class IcebergMergeNode extends PlanNode {
       mergeCases.add(tMergeCase);
     }
     TIcebergMergeNode mergeNode = new TIcebergMergeNode(mergeCases,
-        rowPresent_.treeToThrift(), Expr.treesToThrift(positionMetaExprs_),
+        rowPresent_.treeToThrift(), Expr.treesToThrift(deleteMetaExprs_),
         Expr.treesToThrift(partitionMetaExprs_), mergeActionTuple_.getId().asInt(),
         targetTupleId_.asInt());
     msg.setMerge_node(mergeNode);
@@ -92,8 +92,8 @@ public class IcebergMergeNode extends PlanNode {
     }
     partitionMetaExprs_ =
         Expr.substituteList(partitionMetaExprs_, getOutputSmap(), analyzer, true);
-    positionMetaExprs_ =
-        Expr.substituteList(positionMetaExprs_, getOutputSmap(), analyzer, true);
+    deleteMetaExprs_ =
+        Expr.substituteList(deleteMetaExprs_, getOutputSmap(), analyzer, true);
     rowPresent_.substitute(getOutputSmap(), analyzer, true);
   }

View File

@@ -384,6 +384,10 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         + "id in (select max(id) from functional_parquet.iceberg_non_partitioned)) s "
         + "on t.id = s.id "
         + "when matched and s.id > 2 then delete");
+    // Target table contains equality delete files
+    AnalyzesOk("merge into functional_parquet.iceberg_v2_delete_equality t "
+        + "using functional_parquet.iceberg_v2_delete_equality s "
+        + "on t.id = s.id when not matched then insert *");
     // Inline view as target
     AnalysisError("merge into "
@@ -527,11 +531,5 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         "Target table 'functional_parquet.iceberg_partition_evolution' is incompatible"
             + " with source expressions.\nExpression 's.`month`' (type: INT) is not "
             + "compatible with column 'date_string_col' (type: STRING)");
-    // Target table contains equality delete files
-    AnalysisError("merge into functional_parquet.iceberg_v2_delete_equality t "
-        + "using functional_parquet.iceberg_v2_delete_equality s "
-        + "on t.id = s.id when not matched then insert *",
-        "MERGE statement is not supported for Iceberg tables "
-            + "containing equality deletes.");
   }
 }

View File

@@ -0,0 +1,23 @@
+====
+---- QUERY
+# Merge into partitioned target table with equality delete files from the source table
+# using when not matched insert * clause
+merge into iceberg_v2_delete_equality_partitioned target using (select cast(i+ 1000000 as int) i, s, d from iceberg_v2_delete_equality_partitioned) source
+on target.i = source.i
+when not matched then insert *
+---- TYPES
+INT,STRING,DATE
+---- DML_RESULTS: iceberg_v2_delete_equality_partitioned
+1,'str1',2023-12-24
+1,'str1',2023-12-25
+2,'str2',2023-12-24
+4,'str4',2023-12-24
+222,'str2',2023-12-25
+333333,'str3',2023-12-24
+1000001,'str1',2023-12-24
+1000001,'str1',2023-12-25
+1000002,'str2',2023-12-24
+1000004,'str4',2023-12-24
+1000222,'str2',2023-12-25
+1333333,'str3',2023-12-24
+====

View File

@@ -0,0 +1,16 @@
+====
+---- QUERY
+# Merge into partitioned target table with equality delete files from the source table
+# using when matched update clause
+merge into iceberg_v2_delete_equality_partitioned target using iceberg_v2_delete_equality_partitioned source
+on target.i = source.i and target.i > 10 when matched then update set i = cast(source.i + 100 as int)
+---- TYPES
+INT,STRING,DATE
+---- DML_RESULTS: iceberg_v2_delete_equality_partitioned
+1,'str1',2023-12-24
+1,'str1',2023-12-25
+2,'str2',2023-12-24
+4,'str4',2023-12-24
+322,'str2',2023-12-25
+333433,'str3',2023-12-24
+====

View File

@@ -31,15 +31,18 @@ from tests.util.iceberg_metadata_util import rewrite_metadata
 def create_iceberg_table_from_directory(impala_client, unique_database, table_name,
-    file_format):
-  """Utility function to create an iceberg table from a directory. The directory must
-  exist in $IMPALA_HOME/testdata/data/iceberg_test with the name 'table_name'"""
+    file_format, table_location="${IMPALA_HOME}/testdata/data/iceberg_test",
+    warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")):
+  """Utility function to create an iceberg table from a directory."""
+  if not warehouse_prefix and unique_database:
+    warehouse_prefix = os.getenv("DEFAULT_FS", WAREHOUSE_PREFIX)
   # Only orc and parquet tested/supported
   assert file_format == "orc" or file_format == "parquet"
-  local_dir = os.path.join(
-    os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name)
+  table_location = os.path.expandvars(table_location)
+  local_dir = os.path.join(table_location, table_name)
   assert os.path.isdir(local_dir)
   # Rewrite iceberg metadata to use the warehouse prefix and use unique_database
@@ -50,7 +53,7 @@ def create_iceberg_table_from_directory(impala_client, unique_database, table_na
   check_call(['mkdir', '-p', tmp_dir])
   check_call(['cp', '-r', local_dir, tmp_dir])
   local_dir = os.path.join(tmp_dir, table_name)
-  rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 'metadata'))
+  rewrite_metadata(warehouse_prefix, unique_database, os.path.join(local_dir, 'metadata'))
   # Put the directory in the database's directory (not the table directory)
   hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database)

View File

@@ -2064,6 +2064,20 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_merge_star(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)
 
+  def test_merge_equality_update(self, vector, unique_database):
+    create_iceberg_table_from_directory(self.client, unique_database,
+        "iceberg_v2_delete_equality_partitioned", "parquet",
+        table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
+        warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
+    self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database)
+
+  def test_merge_equality_insert(self, vector, unique_database):
+    create_iceberg_table_from_directory(self.client, unique_database,
+        "iceberg_v2_delete_equality_partitioned", "parquet",
+        table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
+        warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
+    self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database)
+
   def test_cleanup(self, unique_database):
     """Test that all uncommitted files written by Impala are removed from the file
     system when a DML commit to an Iceberg table fails, and that the effects of the

View File

@@ -111,6 +111,16 @@ def generate_new_path(table_params, file_path):
       start_directory, file_path))
   result = file_path[start:]
+  # Remove unnecessary parts if the table location differs from
+  # the default location, for example:
+  # /test-warehouse/iceberg_test/hadoop_catalog/ice/table translates to
+  # /test-warehouse/table
+  if unique_database:
+    table_name_start = file_path.find(table_name)
+    if table_name_start != start + len(start_directory) + 1:
+      result = start_directory + result[table_name_start - 1:]
   if prefix:
     result = prefix + result