IMPALA-11387: Introduce virtual column to expose Iceberg's file-level data sequence number

The data sequence number is used to decide whether an equality
delete file should be applied to a given data file. Newer files have
higher data sequence numbers than older files, so if the data sequence
number of a data file is lower than that of an equality delete file,
the delete rows should be applied to that data file.
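
As an illustrative sketch only (not code from this patch; the helper
name and signature are hypothetical), the rule boils down to a strict
comparison of the two data sequence numbers:

  // Hypothetical helper: an equality delete file applies to a data
  // file only if the data file is strictly older, i.e. its data
  // sequence number is lower than the delete file's.
  static boolean equalityDeleteApplies(long dataFileSeq, long eqDeleteFileSeq) {
    return dataFileSeq < eqDeleteFileSeq;
  }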

Iceberg has two different sequence numbers at the ContentFile level:
the file sequence number and the data sequence number.
For details see the comments on the ContentFile class in Iceberg:
ebce8538db/api/src/main/java/org/apache/iceberg/ContentFile.java (L130)
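
Both numbers are available on Iceberg's ContentFile API. A minimal
sketch of reading them, assuming 'file' is some ContentFile instance
(both getters return a Long that may be null for metadata written by
older Iceberg versions):

  // Data sequence number: used for ordering data files against delete files.
  Long dataSeq = file.dataSequenceNumber();
  // File sequence number: per the Iceberg docs, the sequence number of the
  // snapshot that added the file.
  Long fileSeq = file.fileSequenceNumber();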

This patch adds the data sequence number as a virtual column on
Iceberg tables, so it can be queried like:
SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM <iceberg_table>;

Testing:
  - Added E2E tests to exercise the new virtual column on V1 and V2
    tables, covering both partitioned and unpartitioned cases.

Change-Id: Id950e97782a2a29b505164470cfb646c5358dfca
Reviewed-on: http://gerrit.cloudera.org:8080/20595
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Gabor Kaszab
Date: 2023-10-18 16:23:16 +02:00
Committed by: Impala Public Jenkins
Commit: 4ae7b7bb96 (parent: 967ed18407)
9 changed files with 213 additions and 13 deletions


@@ -59,17 +59,31 @@ void FileMetadataUtils::AddFileLevelVirtualColumns(MemPool* mem_pool,
   if (template_tuple == nullptr) return;
   for (int i = 0; i < scan_node_->virtual_column_slots().size(); ++i) {
     const SlotDescriptor* slot_desc = scan_node_->virtual_column_slots()[i];
-    if (slot_desc->virtual_column_type() != TVirtualColumnType::INPUT_FILE_NAME) {
-      continue;
+    if (slot_desc->virtual_column_type() == TVirtualColumnType::INPUT_FILE_NAME) {
+      StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset());
+      const char* filename = file_desc_->filename.c_str();
+      int len = strlen(filename);
+      char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
+      Ubsan::MemCpy(filename_copy, filename, len);
+      slot->ptr = filename_copy;
+      slot->len = len;
+      template_tuple->SetNotNull(slot_desc->null_indicator_offset());
+    } else if (slot_desc->virtual_column_type() ==
+        TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) {
+      using namespace org::apache::impala::fb;
+      const FbIcebergMetadata* ice_metadata =
+          file_desc_->file_metadata->iceberg_metadata();
+      DCHECK(ice_metadata != nullptr);
+      int64_t data_seq_num = ice_metadata->data_sequence_number();
+      if (data_seq_num > -1) {
+        int64_t* slot = template_tuple->GetBigIntSlot(slot_desc->tuple_offset());
+        *slot = data_seq_num;
+        template_tuple->SetNotNull(slot_desc->null_indicator_offset());
+      } else {
+        template_tuple->SetNull(slot_desc->null_indicator_offset());
+      }
     }
-    StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset());
-    const char* filename = file_desc_->filename.c_str();
-    int len = strlen(filename);
-    char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
-    Ubsan::MemCpy(filename_copy, filename, len);
-    slot->ptr = filename_copy;
-    slot->len = len;
-    template_tuple->SetNotNull(slot_desc->null_indicator_offset());
   }
 }


@@ -1125,6 +1125,9 @@ bool HdfsScanPlanNode::HasVirtualColumnInTemplateTuple() const {
   } else if (sd->virtual_column_type() ==
       TVirtualColumnType::ICEBERG_PARTITION_SERIALIZED) {
     return true;
+  } else if (sd->virtual_column_type() ==
+      TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) {
+    return true;
   } else {
     // Adding DCHECK here so we don't forget to update this when adding new virtual
     // column.


@@ -47,6 +47,7 @@ table FbIcebergPartitionTransformValue {
 table FbIcebergMetadata {
   file_format : FbIcebergDataFileFormat;
   record_count : long;
+  data_sequence_number : long;
   spec_id : ushort;
   partition_keys : [FbIcebergPartitionTransformValue];
 }


@@ -79,7 +79,8 @@ enum TVirtualColumnType {
   INPUT_FILE_NAME,
   FILE_POSITION,
   PARTITION_SPEC_ID,
-  ICEBERG_PARTITION_SERIALIZED
+  ICEBERG_PARTITION_SERIALIZED,
+  ICEBERG_DATA_SEQUENCE_NUMBER
 }

 // TODO: Since compression is also enabled for Kudu columns, we should


@@ -488,6 +488,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
     addVirtualColumn(VirtualColumn.FILE_POSITION);
     addVirtualColumn(VirtualColumn.PARTITION_SPEC_ID);
     addVirtualColumn(VirtualColumn.ICEBERG_PARTITION_SERIALIZED);
+    addVirtualColumn(VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER);
   }

   @Override


@@ -45,6 +45,10 @@ public class VirtualColumn extends Column {
   public static VirtualColumn ICEBERG_PARTITION_SERIALIZED = new
       VirtualColumn("ICEBERG__PARTITION__SERIALIZED", Type.BINARY,
       TVirtualColumnType.ICEBERG_PARTITION_SERIALIZED);
+  public static VirtualColumn ICEBERG_DATA_SEQUENCE_NUMBER = new VirtualColumn(
+      "ICEBERG__DATA__SEQUENCE__NUMBER",
+      Type.BIGINT,
+      TVirtualColumnType.ICEBERG_DATA_SEQUENCE_NUMBER);

   public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) {
     switch (virtColType) {
@@ -52,6 +56,7 @@ public class VirtualColumn extends Column {
       case FILE_POSITION: return FILE_POSITION;
       case PARTITION_SPEC_ID: return PARTITION_SPEC_ID;
       case ICEBERG_PARTITION_SERIALIZED: return ICEBERG_PARTITION_SERIALIZED;
+      case ICEBERG_DATA_SEQUENCE_NUMBER: return ICEBERG_DATA_SEQUENCE_NUMBER;
       default: break;
     }
     return null;


@@ -942,6 +942,17 @@ public class IcebergUtil {
     if (partKeysOffset != -1) {
       FbIcebergMetadata.addPartitionKeys(fbb, partKeysOffset);
     }
+    if (cf.dataSequenceNumber() != null) {
+      FbIcebergMetadata.addDataSequenceNumber(fbb, cf.dataSequenceNumber());
+    } else {
+      // According to comments from the Iceberg code, data sequence numbers could be null
+      // when files were written with "older" Iceberg versions. Quote from the code
+      // comments of Iceberg's ContentFile.dataSequenceNumber():
+      // "This method can return null if the data sequence number is unknown. This may
+      // happen while reading a v2 manifest that did not persist the data sequence number
+      // for manifest entries with status DELETED (older Iceberg versions)."
+      FbIcebergMetadata.addDataSequenceNumber(fbb, -1l);
+    }
     return FbIcebergMetadata.endFbIcebergMetadata(fbb);
   }


@@ -800,4 +800,10 @@ CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS I
optimize table ice_complex;
---- CATCH
AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala doesn't support inserting into tables containing complex type columns
====
---- QUERY
# ICEBERG__DATA__SEQUENCE__NUMBER is not supported for non-Iceberg tables.
SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM functional_parquet.alltypes;
---- CATCH
AnalysisException: Could not resolve column/field reference: 'iceberg__data__sequence__number'
====


@@ -36,4 +36,162 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_b=false/.*.0.p
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_b=true/.*.0.parq',0
---- TYPES
STRING, BIGINT
====
---- QUERY
# For a V1 Iceberg table the data sequence number is zero.
select col_i, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl order by col_i;
---- TYPES
INT,BIGINT
---- RESULTS
0,0
1,0
3,0
5,0
====
---- QUERY
# select virtual column without selecting any other slots.
select max(ICEBERG__DATA__SEQUENCE__NUMBER) from ice_tbl;
---- TYPES
BIGINT
---- RESULTS
0
====
---- QUERY
# Testing data sequence number for unpartitioned V2 tables.
create table ice_tbl_v2 (col_i int, col_str string)
stored as iceberg
tblproperties ('format-version'='2');
insert into ice_tbl_v2 values (1, "str1"), (2, "str2"), (3, "str3");
select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
---- RESULTS
1,1,'str1'
1,2,'str2'
1,3,'str3'
---- TYPES
BIGINT,INT,STRING
====
---- QUERY
insert into ice_tbl_v2 values (4, "str4"), (5, "str5");
select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
---- RESULTS
1,1,'str1'
1,2,'str2'
1,3,'str3'
2,4,'str4'
2,5,'str5'
---- TYPES
BIGINT,INT,STRING
====
---- QUERY
delete from ice_tbl_v2 where col_i % 2 = 0;
select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
---- RESULTS
1,1,'str1'
1,3,'str3'
2,5,'str5'
---- TYPES
BIGINT,INT,STRING
====
---- QUERY
insert into ice_tbl_v2 values (6, "str6"), (7, "str7");
select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
---- RESULTS
1,1,'str1'
1,3,'str3'
2,5,'str5'
4,6,'str6'
4,7,'str7'
---- TYPES
BIGINT,INT,STRING
====
---- QUERY
# Testing data sequence number for partitioned V2 tables.
create table ice_tbl_v2_part (col_i int, col_str string)
partitioned by spec (col_str)
stored as iceberg
tblproperties ('format-version'='2');
insert into ice_tbl_v2_part values (1, "part1"), (2, "part1"), (3, "part2");
select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
---- RESULTS
1,'part1',1
2,'part1',1
3,'part2',1
---- TYPES
INT,STRING,BIGINT
====
---- QUERY
insert into ice_tbl_v2_part values (4, "part1"), (5, "part2");
select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
---- RESULTS
1,'part1',1
2,'part1',1
3,'part2',1
4,'part1',2
5,'part2',2
---- TYPES
INT,STRING,BIGINT
====
---- QUERY
# Delete from both partitions
delete from ice_tbl_v2_part where col_i % 2 = 1;
select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
---- RESULTS
2,'part1',1
4,'part1',2
---- TYPES
INT,STRING,BIGINT
====
---- QUERY
insert into ice_tbl_v2_part values (6, "part1"), (7, "part2");
select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
---- RESULTS
2,'part1',1
4,'part1',2
6,'part1',4
7,'part2',4
---- TYPES
INT,STRING,BIGINT
====
---- QUERY
# Delete from one partition, insert into the other and check the data sequence number
delete from ice_tbl_v2_part where col_i = 4;
select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
---- RESULTS
2,'part1',1
6,'part1',4
7,'part2',4
---- TYPES
INT,STRING,BIGINT
====
---- QUERY
insert into ice_tbl_v2_part values (8, "part2");
select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
---- RESULTS
2,'part1',1
6,'part1',4
7,'part2',4
8,'part2',6
---- TYPES
INT,STRING,BIGINT
====
---- QUERY
# Order by ICEBERG__DATA__SEQUENCE__NUMBER while it's not in the select list.
select * from ice_tbl_v2_part order by ICEBERG__DATA__SEQUENCE__NUMBER desc, col_i;
---- RESULTS
8,'part2'
6,'part1'
7,'part2'
2,'part1'
---- TYPES
INT,STRING
====
---- QUERY
# Test when the sequence number comes from a view and is part of a join condition.
with w as (select ICEBERG__DATA__SEQUENCE__NUMBER as seq from ice_tbl_v2_part)
select seq, ap.i, ap.p_bigint from w
join functional_parquet.iceberg_alltypes_part ap on seq = ap.i;
---- RESULTS
1,1,11
---- TYPES
BIGINT,INT,BIGINT
====