This commit adds support for INSERT INTO statements against Iceberg tables when the table is non-partitioned and the underlying file format is Parquet.

We still use Impala's HdfsParquetTableWriter to write the data files, though it needed some modifications to conform to the Iceberg spec, namely:
* write the Iceberg/Parquet 'field_id' for each column
* encode TIMESTAMPs as INT64 micros (without time zone)

We use DmlExecState to transfer information from the table sink operators to the coordinator, then updateCatalog() invokes the AppendFiles API to add the files atomically. DmlExecState is encoded in protobuf, while communication with the Frontend uses Thrift. Therefore, to avoid defining the Iceberg DataFile structure multiple times, it is stored in FlatBuffers.

The commit also corrects some of the Impala type <-> Iceberg type mappings:
* Impala TIMESTAMP is Iceberg TIMESTAMP (without time zone)
* Impala CHAR is Iceberg FIXED

Testing:
* Added INSERT tests to iceberg-insert.test
* Added negative tests to iceberg-negative.test
* Did some manual testing with Spark. Spark is able to read Iceberg tables written by Impala unless we use TIMESTAMPs; in that case Spark rejects the data files because it only accepts TIMESTAMPs with time zone.
* Added concurrent INSERT tests to test_insert_stress.py

Change-Id: I5690fb6c2cc51f0033fa26caf8597c80a11bcd8e
Reviewed-on: http://gerrit.cloudera.org:8080/16545
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
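In short, the feature enables statements of the following shape (a minimal sketch; the table name is illustrative, and the test file below covers the real cases):

create table ice_t (i int) stored as iceberg
tblproperties('iceberg.catalog'='hadoop.tables');
insert into ice_t values (1), (2), (3);
select * from ice_t;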
====
---- QUERY
# Create a table that is a subset of 'alltypes' table, i.e. it only
# contains the data types supported by Iceberg.
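# TINYINT and SMALLINT columns are left out because Iceberg has no
# 8-bit or 16-bit integer types.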
create table iceberg_alltypes(
id INT COMMENT 'Add a comment',
bool_col BOOLEAN,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
stored as iceberg
tblproperties('iceberg.catalog'='hadoop.tables');
---- RESULTS
'Table has been created.'
====
---- QUERY
insert into iceberg_alltypes
select id, bool_col, int_col, bigint_col, float_col, double_col,
CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
from functional.alltypes
order by id
limit 5;
---- RESULTS
: 5
====
---- QUERY
select * from iceberg_alltypes;
---- RESULTS
0,true,0,0,0,0,2009-01-01,'0',2009-01-01 00:00:00
1,false,1,10,1.100000023841858,10.1,2009-01-01,'1',2009-01-01 00:01:00
2,true,2,20,2.200000047683716,20.2,2009-01-01,'2',2009-01-01 00:02:00.100000000
3,false,3,30,3.299999952316284,30.3,2009-01-01,'3',2009-01-01 00:03:00.300000000
4,true,4,40,4.400000095367432,40.4,2009-01-01,'4',2009-01-01 00:04:00.600000000
---- TYPES
INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DATE, STRING, TIMESTAMP
====
---- QUERY
# Create table with decimal types
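# d4 exercises the maximum DECIMAL precision and scale (38,38).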
CREATE TABLE decimal_tbl (
d1 DECIMAL(9,0),
d2 DECIMAL(10,0),
d3 DECIMAL(20,10),
d4 DECIMAL(38,38),
d5 DECIMAL(10,5),
d6 DECIMAL(9,0)
)
STORED AS iceberg
TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
---- RESULTS
'Table has been created.'
====
---- QUERY
insert into decimal_tbl select * from functional_parquet.decimal_tbl;
select * from decimal_tbl;
---- RESULTS
1234,2222,1.2345678900,0.12345678900000000000000000000000000000,12345.78900,1
2345,111,12.3456789000,0.12345678900000000000000000000000000000,3.14100,1
12345,333,123.4567890000,0.12345678900000000000000000000000000000,11.22000,1
12345,333,1234.5678900000,0.12345678900000000000000000000000000000,0.10000,1
132842,333,12345.6789000000,0.12345678900000000000000000000000000000,0.77889,1
---- TYPES
DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL, DECIMAL
====
---- QUERY
# Create table with string types. VARCHAR is not supported by Iceberg, so
# let's substitute it with STRING.
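# Impala CHAR columns are stored as the Iceberg FIXED type.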
CREATE TABLE chars_formats (
cs CHAR(5),
cl CHAR(140),
vc STRING
)
STORED AS iceberg
TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
---- RESULTS
'Table has been created.'
====
---- QUERY
insert into chars_formats select * from functional_parquet.chars_formats;
select * from chars_formats;
---- RESULTS
'abcde','88db79c70974e02deb3f01cfdcc5daae2078f21517d1021994f12685c0144addae3ce0dbd6a540b55b88af68486251fa6f0c8f9f94b3b1b4bc64c69714e281f388db79c70974','variable length'
'abc  ','8d3fffddf79e9a232ffd19f9ccaa4d6b37a6a243dbe0f23137b108a043d9da13121a9b505c804956b22e93c7f93969f4a7ba8ddea45bf4aab0bebc8f814e09918d3fffddf79e','abc'
'abcde','68f8c4575da360c32abb46689e58193a0eeaa905ae6f4a5e6c702a6ae1db35a6f86f8222b7a5489d96eb0466c755b677a64160d074617096a8c6279038bc720468f8c4575da3','b2fe9d4638503a57f93396098f24103a'
---- TYPES
CHAR, CHAR, STRING
====
---- QUERY
# Create non-Iceberg table with INT96 nanos.
create table int96_nanos (ts timestamp) stored as parquet;
====
---- QUERY
# Insert edge values as "normal" int96 timestamps that can represent all values.
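# 1400-01-01 is the earliest TIMESTAMP value Impala supports.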
set parquet_timestamp_type=INT96_NANOS;
insert into int96_nanos values
("1400-01-01"),
("2019-01-18 00:00:00.000000001"),
("2019-01-18 00:00:00.000001"),
("2019-01-18 00:00:00.001"),
("2019-01-18 23:59:59.999"),
("2019-01-18 23:59:59.999999"),
("2019-01-18 23:59:59.999999999")
====
---- QUERY
# Iceberg should write timestamps as INT64 micros.
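# Nanosecond-precision values are truncated to microseconds, e.g. both
# 23:59:59.999999 and 23:59:59.999999999 read back as 23:59:59.999999000.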
create table ts_iceberg (ts timestamp) stored as iceberg
tblproperties('iceberg.catalog'='hadoop.tables');
insert into ts_iceberg select * from int96_nanos;
select * from ts_iceberg;
---- RESULTS
1400-01-01 00:00:00
2019-01-18 00:00:00
2019-01-18 00:00:00.000001000
2019-01-18 00:00:00.001000000
2019-01-18 23:59:59.999000000
2019-01-18 23:59:59.999999000
2019-01-18 23:59:59.999999000
====
---- QUERY
# Insert into hadoop catalog.
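# With 'hadoop.catalog' the table directory is placed under
# 'iceberg.catalog_location' rather than the database's default location.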
create table iceberg_hadoop_cat (i int)
stored as iceberg
tblproperties('iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='/test-warehouse/$DATABASE/hadoop_catalog_test');
insert into iceberg_hadoop_cat values (1), (2), (3);
---- RESULTS
: 3
====
---- QUERY
select * from iceberg_hadoop_cat;
---- RESULTS
1
2
3
---- TYPES
INT
====
---- QUERY
show files in iceberg_hadoop_cat;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/$DATABASE/iceberg_hadoop_cat/data/.*.0.parq','.*',''
---- TYPES
STRING, STRING, STRING
====
---- QUERY
# Insert into hadoop catalog with custom table identifier.
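# 'iceberg.table_identifier' determines the table path under the catalog
# location (test/custom_db/int_table), as the SHOW FILES check below verifies.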
create table iceberg_hadoop_cat_ti (i int)
stored as iceberg
tblproperties('iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='/test-warehouse/$DATABASE/hadoop_catalog_test',
'iceberg.table_identifier'='test.custom_db.int_table');
insert into iceberg_hadoop_cat_ti values (1), (2), (3);
---- RESULTS
: 3
====
---- QUERY
select * from iceberg_hadoop_cat_ti;
---- RESULTS
1
2
3
---- TYPES
INT
====
---- QUERY
show files in iceberg_hadoop_cat_ti;
---- RESULTS: VERIFY_IS_SUBSET
row_regex:'$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/test/custom_db/int_table/data/.*.0.parq','.*',''
---- TYPES
STRING, STRING, STRING
====