IMPALA-12810: Simplify IcebergDeleteNode and IcebergDeleteBuilder

Now that we have the DIRECTED distribution mode, some parts of
IcebergDeleteNode and IcebergDeleteBuilder became dead code. This
change removes the dead code and simplifies both classes.

IcebergDeleteBuilder and KrpcDataStreamSender now tolerate NULL
file paths, which are not treated as an error in the hash join mode
either.
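
For orientation (not part of the patch): both classes revolve around a table
that maps each data file path to an ascending vector of deleted row positions,
and a probe row survives when its position is absent from that vector. A
minimal self-contained sketch of this model, with std::string standing in for
impala::StringValue:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Analog of DeleteRowHashTable: data-file path -> sorted deleted row positions.
using DeleteRowHashTable = std::unordered_map<std::string, std::vector<int64_t>>;

// A probe row (file_path, pos) is dropped iff it is marked deleted.
bool IsDeleted(const DeleteRowHashTable& deleted_rows,
    const std::string& file_path, int64_t pos) {
  auto it = deleted_rows.find(file_path);
  if (it == deleted_rows.end()) return false;
  // Positions are kept in ascending order, so membership is a binary search.
  return std::binary_search(it->second.begin(), it->second.end(), pos);
}

int main() {
  DeleteRowHashTable deleted_rows{{"data_file_a.parq", {0, 3}}};
  for (int64_t pos = 0; pos < 4; ++pos) {
    std::cout << "row " << pos << ": "
              << (IsDeleted(deleted_rows, "data_file_a.parq", pos) ? "deleted" : "kept")
              << "\n";
  }
  return 0;
}

In the actual classes the map is DeleteRowHashTable keyed by
impala::StringValue, as the header diff below shows.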

Change-Id: I3ba02b33433990950b49628f11e732e01ed8a34d
Reviewed-on: http://gerrit.cloudera.org:8080/21258
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Zoltan Borok-Nagy
Committed-by: Impala Public Jenkins
Date: 2024-03-01 15:43:30 +01:00
Parent: ef6dad694d
Commit: 0334f83704
21 changed files with 219 additions and 104 deletions


@@ -141,7 +141,6 @@ Status IcebergDeleteBuilder::CalculateDataFiles() {
   const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
       fragment_it->second->instance_ctx_pbs();
   for (auto ctx : instance_ctx_pbs) {
-    ctx->per_node_scan_ranges().size();
     auto ranges = ctx->per_node_scan_ranges().find(delete_scan_node->tnode_->node_id);
     if (ranges == ctx->per_node_scan_ranges().end()) continue;
@@ -184,8 +183,6 @@ Status IcebergDeleteBuilder::CalculateDataFiles() {
     }
   }
-  is_distributed_mode_ = deleted_rows_.empty();
   return Status::OK();
 }
@@ -218,13 +215,13 @@ Status IcebergDeleteBuilder::Open(RuntimeState* state) {
 Status IcebergDeleteBuilder::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(AddBatch(batch));
+  RETURN_IF_ERROR(AddBatch(state, batch));
   COUNTER_ADD(num_build_rows_, batch->num_rows());
   return Status::OK();
 }
-Status IcebergDeleteBuilder::AddBatch(RowBatch* batch) {
-  RETURN_IF_ERROR(ProcessBuildBatch(batch));
+Status IcebergDeleteBuilder::AddBatch(RuntimeState* state, RowBatch* batch) {
+  RETURN_IF_ERROR(ProcessBuildBatch(state, batch));
   return Status::OK();
 }
@@ -272,7 +269,8 @@ string IcebergDeleteBuilder::DebugString() const {
   return ss.str();
 }
-Status IcebergDeleteBuilder::ProcessBuildBatch(RowBatch* build_batch) {
+Status IcebergDeleteBuilder::ProcessBuildBatch(RuntimeState* state,
+    RowBatch* build_batch) {
   FOREACH_ROW(build_batch, 0, build_batch_iter) {
     TupleRow* build_row = build_batch_iter.Get();
@@ -281,39 +279,15 @@ Status IcebergDeleteBuilder::ProcessBuildBatch(RowBatch* build_batch) {
     const int length = file_path->Len();
     if (UNLIKELY(length == 0)) {
-      return Status(Substitute("NULL found as file_path in delete file"));
+      state->LogError(
+          ErrorMsg(TErrorCode::GENERAL, "NULL found as file_path in delete file"));
+      continue;
     }
     int64_t* id = build_row->GetTuple(0)->GetBigIntSlot(pos_offset_);
-    if (is_distributed_mode_) {
-      // Distributed mode, deleted_rows_ is empty after init, only the relevant delete
-      // files are sent to this fragment, processing everything
-      auto it = deleted_rows_.find(*file_path);
-      if (it == deleted_rows_.end()) {
-        char* ptr_copy = reinterpret_cast<char*>(expr_results_pool_->Allocate(length));
-        if (ptr_copy == nullptr) {
-          return Status("Failed to allocate memory.");
-        }
-        memcpy(ptr_copy, file_path->Ptr(), length);
-        std::pair<DeleteRowHashTable::iterator, bool> retval =
-            deleted_rows_.emplace(std::piecewise_construct,
-                std::forward_as_tuple(ptr_copy, length), std::forward_as_tuple());
-        // emplace succeeded
-        DCHECK(retval.second == true);
-        it = retval.first;
-        it->second.reserve(INITIAL_DELETE_VECTOR_CAPACITY);
-      }
+    auto it = deleted_rows_.find(*file_path);
+    // deleted_rows_ filled with the relevant data file names, processing only those.
+    if (it != deleted_rows_.end()) {
       it->second.emplace_back(*id);
-    } else {
-      // Broadcast mode, deleted_rows_ filled with the relevant data file names,
-      // processing only those
-      auto it = deleted_rows_.find(*file_path);
-      if (it != deleted_rows_.end()) {
-        it->second.emplace_back(*id);
-      }
     }
   }
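
The net effect of the hunks above: because CalculateDataFiles() now
pre-populates deleted_rows_ with exactly the data files this fragment scans,
the build side needs no broadcast/partitioned distinction and no on-the-fly
key allocation. A condensed stand-alone analog of the surviving path
(hypothetical names, std types in place of Impala's):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

using DeleteRowHashTable = std::unordered_map<std::string, std::vector<int64_t>>;

// Analog of the simplified ProcessBuildBatch() row handling: a NULL (empty)
// file path is logged and skipped, an unknown path is simply irrelevant to
// this fragment, and a known path gets the deleted position appended.
void AddDeleteRecord(DeleteRowHashTable& deleted_rows,
    std::vector<std::string>& error_log,
    const std::string& file_path, int64_t pos) {
  if (file_path.empty()) {
    error_log.emplace_back("NULL found as file_path in delete file");
    return;  // tolerated, not a query failure
  }
  auto it = deleted_rows.find(file_path);
  if (it != deleted_rows.end()) it->second.emplace_back(pos);
}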


@@ -71,15 +71,10 @@ class IcebergDeleteBuilderConfig : public JoinBuilderConfig {
 /// files, and stores them in unordered_map<file_path, ordered vector of row ids> to allow
 /// fast probing.
 ///
-/// Similarly to PartitionedHashJoin, there are 2 modes:
+/// Unlike the PartitionedHashJoin, there is only one mode:
 ///
-/// Broadcast: every fragment receives all data from delete files, filters them and
-/// stores only the ones which will be needed to process the assigned data files.
-///
-/// Partitioned: Both data and delete files are hashed by the file path. This means
-/// there is no need to filter further the delete files, but it can cause minor data
-/// skew due to the imbalance in the number of deleted rows corresponding to different
-/// data files.
+/// Directed: each fragment will only receive delete records that apply to data files
+/// processed by this executor
 ///
 /// Shared Build
 /// ------------
@@ -125,15 +120,14 @@ class IcebergDeleteBuilder : public JoinBuilder {
       std::unordered_map<impala::StringValue, DeleteRowVector, StringValueHashWrapper>;
   DeleteRowHashTable& deleted_rows() { return deleted_rows_; }
-  bool IsDistributedMode() { return is_distributed_mode_; }
  private:
   /// Reads the rows in build_batch and collects them into delete_hash_.
-  Status ProcessBuildBatch(RowBatch* build_batch);
+  Status ProcessBuildBatch(RuntimeState* state, RowBatch* build_batch);
   /// Helper method for Send() that does the actual work apart from updating the
   /// counters.
-  Status AddBatch(RowBatch* build_batch);
+  Status AddBatch(RuntimeState* state, RowBatch* build_batch);
   /// Helper method for FlushFinal() that does the actual work.
   Status FinalizeBuild(RuntimeState* state);
@@ -154,9 +148,6 @@ class IcebergDeleteBuilder : public JoinBuilder {
   int file_path_offset_;
   int pos_offset_;
-  // Distribution mode of the node
-  bool is_distributed_mode_;
   // Use the length of a cache line as initial capacity
   static constexpr size_t INITIAL_DELETE_VECTOR_CAPACITY = 8;


@@ -187,17 +187,10 @@ Status IcebergDeleteNode::ProcessProbeBatch(RowBatch* out_batch) {
   DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_IN_BATCH);
   DCHECK_NE(probe_batch_pos_, -1);
   int rows_added = 0;
-  Status status;
   TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
   SCOPED_TIMER(probe_timer_);
-  rows_added = ProcessProbeBatch(prefetch_mode, out_batch, &status);
-  if (UNLIKELY(rows_added < 0)) {
-    DCHECK(!status.ok());
-    return status;
-  }
-  DCHECK(status.ok());
+  rows_added = ProcessProbeBatch(prefetch_mode, out_batch);
   out_batch->CommitRows(rows_added);
   return Status::OK();
 }
@@ -326,35 +319,15 @@ void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() {
 void IcebergDeleteNode::IcebergDeleteState::Update(
     impala::StringValue* file_path, int64_t* next_probe_pos) {
   DCHECK(builder_ != nullptr);
-  // Making sure the row ids are in ascending order inside a row batch in broadcast mode
-  DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID
-      || current_probe_pos_ < *next_probe_pos);
-  bool is_consecutive_pos = false;
-  if(current_probe_pos_ != INVALID_ROW_ID) {
-    const int64_t step = *next_probe_pos - current_probe_pos_;
-    is_consecutive_pos = step == 1;
-  }
+  // Making sure the row ids are in ascending order inside a row batch in DIRECTED mode
+  DCHECK(current_probe_pos_ == INVALID_ROW_ID || current_probe_pos_ < *next_probe_pos);
   current_probe_pos_ = *next_probe_pos;
-  if (previous_file_path_ != nullptr
-      && (!builder_->IsDistributedMode() || *file_path == *previous_file_path_)) {
-    // Fast path if the file did not change, no need to hash again
+  if (previous_file_path_ != nullptr) {
+    // We already have a file path, no need to hash again
    if (current_deleted_pos_row_id_ != INVALID_ROW_ID
        && current_probe_pos_ > (*current_delete_row_)[current_deleted_pos_row_id_]) {
-      UpdateImpl();
-    } else if (builder_->IsDistributedMode() && !is_consecutive_pos) {
-      // In distributed mode (which means PARTITIONED JOIN distribution mode) we cannot
-      // rely on ascending row order, not even inside row batches, not even when the
-      // previous file path is the same as the current one.
-      // This is because files with multiple blocks can be processed by multiple hosts
-      // in parallel, then the rows are getting hash-exchanged based on their file paths.
-      // Then the exchange-receiver at the LHS coalesces the row batches from multiple
-      // senders, hence the row IDs getting unordered. So we are always doing a binary
-      // search here to find the proper delete row id.
-      // This won't be a problem with the DIRECTED distribution mode (see IMPALA-12308)
-      // which will behave similarly to the BROADCAST mode in this regard.
+      DCHECK_EQ(*file_path, *previous_file_path_);
       UpdateImpl();
     }
   } else {
     auto it = builder_->deleted_rows().find(*file_path);
@@ -389,7 +362,7 @@ void IcebergDeleteNode::IcebergDeleteState::Delete() {
 }
 bool IcebergDeleteNode::IcebergDeleteState::NeedCheck() const {
-  return builder_->IsDistributedMode() || current_deleted_pos_row_id_ != INVALID_ROW_ID
+  return current_deleted_pos_row_id_ != INVALID_ROW_ID
       || current_probe_pos_ == INVALID_ROW_ID;
 }
@@ -407,7 +380,7 @@ void IcebergDeleteNode::IcebergDeleteState::Reset() {
 }
 bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRow(
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(current_probe_row_ != nullptr);
   TupleRow* out_row = out_batch_iterator->Get();
   if (!iceberg_delete_state_.IsDeleted()) {
@@ -423,7 +396,7 @@ bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRow(
 }
 bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRowNoCheck(
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(current_probe_row_ != nullptr);
   TupleRow* out_row = out_batch_iterator->Get();
   out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
@@ -435,7 +408,7 @@ bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRowNoCheck(
 }
 int IcebergDeleteNode::ProcessProbeBatch(
-    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, Status* __restrict__ status) {
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch) {
   DCHECK(!out_batch->AtCapacity());
   DCHECK_GE(probe_batch_pos_, 0);
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
@@ -444,7 +417,7 @@ int IcebergDeleteNode::ProcessProbeBatch(
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
   int remaining_capacity = max_rows;
-  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0 && status->ok()) {
+  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0) {
     current_probe_row_ = probe_batch_iterator.Get();
     if (iceberg_delete_state_.NeedCheck()) {
       impala::StringValue* file_path =
@@ -453,12 +426,12 @@ int IcebergDeleteNode::ProcessProbeBatch(
           current_probe_row_->GetTuple(0)->GetBigIntSlot(pos_offset_);
       iceberg_delete_state_.Update(file_path, current_probe_pos);
-      if (!ProcessProbeRow(&out_batch_iterator, &remaining_capacity, status)) {
-        if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
+      if (!ProcessProbeRow(&out_batch_iterator, &remaining_capacity)) {
+        DCHECK_EQ(remaining_capacity, 0);
       }
     } else {
-      if (!ProcessProbeRowNoCheck(&out_batch_iterator, &remaining_capacity, status)) {
-        if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
+      if (!ProcessProbeRowNoCheck(&out_batch_iterator, &remaining_capacity)) {
+        DCHECK_EQ(remaining_capacity, 0);
      }
     }
@@ -468,12 +441,7 @@ int IcebergDeleteNode::ProcessProbeBatch(
     probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0));
   }
-  int num_rows_added;
-  if (LIKELY(status->ok())) {
-    num_rows_added = max_rows - remaining_capacity;
-  } else {
-    num_rows_added = -1;
-  }
+  int num_rows_added = max_rows - remaining_capacity;
   // Clear state as ascending order of row ids are not guaranteed between probe row
   // batches
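
Since DIRECTED mode guarantees ascending probe positions within a row batch
(the DCHECK kept in Update() above), the probe side can keep moving forward
through a file's delete vector instead of re-searching from an arbitrary
position. A self-contained cursor analog of that idea, illustrative rather
than the actual Impala implementation:

#include <cstdint>
#include <vector>

// Forward-only cursor over one file's sorted delete positions, usable when
// probe positions arrive in ascending order (the DIRECTED-mode guarantee).
// The caller must keep 'deletes' alive for the cursor's lifetime.
class DeleteCursor {
 public:
  explicit DeleteCursor(const std::vector<int64_t>& deletes) : deletes_(deletes) {}

  // Advances past delete positions smaller than 'probe_pos' and reports
  // whether 'probe_pos' itself is deleted. Amortized O(1) per probe row.
  bool IsDeleted(int64_t probe_pos) {
    while (next_ < deletes_.size() && deletes_[next_] < probe_pos) ++next_;
    return next_ < deletes_.size() && deletes_[next_] == probe_pos;
  }

 private:
  const std::vector<int64_t>& deletes_;
  size_t next_ = 0;
};

For deletes {0, 3}, probing positions 0..3 in order yields deleted, kept,
kept, deleted without ever rescanning the vector.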


@@ -116,18 +116,18 @@ class IcebergDeleteNode : public BlockingJoinNode {
   /// Probes 'current_probe_row_' against the hash tables and append outputs
   /// to output batch.
   bool inline ProcessProbeRow(RowBatch::Iterator* out_batch_iterator,
-      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
+      int* remaining_capacity) WARN_UNUSED_RESULT;
   /// Append outputs to output batch.
   bool inline ProcessProbeRowNoCheck(RowBatch::Iterator* out_batch_iterator,
-      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
+      int* remaining_capacity) WARN_UNUSED_RESULT;
   /// Process probe rows from probe_batch_. Returns either if out_batch is full or
   /// probe_batch_ is entirely consumed.
   /// Returns the number of rows added to out_batch; -1 on error (and *status will
   /// be set). This function doesn't commit rows to the output batch so it's the caller's
   /// responsibility to do so.
-  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, Status* status);
+  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch);
   /// Wrapper that ProcessProbeBatch() and commits the rows to 'out_batch' on success.
   Status ProcessProbeBatch(RowBatch* out_batch);


@@ -1107,7 +1107,8 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
       if (filename_value_ss.ptr == prev_filename_ptr) {
         // If the filename pointer is the same as the previous one then we can instantly
         // send the row to the same channels as the previous row.
-        DCHECK(skipped_prev_row || !prev_channels.empty());
+        DCHECK(skipped_prev_row || !prev_channels.empty() ||
+            (filename_value_ss.len == 0 && prev_channels.empty()));
         for (Channel* ch : prev_channels) RETURN_IF_ERROR(ch->AddRow(tuple_row));
         continue;
       }
@@ -1121,8 +1122,14 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
         // files that remains in the new snapshot.
         // Another use-case is table sampling where we read only a subset of the data
         // files.
-        VLOG(3) << "Row from delete file refers to a non-existing data file: " <<
-            filename;
+        // A third case is when the delete record is invalid.
+        if (UNLIKELY(filename_value_ss.len == 0)) {
+          state->LogError(
+              ErrorMsg(TErrorCode::GENERAL, "NULL found as file_path in delete file"));
+        } else {
+          VLOG(3) << "Row from delete file refers to a non-existing data file: " <<
+              filename;
+        }
         skipped_prev_row = true;
         continue;
       }
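
The sender-side change mirrors the builder's tolerance: a delete record that
matches no channel is skipped rather than failing the query, with an empty
(NULL) path surfaced in the error log and a merely unknown path only
verbose-logged. A reduced stand-alone analog of that decision (hypothetical
helper and types; std::cerr stands in for VLOG(3)):

#include <iostream>
#include <string>
#include <vector>

// Outcome of routing one delete record in the sender.
enum class RouteResult { kRouted, kSkippedNull, kSkippedUnknownFile };

// Analog of the sender's per-row decision: route to the channel owning the
// data file, or skip the row (never fail the query) when no channel matches.
RouteResult RouteDeleteRecord(const std::string& file_path, bool has_channel,
    std::vector<std::string>& error_log) {
  if (has_channel) return RouteResult::kRouted;
  if (file_path.empty()) {
    // Invalid delete record: surface it in the query's error log.
    error_log.emplace_back("NULL found as file_path in delete file");
    return RouteResult::kSkippedNull;
  }
  // Valid record for a file outside the snapshot/sample: debug-log only.
  std::cerr << "Row from delete file refers to a non-existing data file: "
            << file_path << "\n";
  return RouteResult::kSkippedUnknownFile;
}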

testdata/data/README

@@ -1130,6 +1130,50 @@ iceberg_v2_equality_delete_schema_evolution:
 4: Update a row with Nifi where i=4 to the following:
    (44, 2024-03-21, "str4", 4444)
+iceberg_v2_null_delete_record:
+1) Created the table via Impala and added some records to it.
+   CREATE TABLE iceberg_v2_null_delete_record(i INT, j INT)
+   STORED BY ICEBERG;
+   INSERT INTO iceberg_v2_null_delete_record VALUES (1,1), (2,2), (3,3), (4,4);
+   INSERT INTO iceberg_v2_null_delete_record VALUES (1,1), (2,2), (3,3), (4,4);
+   (We need at least 2 data files to use DIRECTED mode in KrpcDataStreamSender)
+2) Created the following temporary table:
+   CREATE TABLE iceberg_v2_null_delete_record_pos_delete (file_path STRING, pos BIGINT)
+   STORED BY ICEBERG;
+   Manually rewrote the metadata JSON file of this table, so the schema elements have the
+   following field ids (there are two places where I had to modify the schemas):
+     file_path : 2147483546
+     pos : 2147483545
+3) Inserted data files into iceberg_v2_null_delete_record_pos_delete:
+   INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+     (NULL, 0);
+   INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+     ('<data file path of iceberg_v2_null_delete_record>', 0), (NULL, 3);
+   INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+     (NULL, 2), ('<data file path of iceberg_v2_null_delete_record>', 0);
+   INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
+     (NULL, 0), (NULL, 1), (NULL, 2);
+   The written Parquet files have the schema of position delete files (with the
+   correct Iceberg field ids)
+4) Copied iceberg_v2_null_delete_record to the local filesystem and applied
+   the following modifications:
+   * added the Parquet files from iceberg_v2_null_delete_record_pos_delete to
+     the /data directory
+   * manually edited the metadata JSON, and the manifest and manifest list files to
+     register the delete files in the table
 arrays_big.parq:
 Generated with RandomNestedDataGenerator.java from the following schema:
 {


@@ -0,0 +1,89 @@
+{
+  "format-version" : 2,
+  "table-uuid" : "a4e56638-534e-41f1-848a-921f8a3364e2",
+  "location" : "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record",
+  "last-sequence-number" : 2,
+  "last-updated-ms" : 1712662161442,
+  "last-column-id" : 2,
+  "current-schema-id" : 0,
+  "schemas" : [ {
+    "type" : "struct",
+    "schema-id" : 0,
+    "fields" : [ {
+      "id" : 1,
+      "name" : "i",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "j",
+      "required" : false,
+      "type" : "int"
+    } ]
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ ]
+  } ],
+  "last-partition-id" : 999,
+  "default-sort-order-id" : 0,
+  "sort-orders" : [ {
+    "order-id" : 0,
+    "fields" : [ ]
+  } ],
+  "properties" : {
+    "engine.hive.enabled" : "true",
+    "write.merge.mode" : "merge-on-read",
+    "write.format.default" : "parquet",
+    "write.delete.mode" : "merge-on-read",
+    "iceberg.catalog_location" : "/test-warehouse/iceberg_test/hadoop_catalog",
+    "OBJCAPABILITIES" : "EXTREAD,EXTWRITE",
+    "write.update.mode" : "merge-on-read",
+    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+    "iceberg.catalog" : "hadoop.catalog",
+    "iceberg.table_identifier" : "ice.iceberg_v2_null_delete_record"
+  },
+  "current-snapshot-id" : 5852039568708655222,
+  "refs" : {
+    "main" : {
+      "snapshot-id" : 5852039568708655222,
+      "type" : "branch"
+    }
+  },
+  "snapshots" : [ {
+    "sequence-number" : 1,
+    "snapshot-id" : 5852039568708655222,
+    "parent-snapshot-id" : 1905028041526857588,
+    "timestamp-ms" : 1712662161442,
+    "summary" : {
+      "operation" : "overwrite",
+      "added-position-delete-files" : "1",
+      "added-delete-files" : "1",
+      "added-files-size" : "1578",
+      "added-position-deletes" : "2",
+      "changed-partition-count" : "1",
+      "total-records" : "4",
+      "total-files-size" : "2195",
+      "total-data-files" : "1",
+      "total-delete-files" : "1",
+      "total-position-deletes" : "2",
+      "total-equality-deletes" : "0"
+    },
+    "manifest-list" : "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/snap-5852039568708655222-1-3a813d5e-fc0b-485f-bbba-010972a9f20a.avro",
+    "schema-id" : 0
+  } ],
+  "statistics" : [ ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1712662151105,
+    "snapshot-id" : 1905028041526857588
+  }, {
+    "timestamp-ms" : 1712662161442,
+    "snapshot-id" : 5852039568708655222
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1712662124268,
+    "metadata-file" : "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record/metadata/v1.metadata.json"
+  }
+  ]
+}


@@ -3954,6 +3954,21 @@ hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/i
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+iceberg_v2_null_delete_record
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+              'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
+              'iceberg.table_identifier'='ice.iceberg_v2_null_delete_record',
+              'format-version'='2');
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_v2_null_delete_record /test-warehouse/iceberg_test/hadoop_catalog/ice
+====
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
 mv1_alltypes_jointbl
 ---- HIVE_MAJOR_VERSION
 3


@@ -92,6 +92,7 @@ table_name:iceberg_v2_delete_equality_multi_eq_ids, constraint:restrict_to, tabl
 table_name:iceberg_v2_delete_pos_and_multi_eq_ids, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_v2_no_deletes, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_v2_no_deletes_orc, constraint:restrict_to, table_format:parquet/none/none
+table_name:iceberg_v2_null_delete_record, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_v2_positional_update_all_rows, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_v2_positional_delete_all_rows, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_v2_positional_delete_all_rows_orc, constraint:restrict_to, table_format:parquet/none/none

@@ -1449,6 +1449,31 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_read_position_deletes(self, vector):
     self.run_test_case('QueryTest/iceberg-v2-read-position-deletes', vector)
+  @SkipIfDockerizedCluster.internal_hostname
+  @SkipIf.hardcoded_uris
+  def test_read_null_delete_records(self, vector):
+    expected_error = 'NULL found as file_path in delete file'
+    query_options = vector.get_value('exec_option')
+    v2_op_disabled = query_options['disable_optimized_iceberg_v2_read'] == 1
+    result = self.execute_query(
+        'select * from functional_parquet.iceberg_v2_null_delete_record', query_options)
+    assert len(result.data) == 6
+    errors = result.log
+    print(errors)
+    assert expected_error in errors or v2_op_disabled
+    result = self.execute_query(
+        'select count(*) from functional_parquet.iceberg_v2_null_delete_record',
+        query_options)
+    assert result.data == ['6']
+    errors = result.log
+    assert expected_error in errors or v2_op_disabled
+    result = self.execute_query(
+        """select * from functional_parquet.iceberg_v2_null_delete_record
+           where j < 3""", query_options)
+    assert sorted(result.data) == ['1\t1', '2\t2']
+    errors = result.log
+    assert expected_error in errors or v2_op_disabled
+
   @SkipIfDockerizedCluster.internal_hostname
   @SkipIf.hardcoded_uris
   def test_read_equality_deletes(self, vector):