UPDATE FROM statement throws an exception when updating an Iceberg
target table based on a view. This happens because we didn't correctly
substitute all the expressions used by the IcebergUpdateImpl class.
This patch fixes the expression substitutions, so from now on VIEWs
can also be used to UPDATE Iceberg tables.
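As a minimal illustration (table and view names are hypothetical, and
the exact failing shape depends on the view definition), a statement
like the following used to hit the exception and now works:
-- 'ice_tbl' is an Iceberg table, 'v' is a view supplying the new
-- values in the FROM clause; both names are illustrative only.
UPDATE ice_tbl SET k = v.k
FROM ice_tbl, v
WHERE ice_tbl.i = v.i;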
Testing:
* added e2e test
* added Ranger Column Masking test
Change-Id: I80ccdb61327a50082f792a6d51f946b11c467dab
Reviewed-on: http://gerrit.cloudera.org:8080/20825
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Part 2 had some limitations; most importantly, it could not UPDATE
Iceberg tables when any of the following was true:
* the UPDATE statement modifies the value of a partitioning column
* the table went through partition evolution
* the table has SORT BY properties
The problem with partitions is that the delete record and the new
data record might belong to different partitions. Since records are
shuffled based on the partitions of the delete records, the new data
files might not get written efficiently.
The problem with SORT BY properties is that we need to write the
position delete files ordered by (file_path, position), so the rows
cannot also be ordered by the SORT BY columns.
To address the above problems, this patch introduces a new
backend operator: IcebergBufferedDeleteSink. This new operator
extracts and aggregates the delete record information from
the incoming row batches, then in FlushFinal it orders the
position delete records and writes them out to files. This
mechanism is similar to Hive's approach:
https://github.com/apache/hive/pull/3251
IcebergBufferedDeleteSink cannot spill to disk, so it can only
run if there's enough memory to store the delete records. Each file
path is stored only once, and the int64_t positions are stored in a
vector, so updating 100 million records per node should require
around 100M * 8 bytes = 800 MB for the positions, plus roughly 100K
file paths, i.e. about 820 MB of memory per node. Spilling could be
added later, but currently there is little practical need for it.
Now records can get shuffled around based on the new data records'
partition values, and the SORT operator sorts the records based on
the SORT BY properties.
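For example, an UPDATE like the sketch below, which sets a
partitioning column of a table with SORT BY properties, is now
supported (all names are hypothetical; 'part_col' is assumed to be a
partitioning column):
-- 'ice_tbl' is a hypothetical Iceberg table partitioned by 'part_col'
-- and created with SORT BY properties.
UPDATE ice_tbl SET part_col = 10 WHERE id < 100;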
There's only one case in which we don't allow the UPDATE statement,
namely when all of the following hold:
* the UPDATE modifies a partitioning column, AND
* the right-hand side of the assignment is non-constant, AND
* the UPDATE statement has a JOIN
When all of the above conditions are met, an incorrect JOIN condition
could produce multiple matches for a data record. The duplicated
records would then be shuffled independently (based on their new
partition values) to different backend SINKs, and the different
backend SINKs would not be able to detect the duplicates.
If any of the above conditions is false, the duplicated records are
shuffled together to the same SINK, which can do the duplicate check.
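A hypothetical sketch of this remaining disallowed case (all names
are illustrative; 'part_col' is assumed to be a partitioning column):
-- The right-hand side 'src.new_part' is a non-constant expression
-- and the statement has a JOIN, so duplicate matches for a data
-- record could end up in different backend SINKs.
UPDATE ice_tbl SET part_col = src.new_part
FROM ice_tbl, src
WHERE ice_tbl.id = src.id;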
This patch also moves some code from IcebergDeleteSink to the
newly introduced IcebergDeleteSinkBase.
Testing:
* planner tests
* e2e tests
* Impala/Hive interop tests
Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
Reviewed-on: http://gerrit.cloudera.org:8080/20760
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds limited UPDATE support for Iceberg tables. The
limitations mean users cannot UPDATE Iceberg tables if any of the
following is true:
* the UPDATE statement modifies the value of a partitioning column
* the table went through partition evolution
* the table has SORT BY properties
The above limitations will be resolved by part 3. The usual
limitations, like writing non-Parquet files, using copy-on-write, or
modifying V1 tables, are out of scope of IMPALA-12313.
This patch implements UPDATEs with the merge-on-read technique. This
means the UPDATE statement writes both data files and delete files.
Data files contain the updated records, delete files contain the
position delete records of the old data records that have been
touched.
To achieve the above, this patch introduces a new sink: MultiDataSink.
We can configure multiple TableSinks for a single MultiDataSink object.
During execution, the row batches sent to the MultiDataSink will be
forwarded to all the TableSinks that have been registered.
The UPDATE statement for an Iceberg table creates a source select
statement with all table columns and virtual columns INPUT__FILE__NAME
and FILE__POSITION. E.g. imagine we have a table 'tbl' with schema
(i int, s string, k int), and we update the table with:
UPDATE tbl SET k = 5 WHERE i % 100 = 11;
The generated source statement will be ==>
SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION
FROM tbl WHERE i % 100 = 11;
Then we create two table sinks that refer to expressions from the above
source statement:
Insert sink (i, s, 5)
Delete sink (INPUT__FILE__NAME, FILE__POSITION)
The tuples in the rowbatch of MultiDataSink contain slots for all the
above expressions (i, s, 5, INPUT__FILE__NAME, FILE__POSITION).
MultiDataSink forwards each row batch to each registered TableSink.
They will pick their relevant expressions from the tuple and write
data/delete files. The tuples are sorted by INPUT__FILE__NAME and
FILE__POSITION because we need to write the delete records in this
order.
For partitioned tables we need to shuffle and sort the input tuples.
In this case we also add virtual columns "PARTITION__SPEC__ID" and
"ICEBERG__PARTITION__SERIALIZED" to the source statement and shuffle
and sort the rows based on them.
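Continuing the earlier 'tbl' example, for a partitioned 'tbl' the
generated source statement conceptually looks like the sketch below
(the exact internal form may differ):
SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION,
       PARTITION__SPEC__ID, ICEBERG__PARTITION__SERIALIZED
FROM tbl WHERE i % 100 = 11;
-- Rows are shuffled and sorted based on the two partition-related
-- virtual columns, in addition to the INPUT__FILE__NAME and
-- FILE__POSITION ordering needed for the position deletes.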
Data files and delete files are now separated in the DmlExecState, so
at the end of the operation we'll have two sets of files. We use these
two sets to create a new Iceberg snapshot.
Why does this patch have the limitations?
- Because we are shuffling and sorting rows based on the delete
records and their partitions. This means that the new data files
might not get written in an efficient way, e.g. there will be
too many of them, or we will need to keep too many open file
handles during writing.
Also, if the table has SORT BY properties, we cannot respect them,
as the input rows are ordered in a way that favors the position
deletes.
Part 3 will introduce a buffering writer for position delete
files. This means we will shuffle and sort records based on
the data records' partitions and SORT BY properties while
delete records get buffered and written out at the end (sorted
by file_path and position). In some edge cases the delete records
might not get written efficiently, but that is a smaller problem than
inefficient data files.
Testing:
* negative tests
* planner tests
* update all supported data types
* partitioned tables
* Impala/Hive interop tests
* authz tests
* concurrent tests
Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
Reviewed-on: http://gerrit.cloudera.org:8080/20677
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>