diff --git a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc
index bdb1b45a4..93a2007d0 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -50,8 +50,8 @@ Status IcebergMergePlanNode::Init(const TPlanNode& tnode, FragmentState* state)
   RETURN_IF_ERROR(ScalarExpr::Create(
       tnode.merge_node.row_present, *row_descriptor_, state, pool, &row_present_));
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.position_meta_exprs,
-      *row_descriptor_, state, pool, &position_meta_exprs_));
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.delete_meta_exprs,
+      *row_descriptor_, state, pool, &delete_meta_exprs_));
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.merge_node.partition_meta_exprs,
       *row_descriptor_, state, pool, &partition_meta_exprs_));
@@ -99,7 +99,7 @@ IcebergMergeNode::IcebergMergeNode(
     child_row_idx_(0),
     child_eos_(false),
     row_present_(pnode.row_present_),
-    position_meta_exprs_(pnode.position_meta_exprs_),
+    delete_meta_exprs_(pnode.delete_meta_exprs_),
     partition_meta_exprs_(pnode.partition_meta_exprs_) {
   DCHECK(pnode.merge_action_tuple_id_ != -1);
   DCHECK(pnode.target_tuple_id_ != -1);
@@ -132,8 +132,8 @@ Status IcebergMergeNode::Prepare(RuntimeState* state) {
       expr_perm_pool_.get(), expr_results_pool_.get(), &row_present_evaluator_));
   RETURN_IF_ERROR(
-      ScalarExprEvaluator::Create(position_meta_exprs_, state, state->obj_pool(),
-          expr_perm_pool_.get(), expr_results_pool_.get(), &position_meta_evaluators_));
+      ScalarExprEvaluator::Create(delete_meta_exprs_, state, state->obj_pool(),
+          expr_perm_pool_.get(), expr_results_pool_.get(), &delete_meta_evaluators_));
   RETURN_IF_ERROR(
       ScalarExprEvaluator::Create(partition_meta_exprs_, state, state->obj_pool(),
@@ -155,7 +155,7 @@ Status IcebergMergeNode::Open(RuntimeState* state) {
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
   RETURN_IF_ERROR(row_present_evaluator_->Open(state));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(position_meta_evaluators_, state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(delete_meta_evaluators_, state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, state));
 
   for (auto* merge_case : all_cases_) {
@@ -293,18 +293,18 @@ void IcebergMergeNode::Close(RuntimeState* state) {
     merge_case->Close(state);
   }
   row_present_evaluator_->Close(state);
-  ScalarExprEvaluator::Close(position_meta_evaluators_, state);
+  ScalarExprEvaluator::Close(delete_meta_evaluators_, state);
   ScalarExprEvaluator::Close(partition_meta_evaluators_, state);
   row_present_->Close();
-  ScalarExpr::Close(position_meta_exprs_);
+  ScalarExpr::Close(delete_meta_exprs_);
   ScalarExpr::Close(partition_meta_exprs_);
   ExecNode::Close(state);
 }
 
-const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PositionMetaEvals() {
-  return position_meta_evaluators_;
+const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::DeleteMetaEvals() {
+  return delete_meta_evaluators_;
 }
 
 const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PartitionMetaEvals() {
@@ -328,7 +328,7 @@ Status IcebergMergeCase::Prepare(RuntimeState* state, IcebergMergeNode& parent)
   combined_evaluators_.insert(
       combined_evaluators_.end(), output_evaluators_.begin(), output_evaluators_.end());
   combined_evaluators_.insert(combined_evaluators_.end(),
-      parent.PositionMetaEvals().begin(), parent.PositionMetaEvals().end());
+      parent.DeleteMetaEvals().begin(), parent.DeleteMetaEvals().end());
   combined_evaluators_.insert(combined_evaluators_.end(),
       parent.PartitionMetaEvals().begin(), parent.PartitionMetaEvals().end());
   return Status::OK();
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index be08ac698..bc2af5d14 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -61,8 +61,8 @@ class IcebergMergePlanNode : public PlanNode {
   /// target tuple, the source tuple, or both.
   ScalarExpr* row_present_ = nullptr;
 
-  /// Exprs used to identify the position of each target record
-  std::vector<ScalarExpr*> position_meta_exprs_;
+  /// Exprs used to identify the position/delete information of each target record
+  std::vector<ScalarExpr*> delete_meta_exprs_;
 
   /// Exprs used to identify the partitioning properties of a record
   std::vector<ScalarExpr*> partition_meta_exprs_;
@@ -91,7 +91,7 @@ class IcebergMergeNode : public ExecNode {
   Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   void Close(RuntimeState* state) override;
-  const std::vector<ScalarExprEvaluator*>& PositionMetaEvals();
+  const std::vector<ScalarExprEvaluator*>& DeleteMetaEvals();
   const std::vector<ScalarExprEvaluator*>& PartitionMetaEvals();
 
  private:
@@ -119,9 +119,9 @@ class IcebergMergeNode : public ExecNode {
   int child_row_idx_;
   bool child_eos_;
   ScalarExpr* row_present_;
-  std::vector<ScalarExpr*> position_meta_exprs_;
+  std::vector<ScalarExpr*> delete_meta_exprs_;
   std::vector<ScalarExpr*> partition_meta_exprs_;
-  std::vector<ScalarExprEvaluator*> position_meta_evaluators_;
+  std::vector<ScalarExprEvaluator*> delete_meta_evaluators_;
   std::vector<ScalarExprEvaluator*> partition_meta_evaluators_;
 
   ScalarExprEvaluator* row_present_evaluator_;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 47872faf2..3760a6b9f 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -758,7 +758,7 @@ struct TIcebergMergeCase {
 struct TIcebergMergeNode {
   1: required list<TIcebergMergeCase> cases
   2: required Exprs.TExpr row_present
-  3: required list<Exprs.TExpr> position_meta_exprs
+  3: required list<Exprs.TExpr> delete_meta_exprs
   4: required list<Exprs.TExpr> partition_meta_exprs
   5: required Types.TTupleId merge_action_tuple_id
   6: required Types.TTupleId target_tuple_id
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
index 6c6c00c45..536b57245 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
@@ -80,7 +80,7 @@ public class IcebergMergeImpl implements MergeImpl {
   private TupleId targetTupleId_;
 
   private List<Expr> targetExpressions_;
-  private List<Expr> targetPositionMetaExpressions_;
+  private List<Expr> targetDeleteMetaExpressions_;
   private List<Expr> targetPartitionMetaExpressions_;
   private List<Expr> targetPartitionExpressions_;
   private MergeSorting targetSorting_;
@@ -94,7 +94,7 @@ public class IcebergMergeImpl implements MergeImpl {
     on_ = on;
     table_ = targetTableRef_.getTable();
     targetExpressions_ = Lists.newArrayList();
-    targetPositionMetaExpressions_ = Lists.newArrayList();
+    targetDeleteMetaExpressions_ = Lists.newArrayList();
     targetPartitionMetaExpressions_ = Lists.newArrayList();
     targetPartitionExpressions_ = Lists.newArrayList();
   }
@@ -127,12 +127,6 @@ public class IcebergMergeImpl implements MergeImpl {
           "Unsupported '%s': '%s' for Iceberg table: %s", TableProperties.MERGE_MODE,
           modifyWriteMode, icebergTable_.getFullName()));
     }
-    if (!icebergTable_.getContentFileStore().getEqualityDeleteFiles().isEmpty()) {
-      throw new AnalysisException(
-          "MERGE statement is not supported for Iceberg tables "
-              + "containing equality deletes.");
-      //TODO: IMPALA-13674
-    }
     for (Column column : icebergTable_.getColumns()) {
       Path slotPath = new Path(targetTableRef_.desc_,
           Collections.singletonList(column.getName()));
@@ -205,8 +199,8 @@ public class IcebergMergeImpl implements MergeImpl {
     targetExpressions_ = Expr.substituteList(targetExpressions_, smap, analyzer, true);
     targetPartitionMetaExpressions_ =
         Expr.substituteList(targetPartitionMetaExpressions_, smap, analyzer, true);
-    targetPositionMetaExpressions_ =
-        Expr.substituteList(targetPositionMetaExpressions_, smap, analyzer, true);
+    targetDeleteMetaExpressions_ =
+        Expr.substituteList(targetDeleteMetaExpressions_, smap, analyzer, true);
     mergeActionExpression_ = mergeActionExpression_.substitute(smap, analyzer, true);
     targetPartitionExpressions_ =
         Expr.substituteList(targetPartitionExpressions_, smap, analyzer, true);
@@ -218,7 +212,7 @@ public class IcebergMergeImpl implements MergeImpl {
   @Override
   public List<Expr> getResultExprs() {
     List<Expr> result = Lists.newArrayList(targetExpressions_);
-    result.addAll(targetPositionMetaExpressions_);
+    result.addAll(targetDeleteMetaExpressions_);
     result.addAll(targetPartitionMetaExpressions_);
     result.add(mergeActionExpression_);
     return result;
@@ -243,10 +237,10 @@ public class IcebergMergeImpl implements MergeImpl {
     // the sink and the merge node differs.
     List<MergeCase> copyOfCases =
         mergeStmt_.getCases().stream().map(MergeCase::clone).collect(Collectors.toList());
-    List<Expr> positionMetaExprs = Expr.cloneList(targetPositionMetaExpressions_);
+    List<Expr> deleteMetaExprs = Expr.cloneList(targetDeleteMetaExpressions_);
    List<Expr> partitionMetaExprs = Expr.cloneList(targetPartitionMetaExpressions_);
     IcebergMergeNode mergeNode = new IcebergMergeNode(ctx.getNextNodeId(), child,
-        copyOfCases, rowPresentExpression_.clone(), positionMetaExprs, partitionMetaExprs,
+        copyOfCases, rowPresentExpression_.clone(), deleteMetaExprs, partitionMetaExprs,
         mergeActionTuple_, targetTupleId_);
     mergeNode.init(analyzer);
     return mergeNode;
@@ -279,7 +273,7 @@ public class IcebergMergeImpl implements MergeImpl {
       deletePartitionKeys = targetPartitionMetaExpressions_;
     }
     return new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
-        deletePartitionKeys, targetPositionMetaExpressions_, deleteTableId_);
+        deletePartitionKeys, targetDeleteMetaExpressions_, deleteTableId_);
   }
 
   public TableSink createInsertSink() {
@@ -335,27 +329,34 @@ public class IcebergMergeImpl implements MergeImpl {
               VirtualColumn.ICEBERG_PARTITION_SERIALIZED.getName())));
     }
 
-    List<Expr> positionMetaExpressions;
+    List<Expr> deleteMetaExpressions = Lists.newArrayList();
 
-    if (mergeStmt_.hasOnlyInsertCases() && icebergTable_.getContentFileStore()
-        .getDataFilesWithDeletes().isEmpty()) {
-      positionMetaExpressions = Collections.emptyList();
-    } else {
-      // DELETE/UPDATE cases require position information to write delete files
-      positionMetaExpressions = ImmutableList.of(
-          new SlotRef(
+    boolean hasEqualityDeleteFiles = !icebergTable_.getContentFileStore()
+        .getEqualityDeleteFiles().isEmpty();
+    boolean hasPositionDeleteFiles = !icebergTable_.getContentFileStore()
+        .getPositionDeleteFiles().isEmpty();
+
+    if (!mergeStmt_.hasOnlyInsertCases() || hasPositionDeleteFiles) {
+      // DELETE/UPDATE cases require position information to write/read delete files
+      deleteMetaExpressions.add(new SlotRef(
           ImmutableList.of(targetTableRef_.getUniqueAlias(),
-              VirtualColumn.INPUT_FILE_NAME.getName())),
-          new SlotRef(
-              ImmutableList.of(targetTableRef_.getUniqueAlias(),
-                  VirtualColumn.FILE_POSITION.getName())));
+              VirtualColumn.INPUT_FILE_NAME.getName())));
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.FILE_POSITION.getName())));
+    }
+
+    if (hasEqualityDeleteFiles) {
+      deleteMetaExpressions.add(new SlotRef(
+          ImmutableList.of(targetTableRef_.getUniqueAlias(),
+              VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER.getName())));
+    }
 
     selectListItems.add(new SelectListItem(rowPresentExpression, ROW_PRESENT));
     selectListItems.addAll(
         targetSlotRefs.stream().map(expr -> new SelectListItem(expr, null))
             .collect(Collectors.toList()));
-    selectListItems.addAll(positionMetaExpressions.stream()
+    selectListItems.addAll(deleteMetaExpressions.stream()
         .map(expr -> new SelectListItem(expr, null))
         .collect(Collectors.toList()));
     selectListItems.addAll(partitionMetaExpressions.stream()
@@ -366,7 +367,7 @@
     rowPresentExpression_ = rowPresentExpression;
     targetPartitionMetaExpressions_ = partitionMetaExpressions;
-    targetPositionMetaExpressions_ = positionMetaExpressions;
+    targetDeleteMetaExpressions_ = deleteMetaExpressions;
     targetExpressions_ = targetSlotRefs;
 
     FromClause fromClause =
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
index e608d131e..c8df63a20 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
@@ -43,18 +43,18 @@ import org.apache.impala.thrift.TQueryOptions;
 public class IcebergMergeNode extends PlanNode {
   private final List<MergeCase> cases_;
   private final Expr rowPresent_;
-  private List<Expr> positionMetaExprs_;
+  private List<Expr> deleteMetaExprs_;
   private List<Expr> partitionMetaExprs_;
   private final TupleDescriptor mergeActionTuple_;
   private final TupleId targetTupleId_;
 
   public IcebergMergeNode(PlanNodeId id, PlanNode child, List<MergeCase> cases,
-      Expr rowPresent, List<Expr> positionMetaExprs, List<Expr> partitionMetaExprs,
+      Expr rowPresent, List<Expr> deleteMetaExprs, List<Expr> partitionMetaExprs,
       TupleDescriptor mergeActionTuple, TupleId targetTupleId) {
     super(id, "MERGE");
     this.cases_ = cases;
     this.rowPresent_ = rowPresent;
-    this.positionMetaExprs_ = positionMetaExprs;
+    this.deleteMetaExprs_ = deleteMetaExprs;
     this.partitionMetaExprs_ = partitionMetaExprs;
     this.mergeActionTuple_ = mergeActionTuple;
     this.targetTupleId_ = targetTupleId;
@@ -76,7 +76,7 @@ public class IcebergMergeNode extends PlanNode {
       mergeCases.add(tMergeCase);
     }
     TIcebergMergeNode mergeNode = new TIcebergMergeNode(mergeCases,
-        rowPresent_.treeToThrift(), Expr.treesToThrift(positionMetaExprs_),
+        rowPresent_.treeToThrift(), Expr.treesToThrift(deleteMetaExprs_),
         Expr.treesToThrift(partitionMetaExprs_), mergeActionTuple_.getId().asInt(),
         targetTupleId_.asInt());
     msg.setMerge_node(mergeNode);
@@ -92,8 +92,8 @@ public class IcebergMergeNode extends PlanNode {
     }
     partitionMetaExprs_ =
         Expr.substituteList(partitionMetaExprs_, getOutputSmap(), analyzer, true);
-    positionMetaExprs_ =
-        Expr.substituteList(positionMetaExprs_, getOutputSmap(), analyzer, true);
+    deleteMetaExprs_ =
+        Expr.substituteList(deleteMetaExprs_, getOutputSmap(), analyzer, true);
     rowPresent_.substitute(getOutputSmap(), analyzer, true);
   }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
index 5f6ac3af1..fff069a32 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
@@ -384,6 +384,10 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
"id in (select max(id) from functional_parquet.iceberg_non_partitioned)) s " + "on t.id = s.id " + "when matched and s.id > 2 then delete"); + // Target table contains equality delete files + AnalyzesOk("merge into functional_parquet.iceberg_v2_delete_equality t " + + "using functional_parquet.iceberg_v2_delete_equality s " + + "on t.id = s.id when not matched then insert *"); // Inline view as target AnalysisError("merge into " @@ -527,11 +531,5 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { "Target table 'functional_parquet.iceberg_partition_evolution' is incompatible" + " with source expressions.\nExpression 's.`month`' (type: INT) is not " + "compatible with column 'date_string_col' (type: STRING)"); - // Target table contains equality delete files - AnalysisError("merge into functional_parquet.iceberg_v2_delete_equality t " - + "using functional_parquet.iceberg_v2_delete_equality s " - + "on t.id = s.id when not matched then insert *", - "MERGE statement is not supported for Iceberg tables " - + "containing equality deletes."); } } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test new file mode 100644 index 000000000..049829ee5 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-insert.test @@ -0,0 +1,23 @@ +==== +---- QUERY +# Merge into partitioned target table with equality delete files from the source table +# using when not matched insert * clause +merge into iceberg_v2_delete_equality_partitioned target using (select cast(i+ 1000000 as int) i, s, d from iceberg_v2_delete_equality_partitioned) source +on target.i = source.i +when not matched then insert * +---- TYPES +INT,STRING,DATE +---- DML_RESULTS: iceberg_v2_delete_equality_partitioned +1,'str1',2023-12-24 +1,'str1',2023-12-25 +2,'str2',2023-12-24 +4,'str4',2023-12-24 +222,'str2',2023-12-25 +333333,'str3',2023-12-24 +1000001,'str1',2023-12-24 +1000001,'str1',2023-12-25 +1000002,'str2',2023-12-24 +1000004,'str4',2023-12-24 +1000222,'str2',2023-12-25 +1333333,'str3',2023-12-24 +==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test new file mode 100644 index 000000000..1fc2f5ea5 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-equality-update.test @@ -0,0 +1,16 @@ +==== +---- QUERY +# Merge into partitioned target table with equality delete files from the source table +# using when matched update clause +merge into iceberg_v2_delete_equality_partitioned target using iceberg_v2_delete_equality_partitioned source +on target.i = source.i and target.i > 10 when matched then update set i = cast(source.i + 100 as int) +---- TYPES +INT,STRING,DATE +---- DML_RESULTS: iceberg_v2_delete_equality_partitioned +1,'str1',2023-12-24 +1,'str1',2023-12-25 +2,'str2',2023-12-24 +4,'str4',2023-12-24 +322,'str2',2023-12-25 +333433,'str3',2023-12-24 +==== diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py index 66431dd16..45718c717 100644 --- a/tests/common/file_utils.py +++ b/tests/common/file_utils.py @@ -31,15 +31,18 @@ from tests.util.iceberg_metadata_util import rewrite_metadata def create_iceberg_table_from_directory(impala_client, unique_database, table_name, - file_format): - """Utility function to create an iceberg 
table from a directory. The directory must - exist in $IMPALA_HOME/testdata/data/iceberg_test with the name 'table_name'""" + file_format, table_location="${IMPALA_HOME}/testdata/data/iceberg_test", + warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")): + """Utility function to create an iceberg table from a directory.""" + + if not warehouse_prefix and unique_database: + warehouse_prefix = os.getenv("DEFAULT_FS", WAREHOUSE_PREFIX) # Only orc and parquet tested/supported assert file_format == "orc" or file_format == "parquet" - local_dir = os.path.join( - os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name) + table_location = os.path.expandvars(table_location) + local_dir = os.path.join(table_location, table_name) assert os.path.isdir(local_dir) # Rewrite iceberg metadata to use the warehouse prefix and use unique_database @@ -50,7 +53,7 @@ def create_iceberg_table_from_directory(impala_client, unique_database, table_na check_call(['mkdir', '-p', tmp_dir]) check_call(['cp', '-r', local_dir, tmp_dir]) local_dir = os.path.join(tmp_dir, table_name) - rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 'metadata')) + rewrite_metadata(warehouse_prefix, unique_database, os.path.join(local_dir, 'metadata')) # Put the directory in the database's directory (not the table directory) hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database) diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index de7f772ff..36b4909b8 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -2064,6 +2064,20 @@ class TestIcebergV2Table(IcebergTestSuite): def test_merge_star(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database) + def test_merge_equality_update(self, vector, unique_database): + create_iceberg_table_from_directory(self.client, unique_database, + "iceberg_v2_delete_equality_partitioned", "parquet", + table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", + warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) + self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database) + + def test_merge_equality_insert(self, vector, unique_database): + create_iceberg_table_from_directory(self.client, unique_database, + "iceberg_v2_delete_equality_partitioned", "parquet", + table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", + warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) + self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database) + def test_cleanup(self, unique_database): """Test that all uncommitted files written by Impala are removed from the file system when a DML commit to an Iceberg table fails, and that the effects of the diff --git a/tests/util/iceberg_metadata_util.py b/tests/util/iceberg_metadata_util.py index a97b14d90..0f8e9b830 100644 --- a/tests/util/iceberg_metadata_util.py +++ b/tests/util/iceberg_metadata_util.py @@ -111,6 +111,16 @@ def generate_new_path(table_params, file_path): start_directory, file_path)) result = file_path[start:] + + # Remove unneccessary parts if the table location differs from + # the default location, for example: + # /test-warehouse/iceberg_test/hadoop_catalog/ice/table translates to + # /test-warehouse/table + if unique_database: + table_name_start = file_path.find(table_name) + if table_name_start != start + len(start_directory) + 1: + result = start_directory + result[table_name_start - 1:] + if 
   if prefix:
     result = prefix + result
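
Note (not part of the patch): the new branch in generate_new_path() above is easiest to follow with a concrete input. Below is a minimal, self-contained Python sketch of that branch; translate_path and its explicit parameters are hypothetical stand-ins for the helper's real locals (start, start_directory, table_name, unique_database), which it derives from table_params. The example assumes the path begins with the warehouse root, as in the Impala test data layout.

def translate_path(file_path, start_directory, table_name, unique_database):
  # Keep everything from the warehouse root onward.
  start = file_path.find(start_directory)
  result = file_path[start:]
  if unique_database:
    table_name_start = file_path.find(table_name)
    # When the table is nested below the root (e.g. under hadoop_catalog/ice),
    # splice out the intermediate directories so the path becomes <root>/<table>/...
    if table_name_start != start + len(start_directory) + 1:
      result = start_directory + result[table_name_start - 1:]
  return result

# '/test-warehouse/iceberg_test/hadoop_catalog/ice/tbl/metadata/v1.json'
# -> '/test-warehouse/tbl/metadata/v1.json'
print(translate_path(
    '/test-warehouse/iceberg_test/hadoop_catalog/ice/tbl/metadata/v1.json',
    '/test-warehouse', 'tbl', 'some_db'))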