IMPALA-14496: Impala crashes when it writes multiple delete files per partition in a single DELETE operation

Impala crashes when it needs to write multiple delete files per
partition in a single DELETE operation. This is because
IcebergBufferedDeleteSink has its own DmlExecState object, but
sometimes the methods in TableSinkBase use the RuntimeState's
DmlExecState object. I.e. it can happen that we add a partition
to the IcebergBufferedDeleteSink's DmlExecState, but later we
expect to find it in the RuntimeState's DmlExecState.

This patch adds new methods to TableSinkBase that are specific
for writing delete files, and they always take a DmlExecState
object as a parameter. They are now used by IcebergBufferedDeleteSink.

Testing
 * added e2e tests

Change-Id: I46266007a6356e9ff3b63369dd855aff1396bb72
Reviewed-on: http://gerrit.cloudera.org:8080/23537
Reviewed-by: Mihaly Szjatinya <mszjat@pm.me>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Zoltan Borok-Nagy
2025-10-14 10:11:15 +02:00
committed by Michael Smith
parent 1a74ee03f3
commit bfae4d0b32
5 changed files with 135 additions and 20 deletions

View File

@@ -270,11 +270,12 @@ Status IcebergBufferedDeleteSink::FlushBufferedRecords(RuntimeState* state) {
row_batch.Reset();
RETURN_IF_ERROR(GetNextRowBatch(&row_batch, &it));
row_batch.VLogRows("IcebergBufferedDeleteSink");
RETURN_IF_ERROR(WriteRowsToPartition(state, &row_batch, current_partition_.get()));
RETURN_IF_ERROR(WriteDeleteRowsToPartition(state, &row_batch,
current_partition_.get(), &dml_exec_state_));
}
DCHECK(current_partition_ != nullptr);
RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.get(),
/*is_delete=*/true, &dml_exec_state_));
RETURN_IF_ERROR(FinalizeDeletePartitionFile(state, current_partition_.get(),
&dml_exec_state_));
current_partition_->writer->Close();
}
return Status::OK();

View File

@@ -375,8 +375,36 @@ Status TableSinkBase::WriteRowsToPartition(
// set.
bool new_file;
while (true) {
RETURN_IF_ERROR(WriteRowsToFile(state, batch, output_partition, indices, &new_file));
if (!new_file) break;
RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
}
return Status::OK();
}
Status TableSinkBase::WriteDeleteRowsToPartition(
RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
DmlExecState* dml_exec_state) {
// The rows of this batch may span multiple files. We repeatedly pass the row batch to
// the writer until it sets new_file to false, indicating that all rows have been
// written. The writer tracks where it is in the batch when it returns with new_file
// set.
bool new_file;
while (true) {
RETURN_IF_ERROR(WriteRowsToFile(state, batch, output_partition, {}, &new_file));
if (!new_file) break;
RETURN_IF_ERROR(FinalizeDeletePartitionFile(state, output_partition, dml_exec_state));
RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
}
return Status::OK();
}
Status TableSinkBase::WriteRowsToFile(
RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
const std::vector<int32_t>& indices, bool *new_file) {
Status status =
output_partition->writer->AppendRows(batch, indices, &new_file);
output_partition->writer->AppendRows(batch, indices, new_file);
if (!status.ok()) {
// IMPALA-10607: Deletes partition file if staging is skipped when appending rows
// fails. Otherwise, it leaves the file in un-finalized state.
@@ -387,10 +415,6 @@ Status TableSinkBase::WriteRowsToPartition(
}
return status;
}
if (!new_file) break;
RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
}
return Status::OK();
}
@@ -414,9 +438,23 @@ bool TableSinkBase::ShouldSkipStaging(RuntimeState* state, OutputPartition* part
}
Status TableSinkBase::FinalizePartitionFile(
RuntimeState* state, OutputPartition* partition, bool is_delete,
DmlExecState* dml_exec_state) {
if (dml_exec_state == nullptr) dml_exec_state = state->dml_exec_state();
RuntimeState* state, OutputPartition* partition) {
return FinalizePartitionFileImpl(
state, partition, /*is_delete=*/false, state->dml_exec_state());
}
Status TableSinkBase::FinalizeDeletePartitionFile(
RuntimeState* state, OutputPartition* partition, DmlExecState* dml_exec_state) {
DCHECK(IsIceberg());
DCHECK(dml_exec_state != nullptr);
DCHECK_NE(dml_exec_state, state->dml_exec_state());
return FinalizePartitionFileImpl(state, partition, /*is_delete=*/true, dml_exec_state);
}
Status TableSinkBase::FinalizePartitionFileImpl(RuntimeState* state,
OutputPartition* partition, bool is_delete, DmlExecState* dml_exec_state) {
DCHECK(dml_exec_state != nullptr);
if (partition->tmp_hdfs_file == nullptr && !is_overwrite()) return Status::OK();
SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));

View File

@@ -126,8 +126,13 @@ protected:
/// Updates runtime stats of HDFS with rows written, then closes the file associated
/// with the partition by calling ClosePartitionFile()
Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition,
bool is_delete = false, DmlExecState* dml_exec_state = nullptr) WARN_UNUSED_RESULT;
Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
WARN_UNUSED_RESULT;
/// Same as above, but for delete files, as such table sinks have their own
/// DmlExecState.
Status FinalizeDeletePartitionFile(RuntimeState* state, OutputPartition* partition,
DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
/// Writes all rows in 'batch' referenced by the row index vector in 'indices' to the
/// partition's writer. If 'indices' is empty, then it writes all rows in 'batch'.
@@ -136,6 +141,13 @@ protected:
const std::vector<int32_t>& indices = {})
WARN_UNUSED_RESULT;
/// Writes all rows to the partition's writer. It is only used for writing delete files
/// as such table sinks have their own DmlExecState.
Status WriteDeleteRowsToPartition(
RuntimeState* state, RowBatch* batch, OutputPartition* partition,
DmlExecState* dml_exec_state)
WARN_UNUSED_RESULT;
/// Closes the hdfs file for this partition as well as the writer.
Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
WARN_UNUSED_RESULT;
@@ -204,6 +216,16 @@ protected:
RuntimeProfile::Counter* hdfs_write_timer_;
/// Time spent compressing data
RuntimeProfile::Counter* compress_timer_;
private:
/// Writes rows to the partition's writer. Sets 'new_file' to true when it cannot write
/// all rows to the current output file.
Status WriteRowsToFile(
RuntimeState* state, RowBatch* batch, OutputPartition* partition,
const std::vector<int32_t>& indices, bool *new_file) WARN_UNUSED_RESULT;
Status FinalizePartitionFileImpl(RuntimeState* state, OutputPartition* partition,
bool is_delete, DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
};
}

View File

@@ -0,0 +1,49 @@
====
---- QUERY
# Regression test for IMPALA-14496 where a DELETE operation needs to write
# multiple delete files per partition.
CREATE TABLE multiple_deletes(
str STRING NULL,
year INT NULL,
last_modified TIMESTAMP NULL)
PARTITIONED BY SPEC (year)
STORED AS ICEBERG
TBLPROPERTIES ('format-version'='2');
INSERT INTO multiple_deletes SELECT string_col, year, timestamp_col FROM functional_parquet.alltypes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
SELECT count(*) FROM multiple_deletes;
---- RESULTS
7475200
---- TYPES
BIGINT
====
---- QUERY
SET parquet_file_size=8m;
DELETE FROM multiple_deletes WHERE last_modified >= '2008-12-30';
SELECT count(*) FROM multiple_deletes;
---- RESULTS
0
---- TYPES
BIGINT
====
---- QUERY
# Verify that the above DELETE statement wrote 4 delete files (2 per partition) in total.
SELECT count(*) FROM $DATABASE.multiple_deletes.`files`
WHERE content = 1;
---- RESULTS
4
---- TYPES
BIGINT
====

View File

@@ -2188,6 +2188,11 @@ class TestIcebergV2Table(IcebergTestSuite):
vector.unset_exec_option('num_nodes')
self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector, unique_database)
def test_writing_multiple_deletes_per_partition(self, vector, unique_database):
"""Test writing multiple delete files per partition in a single DELETE operation."""
self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition', vector,
use_db=unique_database)
def test_cleanup(self, unique_database):
"""Test that all uncommitted files written by Impala are removed from the file
system when a DML commit to an Iceberg table fails, and that the effects of the