IMPALA-14092 Part2: Support querying of Paimon data tables via JNI
This patch implements querying of Paimon data tables
through a JNI-based scanner.

Features implemented:
- Column pruning.
Partition pruning and predicate pushdown will be submitted
as the third part of the patch.

We implemented this by treating the Paimon table as a normal
unpartitioned table. When querying a Paimon table:
- PaimonScanNode decides which Paimon splits need to be scanned,
  and then transfers the splits to the BE for the JNI-based scan
  operation.

- We also collect the required columns that need to be scanned,
  and pass them to the scanner for column pruning. This is
  implemented by passing the field ids of the columns to the BE
  instead of the column positions, in order to support schema
  evolution.

- In the original implementation, PaimonJniScanner directly passed
  Paimon row objects to the BE and called the corresponding Paimon
  row field accessors, which are Java methods that convert row
  fields into Impala row-batch tuples. We found this to be slow due
  to the overhead of JVM method calls.
  To minimize the overhead, we refashioned the implementation:
  PaimonJniScanner converts the Paimon row batches into Arrow
  record batches, which store their data in the off-heap region of
  the Impala JVM, and passes the memory pointer of the off-heap
  Arrow record batch to the BE.
  The BE PaimonJniScanNode reads the data directly from the JVM
  off-heap region and converts the Arrow record batch into an
  Impala row batch.

  The benchmark shows the latter implementation is 2.x times faster
  than the original one.

  The lifecycle of an Arrow record batch is as follows:
  the batch is generated in the FE and passed to the BE.
  After the record batch has been imported into the BE successfully,
  the BE is in charge of freeing it.
  There are two free paths: the normal path and the exception path.
  On the normal path, when the Arrow batch has been fully consumed
  by the BE, the BE calls into JNI to fetch the next Arrow batch,
  and the previous batch is freed automatically.
  The exception path is taken when the query is cancelled or memory
  allocation fails. For these corner cases, the Arrow batch is freed
  in the close() method if it has not been fully consumed by the BE
  (see the sketch below).
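
  For illustration only, here is a minimal sketch (not the actual Impala
  code) of how the BE can import and release a batch exported through the
  Arrow C Data Interface. The types and functions come from Arrow's C++
  bridge (arrow/c/abi.h, arrow/c/bridge.h); the helper name and the way
  the pointers are obtained from the JVM are assumptions:

    #include <arrow/api.h>
    #include <arrow/c/abi.h>     // struct ArrowArray / ArrowSchema
    #include <arrow/c/bridge.h>  // arrow::ImportRecordBatch

    // 'array' and 'schema' point to the off-heap structs whose addresses
    // were handed over from the JVM via JNI (hypothetical setup).
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> ImportOrRelease(
        struct ArrowArray* array, struct ArrowSchema* schema) {
      auto batch = arrow::ImportRecordBatch(array, schema);
      if (batch.ok()) {
        // Success: ownership has moved into the RecordBatch; when it is
        // destroyed, the release callback frees the JVM off-heap buffers.
        return batch;
      }
      // Failure (e.g. cancellation or failed allocation): make sure the
      // structs are released. Per the C Data Interface spec a released
      // struct has a NULL release pointer, so this also guards against a
      // double release if the importer released them already.
      if (array->release != nullptr) array->release(array);
      if (schema->release != nullptr) schema->release(schema);
      return batch.status();
    }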

Currently supported Impala data types for queries include:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE

TODO:
    - Patches pending submission:
        - Support tpcds/tpch data loading
          for Paimon data tables.
        - Virtual column query support for querying
          Paimon data tables.
        - Query support with time travel.
        - Query support for Paimon meta tables.
    - WIP:
        - Snapshot incremental read.
        - Complex type query support.
        - Native Paimon table scanner, instead of
          the JNI-based one.

Testing:
    - Create test tables in functional_schema_template.sql
    - Add TestPaimonScannerWithLimit in test_scanners.py
    - Add test_paimon_query in test_paimon.py.
    - Already passed the tpcds/tpch tests for Paimon tables. The test
      table data is currently generated by Spark, which is not yet
      supported by Impala's data loading; we had to do this because
      Hive doesn't support generating Paimon tables with dynamic
      partitioning. We plan to submit a separate patch for tpcds/tpch
      data loading and the associated tpcds/tpch query tests.
    - JVM off-heap memory leak tests: ran looped tpch tests for
      1 day; no obvious off-heap memory increase was observed, and
      off-heap memory usage stayed within 10 MB.

Change-Id: Ie679a89a8cc21d52b583422336b9f747bdf37384
Reviewed-on: http://gerrit.cloudera.org:8080/23613
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>

bad_parquet_data.parquet:
Generated with parquet-mr 1.2.5
Contains 3 single-column rows:
"parquet"
"is"
"fun"

bad_compressed_dict_page_size.parquet:
Generated by hacking Impala's Parquet writer.
Contains a single string column 'col' with one row ("a"). The compressed_page_size field
in the dictionary page header is modified to 0 to test that it is correctly handled.

bad_rle_literal_count.parquet:
Generated by hacking Impala's Parquet writer.
Contains a single bigint column 'c' with the values 1, 3, 7 stored
in a single data chunk as dictionary plain. The RLE encoded dictionary
indexes are all literals (and not repeated), but the literal count
is incorrectly 0 in the file to test that such data corruption is
properly handled.

bad_rle_repeat_count.parquet:
Generated by hacking Impala's Parquet writer.
Contains a single bigint column 'c' with the value 7 repeated 7 times
stored in a single data chunk as dictionary plain. The RLE encoded dictionary
indexes are a single repeated run (and not literals), but the repeat count
is incorrectly 0 in the file to test that such data corruption is properly
handled.

zero_rows_zero_row_groups.parquet:
Generated by hacking Impala's Parquet writer.
The file metadata indicates zero rows and no row groups.

zero_rows_one_row_group.parquet:
Generated by hacking Impala's Parquet writer.
The file metadata indicates zero rows but one row group.

huge_num_rows.parquet:
Generated by hacking Impala's Parquet writer.
The file metadata indicates 2 * MAX_INT32 rows.
The single row group also has the same number of rows in the metadata.

repeated_values.parquet:
Generated with parquet-mr 1.2.5
Contains 3 single-column rows:
"parquet"
"parquet"
"parquet"

multiple_rowgroups.parquet:
Generated with parquet-mr 1.2.5
Populated with:
hive> set parquet.block.size=500;
hive> INSERT INTO TABLE tbl
      SELECT l_comment FROM tpch.lineitem LIMIT 1000;

alltypesagg_hive_13_1.parquet:
Generated with parquet-mr version 1.5.0-cdh5.4.0-SNAPSHOT
hive> create table alltypesagg_hive_13_1 stored as parquet as select * from alltypesagg;

bad_column_metadata.parquet:
Generated with hacked version of parquet-mr 1.8.2-SNAPSHOT
Schema:
 {"type": "record",
  "namespace": "org.apache.impala",
  "name": "bad_column_metadata",
  "fields": [
      {"name": "id", "type": ["null", "long"]},
      {"name": "int_array", "type": ["null", {"type": "array", "items": ["null", "int"]}]}
  ]
 }
Contains 3 row groups, each with ten rows and each array containing ten elements. The
first rowgroup column metadata for 'int_array' incorrectly states there are 50 values
(instead of 100), and the second rowgroup column metadata for 'id' incorrectly states
there are 11 values (instead of 10). The third rowgroup has the correct metadata.

data-bzip2.bz2:
Generated with bzip2, contains single bzip2 stream
Contains 1 column, uncompressed data size < 8M

large_bzip2.bz2:
Generated with bzip2, contains single bzip2 stream
Contains 1 column, uncompressed data size > 8M

data-pbzip2.bz2:
Generated with pbzip2, contains multiple bzip2 streams
Contains 1 column, uncompressed data size < 8M

large_pbzip2.bz2:
Generated with pbzip2, contains multiple bzip2 streams
Contains 1 column, uncompressed data size > 8M

out_of_range_timestamp.parquet:
Generated with a hacked version of Impala parquet writer.
Contains a single timestamp column with 4 values, 2 of which are out of range
and should be read as NULL by Impala:
   1399-12-31 00:00:00 (invalid - date too small)
   1400-01-01 00:00:00
   9999-12-31 00:00:00
  10000-01-01 00:00:00 (invalid - date too large)

table_with_header.csv:
Created with a text editor, contains a header line before the data rows.

table_with_header_2.csv:
Created with a text editor, contains two header lines before the data rows.

table_with_header.gz, table_with_header_2.gz:
Generated by gzip'ing table_with_header.csv and table_with_header_2.csv.

deprecated_statistics.parquet:
Generated with the Hive shell, which uses parquet-mr version 1.5.0-cdh5.12.0-SNAPSHOT
Contains a copy of the data in functional.alltypessmall with statistics that use the old
'min'/'max' fields.

repeated_root_schema.parquet:
Generated by hacking Impala's Parquet writer.
Created to reproduce IMPALA-4826. Contains a table of 300 rows where the
repetition level of the root schema is set to REPEATED.
Reproduction steps:
1: Extend HdfsParquetTableWriter::CreateSchema with the following line:
   file_metadata_.schema[0].__set_repetition_type(FieldRepetitionType::REPEATED);
2: Run test_compute_stats and grab the created Parquet file for
   alltypes_parquet table.

binary_decimal_dictionary.parquet,
binary_decimal_no_dictionary.parquet:
Generated using parquet-mr and contents verified using parquet-tools-1.9.1.
Contains decimals stored as variable sized BYTE_ARRAY with both dictionary
and non-dictionary encoding respectively.

alltypes_agg_bitpacked_def_levels.parquet:
Generated by hacking Impala's Parquet writer to write out bitpacked def levels instead
of the standard RLE-encoded levels. See
https://github.com/timarmstrong/incubator-impala/tree/hack-bit-packed-levels. This
is a single file containing all of the alltypesagg data, which includes a mix of
null and non-null values. This is not actually a valid Parquet file because the
bit-packed levels are written in the reverse order specified in the Parquet spec
for BIT_PACKED. However, this is the order that Impala attempts to read the levels
in - see IMPALA-3006.

signed_integer_logical_types.parquet:
Generated using a utility that uses the java Parquet API.
The file has the following schema:
  schema {
    optional int32 id;
    optional int32 tinyint_col (INT_8);
    optional int32 smallint_col (INT_16);
    optional int32 int_col;
    optional int64 bigint_col;
  }

min_max_is_nan.parquet:
Generated by Impala's Parquet writer before the fix for IMPALA-6527. Git hash: 3a049a53
Created to test the read path for a Parquet file with invalid metadata, namely when
'max_value' and 'min_value' are both NaN. Contains 2 single-column rows:
NaN
42

bad_codec.parquet:
Generated by Impala's Parquet writer, hacked to use the invalid enum value 5000 for the
compression codec. The data in the file is the whole of the "alltypestiny" data set, with
the same columns: id int, bool_col boolean, tinyint_col tinyint, smallint_col smallint,
int_col int, bigint_col bigint, float_col float, double_col double,
date_string_col string, string_col string, timestamp_col timestamp, year int, month int

num_values_def_levels_mismatch.parquet:
A file with a single boolean column with page metadata reporting 2 values but only def
levels for a single literal value. Generated by hacking Impala's parquet writer to
increment page.header.data_page_header.num_values. This caused Impala to hit a DCHECK
(IMPALA-6589).

rle_encoded_bool.parquet:
Parquet v1 file with RLE encoded boolean column "b" and int column "i".
Created for IMPALA-6324, generated with modified parquet-mr. Contains 279 rows,
139 with value false, and 140 with value true. "i" is always 1 if "b" is true
and always 0 if "b" is false.

dict_encoding_with_large_bit_width.parquet:
Parquet file with a single TINYINT column "i" with 33 rows. Created by a modified
Impala to use 9 bit dictionary indices for encoding. Reading this file used to lead
to DCHECK errors (IMPALA-7147).

decimal_stored_as_int32.parquet:
Parquet file generated by Spark 2.3.1 that contains decimals stored as int32.
Impala needs to be able to read such values (IMPALA-5542)

decimal_stored_as_int64.parquet:
Parquet file generated by Spark 2.3.1 that contains decimals stored as int64.
Impala needs to be able to read such values (IMPALA-5542)

decimal_padded_fixed_len_byte_array.parquet:
Parquet file generated by a hacked Impala where decimals are encoded as
FIXED_LEN_BYTE_ARRAY with an extra byte of padding (IMPALA-2515). The
data is the same as functional.decimal_tbl.

decimal_padded_fixed_len_byte_array2.parquet:
Parquet file generated by a hacked Impala where decimals are encoded as
FIXED_LEN_BYTE_ARRAY with an extra byte of padding (IMPALA-2515).
The Impala was hacked to limit dictionaries to 2000 entries, and
PARQUET_PAGE_ROW_COUNT_LIMIT was set to 1234. This resulted in
a file with two dictionary encoded pages and multiple plain
encoded pages. The values are distributed across the full
range of DECIMAL(10, 0). The file was created as follows:

  create table d(d decimal(10, 0)) stored as parquet;
  set num_nodes=1;
  set PARQUET_PAGE_ROW_COUNT_LIMIT=1234;
  insert into d
  select distinct cast((1000000000 - (o_orderkey * 110503)) % 1000000000 as decimal(9, 0))
  from tpch_parquet.orders where o_orderkey < 50000;


primitive_type_widening.parquet:
Parquet file that contains two rows with the following schema:
- int32 tinyint_col1
- int32 tinyint_col2
- int32 tinyint_col3
- int32 tinyint_col4
- int32 smallint_col1
- int32 smallint_col2
- int32 smallint_col3
- int32 int_col1
- int32 int_col2
- float float_col
It is used to test primitive type widening (IMPALA-6373).

corrupt_footer_len_decr.parquet:
Parquet file that contains one row of the following schema:
- bigint c
The footer size is manually modified (using hexedit) to be the original file size minus
1, which causes metadata deserialization during footer parsing to fail and thus triggers
the printing of an error message with an incorrect file offset; this verifies that the
issue is fixed by IMPALA-6442.

corrupt_footer_len_incr.parquet:
Parquet file that contains one row of the following schema:
- bigint c
The footer size is manually modified (using hexedit) to be larger than the original file
size and cause footer parsing to fail. It's used to test an error message related to
IMPALA-6442.

hive_single_value_timestamp.parq:
Parquet file written by Hive with the following schema:
i int, d timestamp
Contains a single row. It is used to test IMPALA-7559 which only occurs when all values
in a column chunk are the same timestamp and the file is written with parquet-mr (which
is used by Hive).

out_of_range_time_of_day.parquet:
IMPALA-7595: Parquet file that contains timestamps where the time part is out of the
valid range [0..24H). Before the fix, select * returned these values:
1970-01-01 -00:00:00.000000001  (invalid - negative time of day)
1970-01-01 00:00:00
1970-01-01 23:59:59.999999999
1970-01-01 24:00:00 (invalid - time of day should be less than a whole day)

strings_with_quotes.csv:
Various strings with quotes in them to reproduce bugs like IMPALA-7586.

int64_timestamps_plain.parq:
Parquet file generated with Parquet-mr that contains plain encoded int64 columns with
Timestamp logical types. Has the following columns:
new_logical_milli_utc, new_logical_milli_local,
new_logical_micro_utc, new_logical_micro_local

int64_timestamps_dict.parq:
Parquet file generated with Parquet-mr that contains dictionary encoded int64 columns
with Timestamp logical types. Has the following columns:
id,
new_logical_milli_utc, new_logical_milli_local,
new_logical_micro_utc, new_logical_micro_local

int64_timestamps_at_dst_changes.parquet:
Parquet file generated with Parquet-mr that contains plain encoded int64 columns with
Timestamp logical types. The file contains 3 row groups, and all row groups contain
3 distinct values, so there is a "min", a "max", and a "middle" value. The values were
selected in such a way that the UTC->CET conversion changes the order of the values (this
is possible during Summer->Winter DST change) and "middle" falls outside the "min".."max"
range after conversion. This means that a naive stat filtering implementation could drop
"middle" incorrectly.
Example (all dates are 2017-10-29):
UTC: 00:45:00, 01:00:00, 01:10:00 =>
CET: 02:45:00, 02:00:00, 02:10:00
Columns: rawvalue bigint, rowgroup int, millisutc timestamp, microsutc timestamp

int64_timestamps_nano.parquet:
Parquet file generated with Parquet-mr that contains int64 columns with nanosecond
precision. Tested separately from the micro/millisecond columns because of the different
valid range.
Columns: rawvalue bigint, nanoutc timestamp, nanononutc timestamp

out_of_range_timestamp_hive_211.parquet:
Hive-generated file with an out-of-range timestamp. Generated with Hive 2.1.1 using
the following query:
create table alltypes_hive stored as parquet as
select * from functional.alltypes
union all
select -1, false, 0, 0, 0, 0, 0, 0, '', '', cast('1399-01-01 00:00:00' as timestamp), 0, 0

out_of_range_timestamp2_hive_211.parquet:
Hive-generated file with out-of-range timestamps every second value, to exercise code
paths in Parquet scanner for non-repeated runs. Generated with Hive 2.1.1 using
the following query:
create table hive_invalid_timestamps stored as parquet as
select id,
  case id % 3
    when 0 then timestamp_col
    when 1 then NULL
    when 2 then cast('1300-01-01 9:9:9' as timestamp)
  end timestamp_col
from functional.alltypes
sort by id

decimal_rtf_tbl.txt:
This was generated using formulas in Google Sheets. The goal was to create various
decimal values that cover the 3 storage formats with various precision and scale.
This is a reasonably large table that is used for testing min-max filters
with decimal types on Kudu.

decimal_rtf_tiny_tbl.txt:
Small table with specific decimal values picked from decimal_rtf_tbl.txt so that
min-max filter based pruning can be tested with decimal types on Kudu.

date_tbl.orc
Small orc table with one DATE column, created by Hive.

date_tbl.avro
Small avro table with one DATE column, created by Hive.

date_tbl.parquet
Small parquet table with one DATE column, created by Parquet MR.

out_of_range_date.parquet:
Generated with a hacked version of Impala parquet writer.
Contains a single DATE column with 9 values, 4 of which are out of range
and should be read as NULL by Impala:
  -0001-12-31 (invalid - date too small)
   0000-01-01 (invalid - date too small)
   0000-01-02 (invalid - date too small)
   1969-12-31
   1970-01-01
   1970-01-02
   9999-12-30
   9999-12-31
  10000-01-01 (invalid - date too large)

out_of_range_date.orc:
Created using a pre-3.1 Hive version (2.1.1) to contain an out-of-range date value.
This takes advantage of an incompatibility between Hive and Impala: a date before
1582-10-15 written by Hive (before 3.1) is interpreted incorrectly by Impala. The values
I wrote with Hive:
2019-10-04, 1582-10-15, 0001-01-01, 9999-12-31
This is interpreted by Impala as:
2019-10-04, 1582-10-15, 0000-12-30 (invalid - date too small), 9999-12-31



hive2_pre_gregorian.parquet:
Small parquet table with one DATE column, created by Hive 2.1.1.
Used to demonstrate Parquet interoperability issues between Hive and Impala for dates
before the introduction of the Gregorian calendar on 1582-10-15.

hive2_pre_gregorian.orc:
Same as the above but in ORC format instead of Parquet.

decimals_1_10.parquet:
Contains two decimal columns, one with precision 1, the other with precision 10.
I used Hive 2.1.1 with a modified version of Parquet-MR (6901a20) to create tiny,
misaligned pages in order to test the value-skipping logic in the Parquet column readers.
The modification in Parquet-MR was to set MIN_SLAB_SIZE to 1. You can find the change
here: https://github.com/boroknagyz/parquet-mr/tree/tiny_pages
hive  --hiveconf parquet.page.row.count.limit=5 --hiveconf parquet.page.size=5
 --hiveconf parquet.enable.dictionary=false --hiveconf parquet.page.size.row.check.min=1
create table decimals_1_10 (d_1 DECIMAL(1, 0), d_10 DECIMAL(10, 0)) stored as PARQUET
insert into decimals_1_10 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5),
                            (NULL, 1), (2, 2), (3, 3), (4, 4), (5, 5),
                            (1, 1), (NULL, 2), (3, 3), (4, 4), (5, 5),
                            (1, 1), (2, 2), (NULL, 3), (4, 4), (5, 5),
                            (1, 1), (2, 2), (3, 3), (NULL, 4), (5, 5),
                            (1, 1), (2, 2), (3, 3), (4, 4), (NULL, 5),
                            (NULL, 1), (NULL, 2), (3, 3), (4, 4), (5, 5),
                            (1, 1), (NULL, 2), (3, 3), (NULL, 4), (5, 5),
                            (1, 1), (2, 2), (3, 3), (NULL, 4), (NULL, 5),
                            (NULL, 1), (2, 2), (NULL, 3), (NULL, 4), (5, 5),
                            (1, 1), (2, 2), (3, 3), (4, 4), (5, NULL);

nested_decimals.parquet:
Contains two columns, one is a decimal column, the other is an array of decimals.
I used Hive 2.1.1 with a modified Parquet-MR, see description at decimals_1_10.parquet.
hive  --hiveconf parquet.page.row.count.limit=5 --hiveconf parquet.page.size=16
 --hiveconf parquet.enable.dictionary=false --hiveconf parquet.page.size.row.check.min=1
create table nested_decimals (d_38 Decimal(38, 0), arr array<Decimal(1, 0)>) stored as parquet;
insert into nested_decimals select 1, array(cast (1 as decimal(1,0)), cast (1 as decimal(1,0)), cast (1 as decimal(1,0)) ) union all
                            select 2, array(cast (2 as decimal(1,0)), cast (2 as decimal(1,0)), cast (2 as decimal(1,0)) ) union all
                            select 3, array(cast (3 as decimal(1,0)), cast (3 as decimal(1,0)), cast (3 as decimal(1,0)) ) union all
                            select 4, array(cast (4 as decimal(1,0)), cast (4 as decimal(1,0)), cast (4 as decimal(1,0)) ) union all
                            select 5, array(cast (5 as decimal(1,0)), cast (5 as decimal(1,0)), cast (5 as decimal(1,0)) ) union all

                            select 1, array(cast (1 as decimal(1,0)) ) union all
                            select 2, array(cast (2 as decimal(1,0)), cast (2 as decimal(1,0)) ) union all
                            select 3, array(cast (3 as decimal(1,0)), cast (3 as decimal(1,0)), cast (3 as decimal(1,0)) ) union all
                            select 4, array(cast (4 as decimal(1,0)), cast (4 as decimal(1,0)), cast (4 as decimal(1,0)), cast (4 as decimal(1,0)) ) union all
                            select 5, array(cast (5 as decimal(1,0)), cast (5 as decimal(1,0)), cast (5 as decimal(1,0)), cast (5 as decimal(1,0)), cast (5 as decimal(1,0)) ) union all

                            select 1, array(cast (NULL as decimal(1, 0)), NULL, NULL) union all
                            select 2, array(cast (2 as decimal(1,0)), NULL, NULL) union all
                            select 3, array(cast (3 as decimal(1,0)), NULL, cast (3 as decimal(1,0))) union all
                            select 4, array(NULL, cast (4 as decimal(1,0)), cast (4 as decimal(1,0)), NULL) union all
                            select 5, array(NULL, cast (5 as decimal(1,0)), NULL, NULL, cast (5 as decimal(1,0)) ) union all

                            select 6, array(cast (6 as decimal(1,0)), NULL, cast (6 as decimal(1,0)) ) union all
                            select 7, array(cast (7 as decimal(1,0)), cast (7 as decimal(1,0)), cast (7 as decimal(1,0)), NULL ) union all
                            select 8, array(NULL, NULL, cast (8 as decimal(1,0)) ) union all
                            select 7, array(cast (7 as decimal(1,0)), cast (7 as decimal(1,0)), cast (7 as decimal(1,0)) ) union all
                            select 6, array(NULL, NULL, NULL, cast (6 as decimal(1,0)) );

double_nested_decimals.parquet:
Contains two columns, one is a decimal column, the other is an array of arrays of
decimals. I used Hive 2.1.1 with a modified Parquet-MR, see description
at decimals_1_10.parquet.
hive  --hiveconf parquet.page.row.count.limit=5 --hiveconf parquet.page.size=16
  --hiveconf parquet.enable.dictionary=false --hiveconf parquet.page.size.row.check.min=1
create table double_nested_decimals (d_38 Decimal(38, 0), arr array<array<Decimal(1, 0)>>) stored as parquet;
insert into double_nested_decimals select 1, array(array(cast (1 as decimal(1,0)), cast (1 as decimal(1,0)) )) union all
                                   select 2, array(array(cast (2 as decimal(1,0)), cast (2 as decimal(1,0)) )) union all
                                   select 3, array(array(cast (3 as decimal(1,0)), cast (3 as decimal(1,0)), cast (3 as decimal(1,0)) )) union all
                                   select 4, array(array(cast (4 as decimal(1,0)), cast (4 as decimal(1,0)), cast (4 as decimal(1,0)) )) union all
                                   select 5, array(array(cast (5 as decimal(1,0)), cast (5 as decimal(1,0)), cast (5 as decimal(1,0)) )) union all

                                   select 1, array(array(cast (1 as decimal(1,0))), array(cast (1 as decimal(1,0))), array(cast (1 as decimal(1,0))) ) union all
                                   select 2, array(array(cast (2 as decimal(1,0))), array(cast (2 as decimal(1,0))) ) union all
                                   select 3, array(array(cast (3 as decimal(1,0))), array(cast (3 as decimal(1,0))), array(cast (3 as decimal(1,0))) ) union all
                                   select 4, array(array(cast (4 as decimal(1,0))), array(cast (4 as decimal(1,0))) ) union all
                                   select 5, array(array(cast (5 as decimal(1,0))), array(cast (5 as decimal(1,0))) ) union all

                                   select 1, array(array(cast (1 as decimal(1,0))) ) union all
                                   select 2, array(array(cast (2 as decimal(1,0))), array(cast (2 as decimal(1,0))) ) union all
                                   select 3, array(array(cast (3 as decimal(1,0))), array(cast (3 as decimal(1,0))), array(cast (3 as decimal(1,0))) ) union all
                                   select 4, array(array(cast (4 as decimal(1,0))), array(cast (4 as decimal(1,0))) ) union all
                                   select 5, array(array(cast (5 as decimal(1,0))) ) union all

                                   select 1, array(array(cast (1 as decimal(1,0))), array(cast (1 as decimal(1,0))), array(cast (1 as decimal(1,0))) ) union all
                                   select 2, array(array(cast (2 as decimal(1,0))), array(cast (2 as decimal(1,0))) ) union all
                                   select 3, array(array(cast (3 as decimal(1,0))) ) union all
                                   select 4, array(array(cast (4 as decimal(1,0))), array(cast (4 as decimal(1,0))) ) union all
                                   select 5, array(array(cast (5 as decimal(1,0))), array(cast (5 as decimal(1,0))), array(cast (5 as decimal(1,0))) ) union all

                                   select 1, array(array(cast (1 as decimal(1,0))), array(cast (1 as decimal(1,0)), cast (1 as decimal(1,0))) ) union all
                                   select 2, array(array(cast (2 as decimal(1,0))) ) union all
                                   select 3, array(array(cast (3 as decimal(1,0)), cast (3 as decimal(1,0))), array(cast (3 as decimal(1,0))) ) union all
                                   select 4, array(array(cast (4 as decimal(1,0))), array(cast (4 as decimal(1,0)), cast (4 as decimal(1,0))), array(cast (4 as decimal(1,0))) ) union all
                                   select 5, array(array(cast (5 as decimal(1,0))), array(cast (5 as decimal(1,0))), array(cast (5 as decimal(1,0))) ) union all

                                   select 1, array(array(cast (NULL as decimal(1,0))), array(cast (NULL as decimal(1,0))), array(cast (1 as decimal(1,0))) ) union all
                                   select 2, array(array(cast (NULL as decimal(1,0))), array(cast (NULL as decimal(1,0))), array(cast (NULL as decimal(1,0))) ) union all
                                   select 3, array(array(cast (NULL as decimal(1,0))), array(cast (3 as decimal(1,0))), NULL ) union all
                                   select 4, array(NULL, NULL, array(cast (NULL as decimal(1,0)), NULL, NULL, NULL, NULL) ) union all
                                   select 5, array(array(NULL, cast (5 as decimal(1,0)), NULL, NULL, NULL) ) union all

                                   select 6, array(array(cast (6 as decimal(1,0)), NULL), array(cast (6 as decimal(1,0))) ) union all
                                   select 7, array(array(cast (7 as decimal(1,0)), cast (7 as decimal(1,0))), NULL ) union all
                                   select 8, array(array(NULL, NULL, cast (8 as decimal(1,0))) ) union all
                                   select 7, array(array(cast (7 as decimal(1,0)), cast (NULL as decimal(1,0))), array(cast (7 as decimal(1,0))) ) union all
                                   select 6, array(array(NULL, NULL, cast (6 as decimal(1,0))), array(NULL, cast (6 as decimal(1,0))) );

alltypes_tiny_pages.parquet:
Created from 'functional.alltypes' with small page sizes.
I used Hive 2.1.1 with a modified Parquet-MR, see description at decimals_1_10.parquet.
I used the following commands to create the file:
hive  --hiveconf parquet.page.row.count.limit=90 --hiveconf parquet.page.size=90 --hiveconf parquet.page.size.row.check.min=7
create table alltypes_tiny_pages stored as parquet as select * from functional_parquet.alltypes

alltypes_tiny_pages_plain.parquet:
Created from 'functional.alltypes' with small page sizes without dictionary encoding.
I used Hive 2.1.1 with a modified Parquet-MR, see description at decimals_1_10.parquet.
I used the following commands to create the file:
hive  --hiveconf parquet.page.row.count.limit=90 --hiveconf parquet.page.size=90 --hiveconf parquet.enable.dictionary=false  --hiveconf parquet.page.size.row.check.min=7
create table alltypes_tiny_pages_plain stored as parquet as select * from functional_parquet.alltypes

parent_table:
Created manually. Contains two columns, an INT and a STRING column, which together form
the primary key for the table. This table is used to test primary key and foreign key
relationships along with parent_table_2 and child_table.

parent_table_2:
Created manually. Contains just one INT column, which is also the table's primary key.
This table is used to test primary key and foreign key relationships along with
parent_table and child_table.

child_table:
Created manually. Contains four columns. The 'seq' column is the primary key of this
table. ('id', 'year') form a foreign key referring to parent_table('id', 'year'), and 'a'
is a foreign key referring to parent_table_2's primary key column 'a'.

out_of_range_timestamp.orc:
Created with Hive. ORC file with a single timestamp column 'ts'.
Contains one row (1300-01-01 00:00:00) which is outside Impala's valid time range.

corrupt_schema.orc:
ORC file from IMPALA-9277, generated by fuzz test. The file contains malformed metadata.

corrupt_root_type.orc:
ORC file for IMPALA-9249, generated by fuzz test. The root type of the schema is not
struct, which used to hit a DCHECK.

hll_sketches_from_hive.parquet:
This file contains a table that has some string columns to store serialized Apache
DataSketches HLL sketches created by Hive. Each column contains a sketch for a
specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
STRING, CHAR and VARCHAR. Has an additional column for NULL values.

hll_sketches_from_impala.parquet:
This holds the same sketches as hll_sketches_from_hive.parquet but these sketches were
created by Impala instead of Hive.

cpc_sketches_from_hive.parquet:
This file contains a table that has some string columns to store serialized Apache
DataSketches CPC sketches created by Hive. Each column contains a sketch for a
specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
STRING, CHAR and VARCHAR. Has an additional column for NULL values.

cpc_sketches_from_impala.parquet:
This holds the same sketches as cpc_sketches_from_hive.parquet but these sketches were
created by Impala instead of Hive.

theta_sketches_from_hive.parquet:
This file contains a table that has some string columns to store serialized Apache
DataSketches Theta sketches created by Hive. Each column contains a sketch for a
specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
STRING, CHAR and VARCHAR. Has an additional column for NULL values.

theta_sketches_from_impala.parquet:
This holds the same sketches as theta_sketches_from_hive.parquet but these sketches were
created by Impala instead of Hive.

kll_sketches_from_hive.parquet:
This file contains a table that has some string columns to store serialized Apache
DataSketches KLL sketches created by Hive. Each column is for a different purpose:
  - 'f': Float with distinct values.
  - 'repetitions': Float with some repetition in the values.
  - 'some_nulls': Float values and some NULLs.
  - 'all_nulls': All values are NULLs.
  - 'some_nans': Floats with some NaN values.
  - 'all_nans': All values are NaNs.

kll_sketches_from_impala.parquet:
This holds the same sketches as kll_sketches_from_hive.parquet but these sketches were
created by Impala instead of Hive.

iceberg:
IMPALA-9741: Support querying Iceberg tables by Impala.
We generated the data with spark-shell (version 2.4.x); the data of two tables is in
testdata/data/iceberg_test. An Iceberg table location contains
two directories: 'metadata', which contains the table metadata managed
by Iceberg, and 'data', which contains the data files.

iceberg_test/hadoop_catalog/ice/complextypestbl_iceberg_orc:
Iceberg table generated by Hive 3.1 + Iceberg 0.11. Originally it was a HiveCatalog
table, so I've renamed the metadata JSON files and added a version-hint.text file.
I've also edited the metadata JSON and AVRO files to remove 'hdfs://localhost:20500',
and updated the file paths. Now it can be used as a HadoopCatalog table.

hudi_parquet:
IMPALA-8778: Support reading Apache Hudi tables
Hudi parquet is a special format of Parquet files managed by Apache Hudi
(hudi.incubator.apache.org) to provide ACID transactions.
In order to provide snapshot isolation between writers and queries,
Hudi writes a newer version of an existing Parquet file
whenever an update arrives for that file.
Hudi stores the indexing information and version information in the file name.
For example:
`ca51fa17-681b-4497-85b7-4f68e7a63ee7-0_1-5-10_20200112194517.parquet`
`ca51fa17-681b-4497-85b7-4f68e7a63ee7-0` is the bloom index hash of this file
`20200112194517` is the timestamp of this version
If a record in this file is updated, Hudi writes a new file with
the same indexing hash but a newer version that depends on the time of writing:
`ca51fa17-681b-4497-85b7-4f68e7a63ee7-0_1-38-282_20200112194529.parquet`
If the Impala table is refreshed after this file was written, Impala will
only query the file with the latest version (see the sketch below).
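
As an illustration of that naming scheme, here is a minimal sketch of selecting the
latest version of each file group. It is not Impala's actual file-selection code; it
only follows the description above, splitting the name on '_' and comparing the
trailing commit timestamps:

  #include <map>
  #include <string>
  #include <utility>
  #include <vector>

  // Hudi data file names look like "<fileId>_<token>_<commitTime>.parquet".
  // Keep only the newest version of each file group (fileId).
  std::vector<std::string> LatestHudiFiles(const std::vector<std::string>& names) {
    // fileId -> (commitTime, file name)
    std::map<std::string, std::pair<std::string, std::string>> latest;
    for (const std::string& name : names) {
      size_t first = name.find('_');
      size_t last = name.rfind('_');
      size_t dot = name.rfind('.');
      if (first == std::string::npos || last <= first ||
          dot == std::string::npos || dot <= last) {
        continue;  // not a name of the expected shape
      }
      std::string file_id = name.substr(0, first);
      std::string commit_time = name.substr(last + 1, dot - last - 1);
      // The timestamps are fixed-width digit strings, so lexicographic
      // comparison orders them chronologically.
      auto it = latest.find(file_id);
      if (it == latest.end() || commit_time > it->second.first) {
        latest[file_id] = {commit_time, name};
      }
    }
    std::vector<std::string> result;
    for (const auto& kv : latest) result.push_back(kv.second.second);
    return result;
  }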

streaming.orc:
ORC file generated by Hive Streaming Ingestion. I used a slightly altered version of
TestStreaming.testNoBuckets() from Hive 3.1 to generate this file. It contains
values coming from two transactions. The file has two stripes (one per transaction).

alltypes_non_acid.orc:
Non-acid ORC file generated by Hive 3.1 with the following command:
CREATE TABLE alltypes_clone STORED AS ORC AS SELECT * FROM functional.alltypes.
It's used as an original file in ACID tests.

dateless_timestamps.parq:
Parquet file generated before the removal of dateless timestamp support.
Created as: CREATE TABLE timestamps_pq(t TIMESTAMP) STORED AS parquet;
It contains the following lines:
1996-04-22 10:00:00.432100000
1996-04-22 10:00:00.432100000
1996-04-22 10:00:00
1996-04-22 10:00:00
1996-04-22 00:00:00
10:00:00.432100000
10:00:00

full_acid_schema_but_no_acid_version.orc
IMPALA-10115: Impala should check file schema as well to check full ACIDv2 files
Generated by query-based compaction by Hive 3.1. The file has full ACIDv2 schema, but
doesn't have the file user metadata 'hive.acid.version'.

alltypes_empty_pages.parquet
Parquet file that contains empty data pages. Needed to test IMPALA-9952.
Generated by a modified Impala (git hash e038db44 (between 3.4 and 4.0)). I modified
HdfsParquetTableWriter::ShouldStartNewPage() to randomly start a new page:
int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true;
Also modified HdfsParquetTableWriter::NewPage() to randomly insert empty pages:
if (r ... ) pages_.push_back(DataPage());

alltypes_invalid_pages.parquet
Parquet file that contains invalid data pages. Needed to test IMPALA-9952.
Generated by a modified Impala (git hash e038db44 (between 3.4 and 4.0)). I modified
HdfsParquetTableWriter::ShouldStartNewPage() to randomly start a new page:
int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true;
Also modified HdfsParquetTableWriter::BaseColumnWriter::Flush to randomly invalidate
the offset index:
if (r ... ) location.offset = -1;

customer_multiblock_page_index.parquet
Parquet file that contains multiple blocks in a single file. Needed to test IMPALA-10310.
In order to generate this file, execute the following statements in beeline
(Beeline version 3.1.3000.7.3.1.0-160 by Apache Hive):
1. SET parquet.block.size=8192;         // use a small block size
2. SET parquet.page.row.count.limit=10; // a small page row count generates multiple pages
3. CREATE TABLE customer_multiblock_page_index_6
   STORED AS PARQUET
   TBLPROPERTIES('parquet.compression'='SNAPPY')
   AS SELECT * FROM tpcds.customer
   WHERE c_current_cdemo_sk IS NOT NULL
   ORDER BY c_current_cdemo_sk, c_customer_sk
   LIMIT 2000;
The generated file will contain multiple blocks, with multiple pages per block.

customer_nested_multiblock_multipage.parquet
Parquet file that contains multiple row groups and multiple pages, and stores nested
data.
Used Hive (version 3.1.3000.7.2.16.0-233) to generate Parquet file:
1. SET parquet.block.size=8192;
2. SET parquet.page.row.count.limit=20;
3. CREATE TABLE customer_nested_multiblock_multipage
   LIKE tpch_nested_parquet.customer STORED AS PARQUET;
4. INSERT INTO customer_nested_multiblock_multipage
   SELECT * FROM tpch_nested_parquet.customer ORDER BY c_custkey LIMIT 300;

IMPALA-10361: Use field id to resolve columns for Iceberg tables
We generated the data with spark-shell (version 2.4.x); the table data is in
testdata/data/iceberg_test/hadoop_catalog/iceberg_resolution_test. This table
was generated with HadoopCatalog and the Parquet file format. We use this table to
test field id resolution for complex types.

alltypes_tiny_rle_dictionary.parquet:
Tiny file using the RLE_DICTIONARY encoding.
Started impala with -write_new_parquet_dictionary_encodings=true
  set max_fs_writers=1;
  create table att stored as parquet as
  select * from functional_parquet.alltypestiny;

timestamp_with_local_timezone.orc:
ORC file that contains column with type 'timestamp with local timezone'.
Generated by Spark/Iceberg.

parquet-bloom-filtering.parquet:
Generated by hacking
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
(65b95fb72be8f5a8a193a6f7bc4560fdcd742fc7).
The schema was completely changed to allow us to test types supported in Parquet Bloom
filters.

ComplexTypesTbl/arrays.orc and arrays.parq
These tables hold 3 columns, an int ID and two arrays, one with int and the second with
string. The purpose of introducing these tables is to give more test coverage for zipping
unnests. There are rows where the 2 arrays are of the same lenght, or one of them is
longer than the other plus there are NULL and empty arrays as well.

binary_decimal_precision_and_scale_widening.parquet
Parquet file written with schema (decimal(9,2), decimal(18,2), decimal(38,2)). The rows
inside the file are carefully chosen so that they don't cause an overflow when being read
by an Impala table with a higher precision/scale.

iceberg_test/hadoop_catalog/ice/airports_parquet:
Regular Parquet table converted to Iceberg, which means that the data file doesn't contain
field ids.

iceberg_test/hadoop_catalog/ice/airports_orc:
Regular ORC table converted to Iceberg, which means that the data file doesn't contain
field ids.

too_many_def_levels.parquet:
Written by Impala 2.5. The Parquet pages have more encoded def levels than num_values.

partition_col_in_parquet.parquet:
Written by Impala 4.0. Parquet file with an INT and a DATE column. The values in the DATE
column are identical. There's only a single value per page in the Parquet file (written by
setting query option 'parquet_page_row_count_limit' to 1).

no_scale.parquet:
Generated by code injection that removes the scale from the written Parquet file:
Status HdfsParquetTableWriter::WriteFileFooter() {
+  file_metadata_.schema[1].__set_scale(1);
+  file_metadata_.schema[1].__isset.scale = false;
+  file_metadata_.schema[1].logicalType.DECIMAL.scale = 1;
+  file_metadata_.schema[1].logicalType.__isset.DECIMAL = false;
+  file_metadata_.schema[1].__isset.logicalType = false;
create table my_decimal_tbl (d1 decimal(4,2)) stored as parquet;
insert into my_decimal_tbl values (cast(0 as decimal(4,2)));

iceberg_test/hadoop_catalog/ice/alltypes_part:
iceberg_test/hadoop_catalog/ice/alltypes_part_orc:
Generated by Hive 3.1 + Iceberg 0.11. Then the JSON and AVRO files were manually edited
to make these tables correspond to an Iceberg table in a HadoopCatalog instead of
HiveCatalog.
alltypes_part has PARQUET data files, alltypes_part_orc has ORC data files. They have
identity partitions with all the supported data types.

iceberg_test/hadoop_catalog/ice/iceberg_legacy_partition_schema_evolution:
iceberg_test/hadoop_catalog/ice/iceberg_legacy_partition_schema_evolution_orc:
Generated by Hive 3.1 + Iceberg 0.11. Then the JSON and AVRO files were manually edited
to make these tables correspond to an Iceberg table in a HadoopCatalog instead of
HiveCatalog.
iceberg_legacy_partition_schema_evolution has PARQUET data files,
iceberg_legacy_partition_schema_evolution_orc has ORC data files.
The tables have the following schema changes since table migration:
* Partition INT column to BIGINT
* Partition FLOAT column to DOUBLE
* Partition DECIMAL(5,3) column to DECIMAL(8,3)
* Non-partition column has been moved to end of the schema

iceberg_test/hadoop_catalog/ice/iceberg_timestamp_part:
Written by Hive, contains Iceberg TIMESTAMP type and the table is partitioned by HOUR(ts).
create table iceberg_timestamp_part (i int, ts timestamp) partitioned by spec (hour(ts))  stored by iceberg;
insert into iceberg_timestamp_part values (-2, '1969-01-01 01:00:00'), (-1, '1969-01-01 01:15:00'), (1, '2021-10-31 02:15:00'), (2, '2021-01-10 12:00:00'), (3, '2022-04-11 00:04:00'), (4, '2022-04-11 12:04:55');

iceberg_test/hadoop_catalog/ice/iceberg_timestamptz_part:
Written by Hive, contains Iceberg TIMESTAMPTZ type and the table is partitioned by
HOUR(ts). The local timezone was 'Europe/Budapest';
create table iceberg_timestamptz_part (i int, ts timestamp with local time zone) partitioned by spec (hour(ts))  stored by iceberg;
insert into iceberg_timestamptz_part values (-2, '1969-01-01 01:00:00'), (-1, '1969-01-01 01:15:00'), (0, '2021-10-31 00:15:00 UTC'), (1, '2021-10-31 01:15:00 UTC'), (2, '2021-01-10 12:00:00'), (3, '2022-04-11 00:04:00'), (4, '2022-04-11 12:04:55');

iceberg_test/hadoop_catalog/ice/iceberg_uppercase_col:
Generated by Impala, then modified the metadata.json file to contain uppercase characters.

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_positional:
Generated by Spark 3.2 + Iceberg 0.13. Then the JSON and AVRO files were manually edited
to make these tables correspond to an Iceberg table in a HadoopCatalog instead of
HiveCatalog.
The table has a positional delete file.

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_equality:
Since Hive/Spark is unable to write equality delete files, Flink has been used to create
an equality delete file by overwriting an existing unique row with the following
statements:
    CREATE TABLE `ssb`.`ssb_default`.`iceberg_v2_delete_equality` (
      `id`  BIGINT UNIQUE COMMENT 'unique id',
      `data` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
    ) with ('format-version'='2',
    'write.upsert.enabled'='true',
    'connector' = 'iceberg',
    'catalog-database' = 'test_db',
    'catalog-type' = 'hive',
    'catalog-name' = 'iceberg_hive_catalog',
    'hive-conf-dir' = '/etc/hive/conf',
    'engine.hive.enabled' = 'true'
    );
    insert into iceberg_v2_delete_equality values (1, 'test_1_base');
    insert into iceberg_v2_delete_equality values (2, 'test_2_base');
    insert into iceberg_v2_delete_equality values (2, 'test_2_updated');
This table was created with HDFS absolute paths, which were replaced with the script
specified in `iceberg_test/hadoop_catalog/ice/iceberg_v2_no_deletes`.

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_equality_nulls:
This table has an equality delete file that contains a NULL value. Created by a hacked
Impala where IcebergCatalogOpExecutor is changed to write equality delete metadata
instead of position delete metadata when running a DELETE FROM statement. In a second
step the underlying delete file was replaced by another parquet file with the desired
content.
The content:
1: insert into functional_parquet.iceberg_v2_delete_equality_nulls values (1, "str1"), (null, "str2"), (3, "str3");
2: EQ-delete file for the first column with values: (null), (3)
3: insert into functional_parquet.iceberg_v2_delete_equality_nulls values (4, "str4"), (null, "str5");
As a result, 2 rows (including the row with NULL) will be dropped from the first data
file, while there is going to be another data file containing a NULL value that has a
greater data sequence number than the delete file (see the sketch below).
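
To make the sequence-number rule concrete, here is a minimal sketch of applying
equality deletes; the simplified Row/EqDelete structs and the function name are
illustrative assumptions, not Impala's or Iceberg's API. A delete only affects rows
from data files with a strictly lower data sequence number, and NULL in the delete
file matches NULL data values:

  #include <optional>
  #include <vector>

  struct Row {
    std::optional<int> col;  // the first (nullable INT) column
    long data_seq;           // data sequence number of the row's data file
  };

  struct EqDelete {
    std::optional<int> value;  // deleted value; NULL matches NULL
    long delete_seq;           // data sequence number of the delete file
  };

  // Keep only the rows that are not matched by any applicable equality delete.
  std::vector<Row> ApplyEqualityDeletes(const std::vector<Row>& rows,
                                        const std::vector<EqDelete>& deletes) {
    std::vector<Row> survivors;
    for (const Row& row : rows) {
      bool deleted = false;
      for (const EqDelete& del : deletes) {
        // The delete applies only to data files written before the delete file.
        if (row.data_seq < del.delete_seq && row.col == del.value) {
          deleted = true;
          break;
        }
      }
      if (!deleted) survivors.push_back(row);
    }
    return survivors;
  }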

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_both_eq_and_pos:
This table is created by Flink with 2 columns as primary key. Some data and equality
delete files were added by Flink, and then Impala was used for dropping a row by writing
a positional delete file.
Steps:
1-Flink:
  create table ice.iceberg_v2_delete_both_eq_and_pos
    (i int, s string, d date, primary key (i, d) not enforced)
    with ('format-version'='2', 'write.upsert.enabled'='true');
2-Flink:
  insert into ice.iceberg_v2_delete_both_eq_and_pos values
    (1, 'str1', to_date('2023-12-13')),
    (2, 'str2', to_date('2023-12-13'));
3-Flink:
  insert into ice.iceberg_v2_delete_both_eq_and_pos values
    (3, 'str3', to_date('2023-12-23')),
    (2, 'str2_updated', to_date('2023-12-13'));
4-Impala: delete from functional_parquet.iceberg_v2_delete_both_eq_and_pos where i = 1;

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_equality_partitioned:
Flink is used for creating this test table. The statements executed are the following:
1: Create the table with the partition column part of the primary key. This is enforced
by Flink:
  create table ice.iceberg_v2_delete_equality_partitioned
    (i int, s string, d date, primary key (d, s) not enforced)
    partitioned by (d)
  with ('format-version'='2', 'write.upsert.enabled'='true');
2: Populate one partition.
  insert into ice.iceberg_v2_delete_equality_partitioned partition (d='2023-12-24') values
    (1, 'str1'), (2, 'str2'), (3, 'str3');
3: Populate another partition
  insert into ice.iceberg_v2_delete_equality_partitioned partition (d='2023-12-25') values
    (1, 'str1'), (2, 'str2');
4: Update one row in the first partition and add a new row.
  insert into ice.iceberg_v2_delete_equality_partitioned partition (d='2023-12-24') values
    (333333, 'str3'), (4, 'str4');
5: Update one row in the second partition.
  insert into ice.iceberg_v2_delete_equality_partitioned partition (d='2023-12-25') values
    (222, 'str2');

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_equality_partition_evolution:
Flink is used to create and populate this simple table to have an equality delete file.
Impala is used for doing some partition evolution on this table.
1-Flink:
  create table ice.iceberg_v2_delete_equality_partition_evolution
    (i int, s string, d date, primary key (d, s) not enforced)
    partitioned by (d)
    with ('format-version'='2', 'write.upsert.enabled'='true');
2-Flink:
  insert into ice.iceberg_v2_delete_equality_partition_evolution
    partition (d='2023-12-24') values (1, 'str1'), (2, 'str2');
3-Flink:
  insert into ice.iceberg_v2_delete_equality_partition_evolution
    partition (d='2023-12-24') values (111, 'str1');
4-Impala:
  alter table functional_parquet.iceberg_v2_delete_equality_partition_evolution
    set partition spec (d, i);

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_equality_multi_eq_ids:
Used Impala and Nifi to create a table that has equality delete files with different
equality field ID lists. Steps:
1-Impala:
  create table functional_parquet.iceberg_v2_delete_equality_multi_eq_ids
      (i int not null, s string not null, primary key(i))
      STORED AS ICEBERG
      TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
              'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
              'iceberg.table_identifier'='ice.iceberg_v2_delete_equality_multi_eq_ids',
              'format-version'='2');
2-Nifi: Insert 3 rows in one data file:
  (1, "str1"), (2, "str2"), (3, "str3")
3-Nifi: Update a row using column 'i' as PK:
  (3, "str3_updated")
4: Manually edited 'identifier-field-ids' from [1] to [2]
5-Nifi: In one step, insert new rows and update existing ones using column 's' as PK:
  Insert (4, "str4"), (5, "str5")
  Update (2222, "str2"), (3333, "str3_updated")
6: Manually edited 'identifier-field-ids' from [2] to [1,2]
7: Update rows using columns [i,s] as PK:
  (4, "str4") -> (4, "str4_updated"), (3333, "str3_updated") -> (33, "str3_updated_twice")

iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_pos_and_multi_eq_ids:
Used Flink and Impala to create a table that has both positional and equality delete
files where some of the equality deletes have different equality field IDs.
1-Flink:
  create table hadoop_catalog.ice.iceberg_v2_delete_pos_and_multi_eq_ids
      (i int not null, s string not null, d date not null, primary key(i,s) not enforced)
      with ('format-version'='2', 'write.upsert.enabled'='true');
2-Flink:
  insert into hadoop_catalog.ice.iceberg_v2_delete_pos_and_multi_eq_ids values
      (1, 'str1', to_date('2024-01-23')),
      (2, 'str2', to_date('2024-01-24')),
      (3, 'str3', to_date('2024-01-25'));
3-Flink:
  insert into hadoop_catalog.ice.iceberg_v2_delete_pos_and_multi_eq_ids values
      (1, 'str1', to_date('2020-12-01')),
      (4, 'str4', to_date('2024-01-26'));
4-Impala:
  delete from functional_parquet.iceberg_v2_delete_pos_and_multi_eq_ids where s = 'str2';
5: Manually edited 'identifier-field-ids' from [1,2] to [3,2].
6: Restarted Flink to forget the table metadata.
7-Flink:
  insert into hadoop_catalog.ice.iceberg_v2_delete_pos_and_multi_eq_ids values
      (333333, 'str3', to_date('2024-01-25')),
      (5, 'str5', to_date('2024-01-27'));

iceberg_test/iceberg_migrated_alter_test
Generated and migrated by Hive
CREATE TABLE iceberg_migrated_alter_test (int_col int, string_col string, double_col double) stored as parquet;
insert into table iceberg_migrated_alter_test values (0, "A", 0.5), (1, "B", 1.5), (2, "C", 2.5);
ALTER TABLE iceberg_migrated_alter_test SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
Then extracted from hdfs and modified to be able to load as an external hadoop table

iceberg_test/iceberg_migrated_alter_test_orc
Generated and migrated by Hive
CREATE TABLE iceberg_migrated_alter_test_orc (int_col int, string_col string, double_col double) stored as orc;
insert into table iceberg_migrated_alter_test_orc values (0, "A", 0.5), (1, "B", 1.5), (2, "C", 2.5);
ALTER TABLE iceberg_migrated_alter_test_orc SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
Then extracted from hdfs and modified to be able to load as an external hadoop table

iceberg_test/iceberg_migrated_complex_test
Generated and migrated by Hive
CREATE TABLE iceberg_migrated_complex_test (struct_1_col struct<int_array_col: array<int>, string_col: string, bool_int_map_col: map<boolean, int>>,  int_bigint_map_col map<int, bigint>, struct_2_col struct<struct_3_col: struct<float_col: float, string_double_map_col: map<string, double>, bigint_array_col: array<bigint>>, int_int_map_col: map<int, int>>) stored as parquet;
insert into table iceberg_migrated_complex_test values (named_struct("int_array_col", array(0), "string_col", "A", "bool_int_map_col", map(True, 1 )), map(2,CAST(3 as bigint)), named_struct("struct_3_col", named_struct("float_col", cast(0.5 as float), "string_double_map_col", map("B", cast(1.5 as double)), "bigint_array_col", array(cast(4 as bigint))), "int_int_map_col", map(5,6)));
ALTER TABLE iceberg_migrated_complex_test SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
Then extracted from hdfs and modified to be able to load as an external hadoop table

iceberg_test/iceberg_migrated_complex_test_orc
Generated and migrated by Hive
CREATE TABLE iceberg_migrated_complex_test_orc (struct_1_col struct<int_array_col: array<int>, string_col: string, bool_int_map_col: map<boolean, int>>,  int_bigint_map_col map<int, bigint>, struct_2_col struct<struct_3_col: struct<float_col: float, string_double_map_col: map<string, double>, bigint_array_col: array<bigint>>, int_int_map_col: map<int, int>>) stored as orc;
insert into table iceberg_migrated_complex_test_orc values (named_struct("int_array_col", array(0), "string_col", "A", "bool_int_map_col", map(True, 1 )), map(2,CAST(3 as bigint)), named_struct("struct_3_col", named_struct("float_col", cast(0.5 as float), "string_double_map_col", map("B", cast(1.5 as double)), "bigint_array_col", array(cast(4 as bigint))), "int_int_map_col", map(5,6)));
ALTER TABLE iceberg_migrated_complex_test_orc SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
Then extracted from hdfs and modified to be able to load as an external hadoop table

iceberg_test/hadoop_catalog/ice/iceberg_v2_no_deletes:
Created by Hive 3.1.3000.2022.0.10.0-49 r5cd8759d0df2ecbfb788b7f4ee0edce6022ee459
create table iceberg_v2_no_deletes (i int, s string)
stored by iceberg
location '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_no_deletes'
tblproperties ('format-version'='2');
insert into iceberg_v2_no_deletes values (1, 'x'), (2, 'y'), (3, 'z');
(setting the table location is important because it makes the conversion easier later).
Then saved the contents from HDFS to local  ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice
And converted the HiveCatalog metadata to HadoopCatalog metadata via the following scripts:
convert_to_iceberg.sh:
#!/bin/bash

i=0
for f in *.json; do
  i=$((i+1))
  sed -i 's|hdfs://localhost:20500/test-warehouse/|/test-warehouse/|' $f
  mv $f v${i}.metadata.json
done
echo ${i} > version-hint.txt

for f in *.avro; do
  avro_iceberg_convert.sh $f 'hdfs://localhost:20500/test-warehouse/'  '/test-warehouse/'
  mv ${f}_mod $f
done

rm *.avro_json

avro_iceberg_convert.sh:
#!/bin/bash

# Usage: avro_iceberg_convert.sh <source avro> <search string> <replace string>
# Example:
SOURCE_FILE=$1
SEARCH_STR=$2
REPLACE_STR=$3
TMP_JSON=$1_json
DST_AVRO=$1_mod
AVRO_TOOLS="/path/to/avro-tools-1.11.0.jar"

if [ ! -f "$AVRO_TOOLS" ]; then
    echo "Can't find $AVRO_TOOLS."
    exit
fi
if [ ! -f $SOURCE_FILE ]; then
    echo "Can't find source file: $SOURCE_FILE!"
    exit
fi
# Transform avro to json:
java -jar $AVRO_TOOLS tojson --pretty $SOURCE_FILE  > $TMP_JSON
# Replace search string with replace string
sed --in-place "s|$SEARCH_STR|$REPLACE_STR|g" $TMP_JSON
# Convert the file back to avro
SCHEMA=`java -jar $AVRO_TOOLS getschema $SOURCE_FILE`
java -jar $AVRO_TOOLS fromjson $TMP_JSON --schema "$SCHEMA" > $DST_AVRO

These updates change the manifest files, so their lengths probably change as well. Snapshot
files store the length of the manifest files, so it needs to be updated too. If a snapshot
only stores one manifest:

avro_iceberg_convert.sh FILE "\"manifest_length.*$" "\"manifest_length\" : SIZE,"
mv FILE_mod FILE

If a snapshot has multiple manifest files, then you need to manually change the previously
generated *_json files and transform them back using avro-tools (the last step of
avro_iceberg_convert.sh), or use testdata/bin/rewrite-iceberg-metadata.py

iceberg_test/hadoop_catalog/ice/iceberg_with_key_metadata:
Created by the following steps:
 - saved the HDFS directory of 'iceberg_v2_no_deletes' to local
   ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice
 - converted the avro manifest file to json
 - manually replaced the 'null' value for "key_metadata" with "{"bytes" :
   "binary_key_metadata"}"
 - converted the modified json file back to avro.
 - adjusted the length of manifest file in the avro snapshot file

The commands for converting the avro file to json and back are listed under
'iceberg_v2_no_deletes' in the script avro_iceberg_convert.sh. Adjusting the length is
described after the script.

iceberg_v2_partitioned_position_deletes:
iceberg_v2_partitioned_position_deletes_orc:
Created similarly to iceberg_v2_no_deletes.
Hive> create table iceberg_v2_partitioned_position_deletes (
  id int, `user` string, action string, event_time timestamp)
partitioned by spec (action)
STORED BY ICEBERG
location '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_partitioned_position_deletes'
tblproperties ('format-version'='2', 'write.format.default'='parquet');
Impala> insert into iceberg_v2_partitioned_position_deletes select * from functional_parquet.iceberg_partitioned;
Hive> delete from iceberg_v2_partitioned_position_deletes where id % 2 = 1;
The ORC table is created similarly.


iceberg_v2_positional_delete_all_rows:
iceberg_v2_positional_delete_all_rows_orc:
Created similarly to iceberg_v2_no_deletes.
Hive> create table iceberg_v2_positional_delete_all_rows_orc (
        i int, s string)
      stored by iceberg
      location '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_delete_all_rows_orc'
      tblproperties ('format-version'='2', 'write.format.default'='orc');
Hive> insert into iceberg_v2_positional_delete_all_rows_orc values (1, 'x'), (2, 'y'), (3, 'z');
Hive> delete from iceberg_v2_positional_delete_all_rows_orc;

iceberg_v2_positional_not_all_data_files_have_delete_files:
iceberg_v2_positional_not_all_data_files_have_delete_files_orc:
Created similarly to iceberg_v2_no_deletes.
create table iceberg_v2_positional_not_all_data_files_have_delete_files_orc (i int, s string)
stored by iceberg
location '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_not_all_data_files_have_delete_files_orc'
tblproperties ('format-version'='2', 'write.format.default'='orc');
insert into iceberg_v2_positional_not_all_data_files_have_delete_files_orc values (1,'a'), (2,'b'), (3,'c');
insert into iceberg_v2_positional_not_all_data_files_have_delete_files_orc values (4,'d'), (5,'e'), (6,'f');
insert into iceberg_v2_positional_not_all_data_files_have_delete_files_orc values (7,'g'), (8,'h'), (9,'i');
update iceberg_v2_positional_not_all_data_files_have_delete_files_orc set s='X' where i = 5;
delete from iceberg_v2_positional_not_all_data_files_have_delete_files_orc where i > 6;

iceberg_v2_positional_update_all_rows:
Created similarly to iceberg_v2_no_deletes
create table iceberg_v2_positional_update_all_rows (i int, s string)
stored by iceberg
location '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_positional_update_all_rows'
tblproperties ('format-version'='2');
insert into iceberg_v2_positional_update_all_rows values (1,'a'), (2,'b'), (3,'c');
update iceberg_v2_positional_update_all_rows set s = upper(s);

iceberg_with_puffin_stats:
Created similarly to iceberg_v2_no_deletes.
With Impala:
 create table iceberg_with_puffin_stats(i INT, d DECIMAL(9, 0)) stored as iceberg;
 insert into iceberg_with_puffin_stats values (1, 1), (2, 2);
With Trino:
 use iceberg.default;
 analyze iceberg_with_puffin_stats;
And then converted the table with 'convert_to_iceberg.sh' and 'avro_iceberg_convert.sh', as
described under the 'iceberg_v2_no_deletes' section.

iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations*:
- 'iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations'
- 'iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations_data'
- 'iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations_data01'
- 'iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations_data02'
Generated with the Iceberg Java API version 0.13.2; see https://iceberg.apache.org/docs/latest/api/ for documentation.
Step 1, create the Iceberg table 'iceberg_multiple_storage_locations' whose location is 'iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations':
Table schema ('col_name','data_type'):
col_int,int
col_bigint,bigint
col_float,float
col_double,double
col_string,string
col_timestamp,timestamp
col_date,date
Partition spec ('col_name','transform_type'):
col_int,IDENTITY
Step 2, set the table property 'write.data.path' to '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations_data' and insert 3 records:
0,12345678900,3.1400001049,2.7182,'a',1970-01-01 00:00:00,1974-02-09
0,12345678901,3.1400001049,2.71821,'b',1970-01-01 00:00:00,1974-02-09
1,12345678902,3.1400001049,2.71822,'c',1970-01-01 00:00:00,1974-02-09
Step 3, update the table property 'write.data.path' to '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations_data01' and insert 3 records:
1,12345678900,3.1400001049,2.7182,'a',1970-01-01 00:00:00,1974-02-09
1,12345678901,3.1400001049,2.71821,'b',1970-01-01 00:00:00,1974-02-09
2,12345678902,3.1400001049,2.71822,'c',1970-01-01 00:00:00,1974-02-09
Step 4, update the table property 'write.data.path' to '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_multiple_storage_locations_data02' and insert 3 records:
2,12345678900,3.1400001049,2.7182,'a',1970-01-01 00:00:00,1974-02-09
2,12345678901,3.1400001049,2.71821,'b',1970-01-01 00:00:00,1974-02-09
0,12345678902,3.1400001049,2.71822,'c',1970-01-01 00:00:00,1974-02-09

iceberg_test/hadoop_catalog/ice/iceberg_mixed_file_format_test:
Generated by Hive
create table iceberg_mixed_file_format_test (i int, s string, d double) stored by iceberg;
insert into iceberg_mixed_file_format_test values (1, "A", 0.5);
alter table iceberg_mixed_file_format_test set tblproperties("write.format.default"="orc");
insert into iceberg_mixed_file_format_test values (2, "B", 1.5);
alter table iceberg_mixed_file_format_test set tblproperties("write.format.default"="parquet");
insert into iceberg_mixed_file_format_test values (3, "C", 2.5);
alter table iceberg_mixed_file_format_test set tblproperties("write.format.default"="orc");
insert into iceberg_mixed_file_format_test values (4, "D", 3.5);
Converted similarly to iceberg_v2_no_deletes

create_table_like_parquet_test.parquet:
Generated by Hive
create table iceberg_create_table_like_parquet_test (col_int int, col_float float, col_double double, col_string string, col_struct struct<col_int:int, col_float:float>, col_array array<string>, col_map map<string,array<int>>) stored as parquet;
insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", named_struct("col_int", 4, "col_float", cast(5.0 as float)), array("6","7","8"), map("A", array(11,12), "B", array(21,22)));

iceberg_lineitem_multiblock
Generated by Hive, see testdata/LineItemMultiBlock/README.dox for more details
set parquet.block.size=4086;
create table functional_parquet.iceberg_lineitem_multiblock like tpch.lineitem stored by iceberg location 'hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock' tblproperties('format-version'='2');
insert into functional_parquet.iceberg_lineitem_multiblock select * from tpch.lineitem limit 20000;
Deleted by Impala
delete from functional_parquet.iceberg_lineitem_multiblock where l_linenumber%5=0;
Then saved the contents from HDFS to the local directory ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice,
converted the HiveCatalog metadata to HadoopCatalog metadata via the scripts described under 'iceberg_v2_no_deletes',
and rewrote the metadata content to the correct lengths with
testdata/bin/rewrite-iceberg-metadata.py "" testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/

iceberg_spark_compaction_with_dangling_delete:
1) Create an Iceberg table with Impala and insert some rows.
  create table functional_parquet.iceberg_spark_compaction_with_dangling_delete (id int, j bigint)
  STORED AS ICEBERG
  TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
    'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
    'iceberg.table_identifier'='ice.iceberg_spark_compaction_with_dangling_delete',
    'format-version'='2');
  insert into functional_parquet.iceberg_spark_compaction_with_dangling_delete values
  (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);
2) Update one field of a row with Impala. This adds a new data file and a new delete file to the table.
  update functional_parquet.iceberg_spark_compaction_with_dangling_delete set j = -100 where id = 4;
3) Delete the same row with Impala that we updated in step 2). This adds another delete file.
  delete from functional_parquet.iceberg_spark_compaction_with_dangling_delete where id = 4;
4) Run compaction on the table with Spark.
  spark.sql(s"CALL hadoop_catalog.system.rewrite_data_files(table => 'ice.iceberg_spark_compaction_with_dangling_delete', options => map('min-input-files','2') )")

iceberg_v2_equality_delete_schema_evolution:
1: Create and populate an Iceberg table with primary keys with Impala:
  create table functional_parquet.iceberg_v2_equality_delete_schema_evolution
    (i int not null, d date not null, s string, primary key(i, d) not enforced)
    PARTITIONED BY SPEC (d)
    STORED AS ICEBERG
    TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
        'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
        'iceberg.table_identifier'='ice.iceberg_v2_equality_delete_schema_evolution',
        'format-version'='2');
  insert into functional_parquet.iceberg_v2_equality_delete_schema_evolution values
    (1, "2024-03-20", "str1"),
    (2, "2024-03-20", "str2"),
    (3, "2024-03-21", "str3"),
    (4, "2024-03-21", "str4"),
    (5, "2024-03-22", "str5");
2: Delete the rows where i=2 and i=3 with NiFi
3: Do some schema evolution on the table with Impala:
  alter table functional_parquet.iceberg_v2_equality_delete_schema_evolution change s str string;
  alter table functional_parquet.iceberg_v2_equality_delete_schema_evolution add column j int;
4: Update the row where i=4 with NiFi to the following:
  (44, 2024-03-21, "str4", 4444)

iceberg_v2_null_delete_record:
1) Created the table via Impala and added some records to it.
  CREATE TABLE iceberg_v2_null_delete_record(i INT, j INT)
  STORED BY ICEBERG;
  INSERT INTO iceberg_v2_null_delete_record VALUES (1,1), (2,2), (3,3), (4,4);
  INSERT INTO iceberg_v2_null_delete_record VALUES (1,1), (2,2), (3,3), (4,4);

  (We need at least 2 data files to use DIRECTED mode in KrpcDataStreamSender)

2) Created the following temporary table:
  CREATE TABLE iceberg_v2_null_delete_record_pos_delete (file_path STRING, pos BIGINT)
  STORED BY ICEBERG;

  Manually rewrote the metadata JSON file of this table so that the schema elements have the
  following field ids (the schema had to be modified in two places):

    file_path : 2147483546
    pos       : 2147483545

3) Inserted data files into iceberg_v2_null_delete_record_pos_delete:

  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
  (NULL, 0);

  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
  ('<data file path of iceberg_v2_null_delete_record>', 0), (NULL, 3);

  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
  (NULL, 2), ('<data file path of iceberg_v2_null_delete_record>', 0);

  INSERT INTO iceberg_v2_null_delete_record_pos_delete VALUES
  (NULL, 0), (NULL, 1), (NULL, 2);

  The written Parquet files have the schema of position delete files (with the
  correct Iceberg field ids).

4) Copied iceberg_v2_null_delete_record to the local filesystem and applied
  the following modifications:

  * added the Parquet files from iceberg_v2_null_delete_record_pos_delete to
    the /data directory
  * manually edited the metadata JSON, and the manifest and manifest list files to
    register the delete files in the table

arrays_big.parq:
Generated with RandomNestedDataGenerator.java from the following schema:
{
  "fields": [
    {
      "name": "int_col",
      "type": "int"
    },
    {
      "name": "string_col",
      "type": "string"
    },
    {
      "name": "int_array",
      "type": [
        "null",
        {
          "type": "array",
          "items": ["int", "null"]
        }
      ]
    },
    {
      "name": "double_map",
      "type": [
        "null",
        {
          "type": "map",
          "values": ["double", "null"]
        }
      ]
    },
    {
      "name": "string_array",
      "type": [
        "null",
        {
          "type": "array",
          "items": ["string", "null"]
        }
      ]
    },
    {
      "name" : "mixed",
      "type" : {
        "type" : "map",
        "values" : [
          "null",
          {
            "type" : "array",
            "items" : [
              "null",
              {
                "type": "map",
                "values": [
                  "null",
                  {
                    "name": "struct_in_mixed",
                    "type": "record",
                    "fields": [
                      {
                        "name": "string_member",
                        "type": ["string", "null"]
                      },
                      {
                        "name": "int_member",
                        "type": ["int", "null"]
                      }
                    ]
                  }
                ]
              }
            ]
          }
        ]
      }
    }
  ],
  "name": "table_0",
  "namespace": "org.apache.impala",
  "type": "record"
}
The following command was used:
mvn -f "${IMPALA_HOME}/java/datagenerator/pom.xml" exec:java \
-Dexec.mainClass="org.apache.impala.datagenerator.RandomNestedDataGenerator" \
-Dexec.args="${INPUT_TBL_SCHEMA} 1500000 20 15 ${OUTPUT_FILE}"

empty_present_stream.orc:
Generated by the ORC C++ library using the following code:

  size_t num = 500;
  WriterOptions options;
  options.setRowIndexStride(100);
  auto stream = writeLocalFile("empty_present_stream.orc");
  std::unique_ptr<Type> type(Type::buildTypeFromString(
      "struct<s1:struct<id:int>,s2:struct<id:int>>"));

  std::unique_ptr<Writer> writer = createWriter(*type, stream.get(), options);

  std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(num);
  StructVectorBatch* structBatch =
      dynamic_cast<StructVectorBatch*>(batch.get());
  StructVectorBatch* structBatch2 =
      dynamic_cast<StructVectorBatch*>(structBatch->fields[0]);
  LongVectorBatch* intBatch =
      dynamic_cast<LongVectorBatch*>(structBatch2->fields[0]);

  StructVectorBatch* structBatch3 =
      dynamic_cast<StructVectorBatch*>(structBatch->fields[1]);
  LongVectorBatch* intBatch2 =
      dynamic_cast<LongVectorBatch*>(structBatch3->fields[0]);

  structBatch->numElements = num;
  structBatch2->numElements = num;

  structBatch3->numElements = num;
  structBatch3->hasNulls = true;

  // s1.id gets the values 0..num-1 with no NULLs, while s2 and its child
  // column are NULL in every row.
  for (size_t i = 0; i < num; ++i) {
    intBatch->data.data()[i] = i;
    intBatch->notNull[i] = 1;

    intBatch2->notNull[i] = 0;
    intBatch2->hasNulls = true;

    structBatch3->notNull[i] = 0;
  }
  intBatch->hasNulls = false;

  writer->add(*batch);
  writer->close();

invalid_binary_data.txt:
Hand-written file where BINARY values are not Base64 encoded.