The Iceberg table iceberg_partitioned_orc has incorrect metadata:
the 'file_size_in_bytes' field is wrong for its data files.
This causes issues on object stores, where we rely more heavily on
information coming from the Iceberg metadata.
This commit updates the manifest and manifest list files to reflect
correct information.
Change-Id: Iae860f401947092d9fdca802f41dd6de79e0638d
Reviewed-on: http://gerrit.cloudera.org:8080/19476
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
Some manifest files were manually edited to support multiple
environments: the hdfs://localhost:20500 prefix was stripped from every
path. The snapshot files contain the lengths of these manifest files,
which were not adjusted.
Testing:
- Ran test_iceberg.py locally
- Verified lengths manually
Change-Id: I258055998a1d41b7f6047b6879e919834ed2c247
Reviewed-on: http://gerrit.cloudera.org:8080/19409
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch extends support for Iceberg tables containing multiple
file formats. AVRO data files can now also be read in a mixed table,
besides Parquet and ORC.
Impala uses its Avro scanner to read AVRO files, therefore all the
Avro-related limitations apply here as well: writes/metadata
changes are not supported.
Testing:
- E2E testing: extending 'iceberg-mixed-file-format.test' to include
AVRO files as well, in order to test reading all three currently
supported file formats: avro+orc+parquet
Change-Id: I941adfb659218283eb5fec1b394bb3003f8072a6
Reviewed-on: http://gerrit.cloudera.org:8080/19353
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala generated wrong values for the FILE__POSITION column when the
Parquet file contained multiple row groups and page filtering was
used as well.
We are using the value of 'current_row_' in the Parquet column readers
to populate the file position slot. The problem is that 'current_row_'
denotes the index of the row within the row group and not within the
file. We cannot change 'current_row_' because page filtering depends on
its value: the page index also uses the row-group-based indexes of the
rows, not file-based indexes.
In the meantime it turned out FILE__POSITION was also not set correctly
in the Parquet late materialization code, as
BaseScalarColumnReader::SkipRowsInternal() didn't update 'current_row_'
in some code paths.
The value of FILE__POSITION is critical for Iceberg V2 tables as
position delete files store file positions of the deleted rows.
Testing:
* added e2e tests
* the tests are now running w/o PARQUET_READ_STATISTICS to exercise
more code paths
Change-Id: I5ef37a1aa731eb54930d6689621cd6169fed6605
Reviewed-on: http://gerrit.cloudera.org:8080/19328
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Iceberg tables containing only AVRO files or no AVRO files at all
can now be read by Impala. Mixed file format tables with AVRO are
currently unsupported.
Impala uses its Avro scanner to read AVRO files, therefore all the
Avro-related limitations apply here as well: writes/metadata
changes are not supported.
Testing:
- created test tables: 'iceberg_avro_only' contains only AVRO files;
'iceberg_avro_mixed' contains all file formats: avro+orc+parquet
- added E2E test that reads Avro-only table
- added test case to iceberg-negative.test that tries to read
mixed file format table
Change-Id: I827e5707e54bebabc614e127daa48255f86f4c4f
Reviewed-on: http://gerrit.cloudera.org:8080/19084
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala already supports said statement; this patch adds query tests for
it.
Testing:
- Generated a Parquet table with a complex type using Hive
- Created a Hive and an Iceberg table from said file; both provide the
same output for the DESCRIBE statement
Change-Id: Ia363b913e101fd49b62a280721680f0eb88761c0
Reviewed-on: http://gerrit.cloudera.org:8080/18969
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Added support for multiple file formats. Previously Impala created a
scanner class based on the partition's file format; now, in the case of
an Iceberg table, it reads the file format from the file-level
metadata instead.
IcebergScanNode aggregates file formats as well instead of relying
on partitions, so it can be used for planning.
Testing:
Created a mixed file format table with hive and added a test for it.
Change-Id: Ifc816595724e8fd2c885c6664f790af61ddf5c07
Reviewed-on: http://gerrit.cloudera.org:8080/18935
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
For Iceberg tables, when one of the following properties is used, the
table is considered to possibly have data outside the table
location directory:
- 'write.object-storage.enabled' is true
- 'write.data.path' is not empty
- 'write.location-provider.impl' is configured
- 'write.object-storage.path'(Deprecated) is not empty
- 'write.folder-storage.path'(Deprecated) is not empty
We should tolerate the situation where the relative path of the data
files cannot be derived from the table location path, and use the
absolute path in that case. E.g. an ETL program may write the table so
that the metadata of the Iceberg table is placed in
'hdfs://nameservice_meta/warehouse/hadoop_catalog/ice_tbl/metadata',
the recent data files in
'hdfs://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data', and the
data files from half a year ago in
's3a://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data'; such a
table should still be queryable by Impala.
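For illustration, such a layout could be configured via a table
property like the following (hypothetical path; the property may also be
set by the engine that writes the data):
  ALTER TABLE ice_tbl SET TBLPROPERTIES(
    'write.data.path'='s3a://nameservice_data/warehouse/hadoop_catalog/ice_tbl/data');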
Testing:
- added e2e tests
Change-Id: I666bed21d20d5895f4332e92eb30a94fa24250be
Reviewed-on: http://gerrit.cloudera.org:8080/18894
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for reading Iceberg V2 tables that use position
deletes. Equality deletes are still not supported. Position delete
files store the file path and file position of the deleted rows.
When an Iceberg table has position delete files we need to do an
ANTI JOIN between data files and delete files. From the data files
we need to query the virtual columns INPUT__FILE__NAME and
FILE__POSITION, while from the delete files we need the data columns
'file_path' and 'pos'. The latter data columns are not part of the
table schema, so we create a virtual table instance of
'IcebergPositionDeleteTable' that has a table schema corresponding
to the delete files ('file_path', 'pos').
This patch introduces a new class 'IcebergScanPlanner' which has
the responsibility of planning Iceberg table scans. It creates
the aforementioned ANTI JOIN. Also, if there are data files without
corresponding delete files, we can have a separate SCAN node whose
results are UNIONed with the rows coming from the ANTI JOIN:
          UNION
         /     \
  SCAN data   ANTI JOIN
               /      \
        SCAN data   SCAN deletes
Some refactorings in the context of this CR:
Predicate pushdown and time travel logic is transferred from
IcebergScanNode to IcebergScanPlanner. Iceberg snapshot summary
retrieval is moved from FeFsTable to FeIcebergTable.
Testing:
* added planner test
* added e2e tests
TODO in follow-up Jiras:
* better cardinality estimates (IMPALA-11516)
* support unrelative collection columns (select item from t.int_array)
(IMPALA-11517)
Currently such queries return error during analysis
Change-Id: I672cfee18d8e131772d90378d5b12ad4d0f7dd48
Reviewed-on: http://gerrit.cloudera.org:8080/18847
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for BINARY columns for all table formats with
the exception of Kudu.
In Hive the main difference between STRING and BINARY is that STRING is
assumed to be UTF8 encoded, while BINARY can be any byte array.
Some other differences in Hive:
- BINARY can be only cast from/to STRING
- Only a small subset of built-in STRING functions support BINARY.
- In several file formats (e.g. text) BINARY is base64 encoded.
- No NDV is calculated during COMPUTE STATISTICS.
As Impala doesn't treat STRINGs as UTF8, BINARY and STRING become nearly
identical, especially from the backend's perspective. For this reason,
BINARY is implemented a bit differently compared to other types:
while the frontend treats STRING and BINARY as two separate types, most
of the backend uses PrimitiveType::TYPE_STRING for BINARY too, e.g.
in SlotDesc. Only the following parts of backend need to differentiate
between STRING and BINARY:
- table scanners
- table writers
- HS2/Beeswax service
These parts have access to column metadata, which allows adding special
handling for BINARY.
Only a very few builtins are allowed for BINARY at the moment:
- length
- min/max/count
- coalesce and similar "selector" functions
Other STRING functions can be only used by casting to STRING first.
Adding support for more of these functions is very easy, as the BINARY
type simply has to be "connected" to the already existing STRING
function's signature. Functions whose result depends on utf8_mode
need to ensure that with BINARY they always work as if utf8_mode=0 (for
example length() is mapped to bytes(), as length() counts UTF-8 chars if
utf8_mode=1).
All kinds of UDFs (native, Hive legacy, Hive generic) support BINARY,
though in the case of legacy Hive UDFs it is only supported if the
argument and return types are set explicitly to ensure backward
compatibility.
See IMPALA-11340 for details.
The original plan was to behave as close to Hive as possible, but I
realized that Hive has more relaxed casting rules than Impala, which
led to STRING<->BINARY casts being necessary in more cases in Impala.
This was needed to disallow passing a BINARY to functions that expect
a STRING argument. An example of the difference is that in
INSERT ... VALUES () string literals need to be explicitly cast to
BINARY, while this is not needed in Hive.
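For illustration (hypothetical table and column names):
  CREATE TABLE binary_tbl (i INT, b BINARY) STORED AS PARQUET;
  -- In Impala the string literal must be cast explicitly:
  INSERT INTO binary_tbl VALUES (1, CAST('abc' AS BINARY));
  -- STRING builtins beyond the supported ones require a cast back:
  SELECT upper(CAST(b AS STRING)) FROM binary_tbl;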
Testing:
- Added functional.binary_tbl for all file formats (except Kudu)
to test scanning.
- Removed functional.unsupported_types and related tests, as now
Impala supports all (non-complex) types that Hive does.
- Added FE/EE tests mainly based on the ones added to the DATE type
Change-Id: I36861a9ca6c2047b0d76862507c86f7f153bc582
Reviewed-on: http://gerrit.cloudera.org:8080/16066
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When external tables are converted to Iceberg, the data files remain
intact and thus lack field IDs. Previously, Impala used name-based
column resolution in this case.
Added a feature to traverse the data files before column
resolution and assign field IDs the same way as Iceberg would, to be
able to use field-ID-based column resolution.
Testing:
The default resolution method was changed to field id for migrated
tables; existing tests use that from now on.
Added new tests to cover edge cases with complex types and schema
evolution.
Change-Id: I77570bbfc2fcc60c2756812d7210110e8cc11ccc
Reviewed-on: http://gerrit.cloudera.org:8080/18639
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IcebergScanNode interprets the timestamp literals as UTC timestamps
during predicate pushdown to Iceberg. This causes problems when the
Iceberg table uses TIMESTAMPTZ (which corresponds to TIMESTAMP WITH
LOCAL TIME ZONE in SQL) because in the scanners we assume that the
timestamp literals in a query are in local timezone.
Hence, if the Iceberg table is partitioned by HOUR(ts), and Impala is
running in a different timezone than UTC, then the following query
doesn't return any rows:
SELECT * from t
WHERE ts = <some ts>;
This is because during predicate pushdown the timestamp is interpreted as a
UTC timestamp (no conversion from local to UTC), but during query
execution the timestamp data in the files are converted to local
timezone, then compared to <some ts>. I.e. in the scanner the
assumption is that <some ts> is in local timezone.
On the other hand, when the Iceberg type TIMESTAMP (which corresponds
to TIMESTAMP WITHOUT TIME ZONE in SQL) is used, we should just
push down the timestamp values without any conversion. In this case
there is no conversion in the scanners either.
Testing:
* added e2e test with TIMESTAMPTZ
* added e2e test with TIMESTAMP
Change-Id: I181be5d2fa004f69b457f69ff82dc2f9877f46fa
Reviewed-on: http://gerrit.cloudera.org:8080/18399
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Iceberg V2 DeleteFiles are skipped during scans and the whole content of
the DataFiles is returned. This commit adds an extra check to prevent
scanning tables that have delete files, to avoid unexpected results
until merge-on-read is supported. Metadata operations are allowed on
tables with delete files.
Testing:
- Added e2e test.
Change-Id: I6e9cbf2424b27157883d551f73e728ab4ec6d21e
Reviewed-on: http://gerrit.cloudera.org:8080/18383
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When Impala/Hive creates a table, it lowercases the schema elements.
When Spark creates an Iceberg table it doesn't lowercase the names
of the columns in the Iceberg metadata. This triggers a precondition
check in Impala which makes such Iceberg tables unloadable.
This patch converts column names to lowercase when converting Iceberg
schemas to Hive/Impala schemas.
Testing:
* added e2e test
Change-Id: Iffd910f76844fbf34db805dda6c3053c5ad1cf79
Reviewed-on: http://gerrit.cloudera.org:8080/18368
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When Hive (and probably other engines as well) converts a legacy Hive
table to Iceberg it doesn't rewrite the data files. This means that the
data files have neither write ids nor partition column data. Currently
Impala expects the partition columns to be present in the data files,
so it is not able to read converted partitioned tables.
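For context, such a conversion is typically done in Hive with a
statement like the following (table name is illustrative):
  ALTER TABLE legacy_tbl SET TBLPROPERTIES(
    'storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');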
With this patch Impala loads partition values from the Iceberg metadata.
The extra metadata information is attached to the file descriptor
objects and propagated to the scanners. This metadata contains the
Iceberg data file format (later it could be used to handle mixed-format
tables) and the partition data.
We use the partition data in the HdfsScanner to create the template
tuple that contains the partition values of identity-partitioned
columns. This is not only true for migrated tables, but for all Iceberg
tables with identity partitions, which means we also save some IO
and CPU time for such columns. The partition information could also
be used for Dynamic Partition Pruning later.
We use the (human-readable) string representation of the partition data
when storing it in the flat buffers. This helps debugging, and it also
provides the needed flexibility when the partition columns
evolve (e.g. INT -> BIGINT, DECIMAL(4,2) -> DECIMAL(6,2)).
Testing
* e2e test for all data types that can be used to partition a table
* e2e test for migrated partitioned table + schema evolution (without
renaming columns)
* e2e for table where all columns are used as identity-partitions
Change-Id: Iac11a02de709d43532056f71359c49d20c1be2b8
Reviewed-on: http://gerrit.cloudera.org:8080/18240
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Added a default of 0 for scale if it is not set, to comply with the
Parquet spec.
Wrapped reading scale and precision in a function to support reading
LogicalType.DecimalType if it is set, falling back to the old fields if
it is not, for backward compatibility.
Regenerated the bad_parquet_decimals table with DecimalType filled in,
and moved the missing-scale test, as it is no longer a bad table.
Added a no_scale.parquet table to test reading a table without a set
scale. Checked it with parquet-tools:
message schema {
optional fixed_len_byte_array(2) d1 (DECIMAL(4,0));
}
Change-Id: I003220b6e2ef39d25d1c33df62c8432803fdc6eb
Reviewed-on: http://gerrit.cloudera.org:8080/18224
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala crashes on a Parquet file that contains the partition columns.
Data files usually don't contain the partition columns, so Impala
doesn't expect to find such columns in the data files. Unfortunately
min/max filtering generates a SEGFAULT when the partition column is
present in the data files.
It happens when FindSkipRangesForPagesWithMinMaxFilters() tries to
retrieve the Parquet schema element for a given slot descriptor. When
the slot descriptor refers to a partition column, we usually don't find
a schema element so we don't try to skip pages.
But when the partition column is present in the data file, the code
tries to calculate the filtered pages for the column. It uses the column
reader object corresponding to the column, but this is NULL for
partition columns, hence we get a SEGFAULT.
The code shouldn't do anything at the page level for partition columns,
as the data in such columns is the same for the whole file and is
already filtered at a higher level.
Testing:
* added e2e test
Change-Id: I17eff4467da3fd67a21353ba2d52d3bec405acd2
Reviewed-on: http://gerrit.cloudera.org:8080/18265
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala returns "Couldn't skip rows in file" error for old Parquet
file written by an old Impala (e.g. Impala 2.5, 2.6) In DEBUG build
Impala crashes by a DCHECK:
Check failed: num_buffered_values_ > 0 (-1 vs. 0)
The problem is that in some old Parquet files there can be a mismatch
between 'num_values' in a page and the encoded def/rep levels.
There is usually one more def/rep levels encoded in these files.
In SkipTopLevelRows() we skipped values based on how many def levels are
92ce6fe48e/be/src/exec/parquet/parquet-column-readers.cc (L1308-L1314)
Since there are more def levels than values in some old files,
num_buferred_values_ could become negative.
This patch also takes the value of num_buferred_values_ into account
when calculating 'read_count', so we can deal with such files. With
this patch we also include the column name in the "Couldn't skip rows"
error message, so in the future it'll be easier to identify the
problematic columns.
Testing:
* added Parquet file written by Impala 2.5 and e2e test for it
Change-Id: I568fe59df720ea040be4926812412ba4c1510a26
Reviewed-on: http://gerrit.cloudera.org:8080/18257
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds an end-to-end test to validate and characterize HMS'
behavior with respect to external table creation after HIVE-25569 via
which a user is allowed to create an external table associated with a
single file.
Change-Id: Ia4f57f07a9f543c660b102ebf307a6cf590a6784
Reviewed-on: http://gerrit.cloudera.org:8080/18033
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Aman Sinha <amsinha@cloudera.com>
This patch provides an unnest implementation for arrays where unnesting
multiple arrays in one query results in the items of the arrays being
zipped together instead of joined. There are two different syntaxes
introduced for this purpose:
1: ISO SQL:2016 compliant syntax:
SELECT a1.item, a2.item
FROM complextypes_arrays t, UNNEST(t.arr1, t.arr2) AS (a1, a2);
2: Postgres compatible syntax:
SELECT UNNEST(arr1), UNNEST(arr2) FROM complextypes_arrays;
Let me show the expected behaviour through the following example:
Inputs: arr1: {1,2,3}, arr2: {11, 12}
After running any of the above queries we expect the following output:
===============
| arr1 | arr2 |
===============
| 1    | 11   |
| 2    | 12   |
| 3    | NULL |
===============
Expected behaviour:
- When unnesting multiple arrays with zipping unnest then the 'i'th
item of one array will be put next to the 'i'th item of the other
arrays in the results.
- In case the size of the arrays is not the same then the shorter
arrays will be filled with NULL values up to the size of the longest
array.
On a sidenote, UNNEST is added to Impala's SQL language as a new
keyword. This might interfere with use cases where a resource (db,
table, column, etc.) is named "UNNEST".
Restrictions:
- It is not allowed to have WHERE filters on an unnested item of an
array in the same SELECT query. E.g. this is not allowed:
SELECT arr1.item
FROM complextypes_arrays t, UNNEST(t.arr1) WHERE arr1.item < 5;
Note that it is allowed to have an outer SELECT around the one
doing unnests and have a filter there on the unnested items.
- If there is an outer SELECT filtering on the unnested array's items
from the inner SELECT then these predicates won't be pushed down to
the SCAN node. They are rather evaluated in the UNNEST node to
guarantee result correctness after unnesting.
Note that this restriction is only active when there are multiple arrays
being unnested, or in other words when zipping unnest logic is
required to produce results.
- It's not allowed to do a zipping and a (traditional) joining unnest
together in one SELECT query.
- It's not allowed to perform zipping unnests on arrays from different
tables.
Testing:
- Added a bunch of E2E tests to the test suite to cover both syntaxes.
- Did a manual test run on a table with 1000 rows, 3 array columns
with size of around 5000 items in each array. I did an unnest on all
three arrays in one query to see if there are any crashes or
suspicious slowness when running on this scale.
Change-Id: Ic58ff6579ecff03962e7a8698edfbe0684ce6cf7
Reviewed-on: http://gerrit.cloudera.org:8080/17983
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When a regular Parquet/ORC table is converted to Iceberg via Hive,
only the Iceberg metadata files need to be created. The data files
can stay in place.
This causes problems when the data files don't have field ids for
the schema elements. Currently Impala resolves columns in data
files based on Iceberg field ids, but since they are missing,
Impala raises an error or returns NULLs.
With this patch Impala falls back to the default column resolution
strategy when the data files lack field ids.
Testing:
* added e2e tests both for Parquet and ORC
Change-Id: I85881b09891c7bd101e7a96e92561b70bbe5af41
Reviewed-on: http://gerrit.cloudera.org:8080/17953
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-7087 is about reading Parquet decimal columns with lower
precision/scale than table metadata.
IMPALA-8131 is about reading Parquet decimal columns with higher
scale than table metadata.
Both are resolved by this patch. It reuses some parts from an
earlier change request from Sahil Takiar:
https://gerrit.cloudera.org/#/c/12163/
A new utility class, ParquetDataConverter, has been introduced which
does the data conversion. It also helps to decide whether data
conversion is needed or not.
NULL values are returned in case of overflows. This behavior is
consistent with Hive.
Parquet column stats reader is also updated to convert the decimal
values. The stats reader is used to evaluate min/max conjuncts. It
works well because later we also evaluate the conjuncts on the
converted values anyway.
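For illustration (hypothetical table and column names), a min/max
conjunct such as the one in the query below is evaluated on the
converted values, both by the stats reader and later in the scanner:
  SELECT count(*) FROM tbl WHERE d > 123.45;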
The status of different filterings:
* dictionary filtering: disabled for columns that need conversion
* runtime bloom filters: work on the converted values
* runtime min/max filters: work on the converted values
This patch also enables schema evolution of decimal columns of Iceberg
tables.
Testing:
* added e2e tests
Change-Id: Icefa7e545ca9f7df1741a2d1225375ecf54434da
Reviewed-on: http://gerrit.cloudera.org:8080/17678
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
This change adds read support for Parquet Bloom filters for types that
can reasonably be supported in Impala. Other types, such as CHAR(N),
would be very difficult to support because the length may differ
between Parquet and Impala, which results in truncation or padding;
that changes the hash, which makes using the Bloom filter impossible.
Write support will be added in a later change.
The supported Parquet type - Impala type pairs are the following:
------------------------------------------
| Parquet type | Impala type             |
|----------------------------------------|
| INT32        | TINYINT, SMALLINT, INT  |
| INT64        | BIGINT                  |
| FLOAT        | FLOAT                   |
| DOUBLE       | DOUBLE                  |
| BYTE_ARRAY   | STRING                  |
------------------------------------------
The following types are not supported for the given reasons:
-------------------------------------------------------------------
| Impala type | Problem                                           |
|-----------------------------------------------------------------|
| VARCHAR(N)  | truncation can change hash                        |
| CHAR(N)     | padding / truncation can change hash              |
| DECIMAL     | multiple encodings supported                      |
| TIMESTAMP   | multiple encodings supported, timezone conversion |
| DATE        | not considered yet                                |
-------------------------------------------------------------------
Support may be added for these types later, see IMPALA-10641.
If a Bloom filter is available for a column that is fully dictionary
encoded, the Bloom filter is not used as the dictionary can give exact
results in filtering.
Testing:
- Added tests/query_test/test_parquet_bloom_filter.py that tests
whether Parquet Bloom filtering works for the supported types and
that we do not incorrectly discard row groups for the unsupported
type VARCHAR. The Parquet file used in the test was generated with
an external tool.
- Added unit tests for ParquetBloomFilter in file
be/src/util/parquet-bloom-filter-test.cc
- A minor, unrelated change was done in
be/src/util/bloom-filter-test.cc: the MakeRandom() function had
return type uint64_t, the documentation claimed it returned a 64 bit
random number, but the actual number of random bits is 32, which is
what is intended in the tests. The return type and documentation
have been corrected to use 32 bits.
Change-Id: I7119c7161fa3658e561fc1265430cb90079d8287
Reviewed-on: http://gerrit.cloudera.org:8080/17026
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Csaba Ringhofer <csringhofer@cloudera.com>
Currently the ORC scanner only supports position-based column
resolution. This patch adds Iceberg field-id based column resolution
which will be the default for Iceberg tables. It is needed to support
schema evolution in the future, i.e. ALTER TABLE DROP/RENAME COLUMNS.
(The Parquet scanner already supports Iceberg field-id based column
resolution)
Testing
* added e2e test 'iceberg-orc-field-id.test' by copying the contents of
nested-types-scanner-basic,
nested-types-scanner-array-materialization,
nested-types-scanner-position,
nested-types-scanner-maps,
and executing the queries on an Iceberg table with ORC data files
Change-Id: Ia2b1abcc25ad2268aa96dff032328e8951dbfb9d
Reviewed-on: http://gerrit.cloudera.org:8080/17398
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of serialized Apache DataSketches CPC
sketches produced by ds_cpc_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, the relevant sketches can be
unioned together to get an estimate. E.g.:
SELECT
ds_cpc_estimate(ds_cpc_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_cpc_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_cpc_union() on those sketches
Change-Id: Ib94b45ae79efcc11adc077dd9df9b9868ae82cb6
Reviewed-on: http://gerrit.cloudera.org:8080/17372
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
These functions can be used to get cardinality estimates of data
using the CPC algorithm from Apache DataSketches. ds_cpc_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized CPC sketch in string format. This can be written to a
table or be fed directly to ds_cpc_estimate() that returns the
cardinality estimate for that sketch.
Similar to the HLL sketch, the primary use-case for the CPC sketch
is for counting distinct values as a stream, and then merging
multiple sketches together for a total distinct count.
For more details about Apache DataSketches' CPC see:
http://datasketches.apache.org/docs/CPC/CPC.html
For a Figures-of-Merit comparison of the HLL and CPC sketches see:
https://datasketches.apache.org/docs/DistinctCountMeritComparisons.html
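For example, a sketch can be created and its estimate retrieved in a
single query (hypothetical table and column names):
  SELECT ds_cpc_estimate(ds_cpc_sketch(col)) FROM tbl;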
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on tpch_parquet.lineitem to compare performance
with ndv(). Depending on data characteristics ndv() appears 2x-3x
faster. CPC gives closer estimates than the current ndv(). CPC is more
accurate than HLL in some cases.
Change-Id: I731e66fbadc74bc339c973f4d9337db9b7dd715a
Reviewed-on: http://gerrit.cloudera.org:8080/16656
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
ORC-189 and ORC-666 added support for a new timestamp type,
'TIMESTAMP WITH LOCAL TIMEZONE', to the ORC library.
This patch adds support for reading such timestamps with Impala.
These are UTC-normalized timestamps, therefore we convert them
to local timezone during scanning.
Testing:
* added test for CREATE TABLE LIKE ORC
* added scanner tests to test_scanners.py
Change-Id: Icb0c6a43ebea21f1cba5b8f304db7c4bd43967d9
Reviewed-on: http://gerrit.cloudera.org:8080/17347
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of serialized Apache DataSketches Theta
sketches produced by ds_theta_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, the relevant sketches can be
unioned together to get an estimate. E.g.:
SELECT
ds_theta_estimate(ds_theta_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_theta_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_theta_union() on those sketches
Change-Id: I91baf58c76eb43748acd5245047edac8c66761b2
Reviewed-on: http://gerrit.cloudera.org:8080/17048
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
These functions can be used to get cardinality estimates of data
using the Theta algorithm from Apache DataSketches. ds_theta_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized Theta sketch in string format. This can be written to a
table or be fed directly to ds_theta_estimate() that returns the
cardinality estimate for that sketch.
Similar to the HLL sketch, the primary use-case for the Theta sketch
is for counting distinct values as a stream, and then merging
multiple sketches together for a total distinct count.
For more details about Apache DataSketches' Theta see:
https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html
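For example, a sketch can be created and its estimate retrieved in a
single query (hypothetical table and column names):
  SELECT ds_theta_estimate(ds_theta_sketch(col)) FROM tbl;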
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on tpch25_parquet.lineitem to compare performance
with ds_hll_*. ds_theta_* is faster than ds_hll_* on the original
data, the difference is around 1%-10%. ds_hll_estimate() is faster
than ds_theta_estimate() on an existing sketch. HLL and Theta give
close estimates, except for strings; see IMPALA-10464.
Change-Id: I14f24c16b815eec75cf90bb92c8b8b0363dcbfbc
Reviewed-on: http://gerrit.cloudera.org:8080/17008
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for resolving columns by field id for Iceberg
tables. Currently, we use field ids to resolve columns for Iceberg
tables, which means 'PARQUET_FALLBACK_SCHEMA_RESOLUTION' has no effect
for Iceberg tables.
Change-Id: I057bdc6ab2859cc4d40de5ed428d0c20028b8435
Reviewed-on: http://gerrit.cloudera.org:8080/16788
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
The DESCRIBE HISTORY statement works for Iceberg tables and displays
the snapshot history of the table.
An example output:
DESCRIBE HISTORY iceberg_multi_snapshots;
+----------------------------+---------------------+---------------------+---------------------+
| creation_time | snapshot_id | parent_id | is_current_ancestor |
+----------------------------+---------------------+---------------------+---------------------+
| 2020-10-13 14:01:07.234000 | 4400379706200951771 | NULL | TRUE |
| 2020-10-13 14:01:19.307000 | 4221472712544505868 | 4400379706200951771 | TRUE |
+----------------------------+---------------------+---------------------+---------------------+
The purpose here was to have this new feature produce output similar to
what Spark SQL returns for "SELECT * FROM tablename.history".
See the "History" section of
https://iceberg.apache.org/spark/#inspecting-tables
Testing:
- iceberg-negative.test was extended to check that DESCRIBE HISTORY
is not applicable for non-Iceberg tables.
- iceberg-table-history.test: Covers basic usage of DESCRIBE
HISTORY. Tests on tables created with Impala and also with Spark.
Change-Id: I56a4b92c27e8e4a79109696cbae62735a00750e5
Reviewed-on: http://gerrit.cloudera.org:8080/16599
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: wangsheng <skyyws@163.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In practice we recommend that the HDFS block size should align with the
Parquet row group size. But some compute engines, like Spark, use a
default Parquet row group size of 128MB, and if the ETL user doesn't
change the default property, Spark will generate row groups smaller
than the HDFS block size. The result is that a single HDFS block may
contain multiple Parquet row groups.
In the planning stage, the length of an Impala-generated scan range may
be bigger than the row group size, thus a single scan range may contain
multiple row groups. In the current Parquet scanner, when moving to the
next row group, some of the internal state in the Parquet column readers
needs to be reset, e.g. num_buffered_values_, the column chunk metadata,
and the internal state of the column chunk readers. But the
current_row_range_ offset is not reset currently, which causes
"Couldn't skip rows in file hdfs://xxx" errors, as IMPALA-10310 points
out.
This patch simply resets current_row_range_ to 0 when moving to the next
row group in the Parquet column readers, fixing IMPALA-10310.
Testing:
* Add e2e test for parquet multi blocks per file and multi pages
per block
* Ran all core tests offline.
* Manually tested all cases encountered in my production environment.
Change-Id: I964695cd53f5d5fdb6485a85cd82e7a72ca6092c
Reviewed-on: http://gerrit.cloudera.org:8080/16697
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for querying Iceberg tables stored in the ORC
file format. The following SQL can be used to create a table with the
ORC file format:
CREATE TABLE default.iceberg_test (
  level string,
  event_time timestamp,
  message string
)
STORED AS ICEBERG
LOCATION 'hdfs://xxx'
TBLPROPERTIES ('iceberg.file_format'='orc', 'iceberg.catalog'='hadoop.tables');
Note that there are still some problems when scanning ORC files
with Timestamp columns; for more details please refer to IMPALA-9967.
We may add new tests with the Timestamp type after that JIRA is fixed.
Testing:
- Create table tests in functional_schema_template.sql
- Iceberg table create test in test_iceberg.py
- Iceberg table query test in test_scanners.py
Change-Id: Ib579461aa57348c9893a6d26a003a0d812346c4d
Reviewed-on: http://gerrit.cloudera.org:8080/16568
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
As IMPALA-4371 and IMPALA-10186 point out, Impala might write
empty data pages. It usually does that when it has to write a bigger
page than the current page size. Whether we really need to write empty
data pages is a different question, but we need to handle them correctly
as there are already such files out there.
The Parquet offset index entries corresponding to empty data pages
are invalid PageLocation objects with 'compressed_page_size' = 0.
Before this commit Impala didn't ignore the empty page locations, but
generated a warning. Since an invalid page index doesn't fail a scan
by default, Impala continued scanning the file with semi-initialized
page filtering. This resulted in a 'Top level rows aren't in sync'
error, or a crash in DEBUG builds.
With this commit Impala ignores empty data pages and is still able to
filter the rest of the pages. Also, if the page index is corrupt
for some other reason, Impala correctly resets the page filtering
logic and falls back to regular scanning.
Testing:
* Added unit test for empty data pages
* Added e2e test for empty data pages
* Added e2e test for invalid page index
Change-Id: I4db493fc7c383ed5ef492da29c9b15eeb3d17bb0
Reviewed-on: http://gerrit.cloudera.org:8080/16503
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for creating Iceberg tables with HadoopCatalog.
We only supported the HadoopTables API before this patch, but now we can
use HadoopCatalog to create Iceberg tables. When creating a managed
table, we can use SQL like this:
CREATE TABLE default.iceberg_test (
  level string,
  event_time timestamp,
  message string
)
STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
  'iceberg.catalog_location'='hdfs://test-warehouse/iceberg_test');
Two values ('hadoop.catalog', 'hadoop.tables') are now supported for
'iceberg.catalog'. If you don't specify this property in your SQL, the
default catalog type is 'hadoop.catalog'.
As for external Iceberg tables, you can use SQL like this:
CREATE EXTERNAL TABLE default.iceberg_test_external
STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
  'iceberg.catalog_location'='hdfs://test-warehouse/iceberg_test',
  'iceberg.table_identifier'='default.iceberg_test');
The table location cannot be set for either managed or external Iceberg
tables with 'hadoop.catalog', and 'SHOW CREATE TABLE' will not display
the table location yet. We need to use 'DESCRIBE FORMATTED/EXTENDED' to
get this location info.
'iceberg.catalog_location' is required for 'hadoop.catalog' tables; it
is the location that holds the Iceberg table metadata and data, and we
use this location to load table metadata from Iceberg.
'iceberg.table_identifier' is used for the Iceberg TableIdentifier. If
this property is not specified in the SQL, Impala will use the database
and table name to load the Iceberg table, which is
'default.iceberg_test_external' in the SQL above. The property value is
split by '.', so you can also set a value like 'org.my_db.my_tbl'. This
property is valid for both managed and external tables.
Testing:
- Create table tests in functional_schema_template.sql
- Iceberg table create test in test_iceberg.py
- Iceberg table query test in test_scanners.py
- Iceberg table show create table test in test_show_create_table.py
Change-Id: Ic1893c50a633ca22d4bca6726c9937b026f5d5ef
Reviewed-on: http://gerrit.cloudera.org:8080/16446
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Some tests (e.g. AnalyzeDDLTest.TestCreateTableLikeFileOrc) depend on
hard-coded file paths of managed tables, assuming that there is always a
file named 'base_0000001/bucket_00000_0' under the table dir. However,
the file name is in the form of bucket_${bucket-id}_${attempt-id}. The
last part of the file name is not guaranteed to be 0. If the first
attempt fails and the second attempt succeeds, the file name will be
bucket_00000_1.
This patch replaces these hard-coded file paths with corresponding files
that are uploaded to HDFS by commands. For tests that do need to use the
file paths of managed table files, we do a listing on the table dir to
get the file names, instead of hard-coding the file paths.
Updated chars-formats.orc to contain column names in the file so it can
be used in more tests. The original one only had names like col0, col1,
col2.
Tests:
- Run CORE tests
Change-Id: Ie3136ee90e2444c4a12f0f2e1470fca1d5deaba0
Reviewed-on: http://gerrit.cloudera.org:8080/16441
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for querying Iceberg tables through Impala.
We can use the following SQL to create an external Iceberg table:
CREATE EXTERNAL TABLE default.iceberg_test (
  level string,
  event_time timestamp,
  message string
)
STORED AS ICEBERG
LOCATION 'hdfs://xxx'
TBLPROPERTIES ('iceberg_file_format'='parquet');
Or just including the table name and location like this:
CREATE EXTERNAL TABLE default.iceberg_test
STORED AS ICEBERG
LOCATION 'hdfs://xxx'
TBLPROPERTIES ('iceberg_file_format'='parquet');
'iceberg_file_format' is the file format in Iceberg; currently only
PARQUET is supported, other formats will be supported in the future. If
you don't specify this property in your SQL, the default file format
is PARQUET.
This is achieved by treating the Iceberg table as a normal
unpartitioned HDFS table. When querying an Iceberg table, we push down
partition column predicates to Iceberg to decide which data files
need to be scanned, and then transfer this information to the BE to
do the real scan operation.
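For illustration (assuming the table is partitioned by a transform of
event_time), a predicate like the one below is pushed down to Iceberg so
that only the matching data files are scanned:
  SELECT count(*) FROM default.iceberg_test
  WHERE event_time >= '2020-01-01';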
Testing:
- Unit test for Iceberg in FileMetadataLoaderTest
- Create table tests in functional_schema_template.sql
- Iceberg table query test in test_scanners.py
Change-Id: I856cfee4f3397d1a89cf17650e8d4fbfe1f2b006
Reviewed-on: http://gerrit.cloudera.org:8080/16143
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Currently Impala checks the file metadata field 'hive.acid.version' to
decide whether a file has the full ACID schema. There are cases when
Hive forgets to set this value for full ACID files, e.g. query-based
compactions.
So it's more robust to check the schema elements instead of the metadata
field. Also, sometimes Hive writes the schema with different character
cases, e.g. originalTransaction vs originaltransaction, so we should
rather compare the column names in a case-insensitive way.
Testing:
* added test for full ACID compaction
* added test_full_acid_schema_without_file_metadata_tag to test full
ACID file without metadata 'hive.acid.version'
Change-Id: I52642c1755599efd28fa2c90f13396cfe0f5fa14
Reviewed-on: http://gerrit.cloudera.org:8080/16383
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of serialized Apache DataSketches KLL
sketches produced by ds_kll_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, the relevant sketches can be
unioned together to get an estimate. E.g.:
SELECT
ds_kll_quantile(ds_kll_union(sketch_col), 0.5)
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_kll_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_kll_union() on those sketches.
Change-Id: I020aea28d36f9b6ef9fb57c08411f2170f5c24bf
Reviewed-on: http://gerrit.cloudera.org:8080/16267
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
ds_kll_sketch() is an aggregate function that receives a float
parameter (e.g. a float column of a table) and returns a serialized
Apache DataSketches KLL sketch of the input data set wrapped into
STRING type. This sketch can be saved into a table or view and later
used for quantile approximations. ds_kll_quantile() receives two
parameters: a STRING parameter that contains a serialized KLL sketch
and a DOUBLE that represents the rank of the quantile in the range of
[0,1]. E.g. rank=0.1 means the approximate value in the sketch where
10% of the sketched items are less than or equal to this value.
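For example, an approximate median can be computed directly from a
float column (hypothetical table and column names):
  SELECT ds_kll_quantile(ds_kll_sketch(float_col), 0.5) FROM tbl;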
Testing:
- Added automated tests on small data sets to check the basic
functionality of sketching and getting a quantile approximate.
- Tested on TPCH25_parquet.lineitem to check that sketching and
approximating works on bigger scale as well where serialize/merge
phases are also required. On this scale the error range of the
quantile approximation is within 1-1.5%
Change-Id: I11de5fe10bb5d0dd42fb4ee45c4f21cb31963e52
Reviewed-on: http://gerrit.cloudera.org:8080/16235
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of sketches produced by ds_hll_sketch()
and merges them into a single sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, the relevant sketches can be
unioned together to get an estimate. E.g.:
SELECT
ds_hll_estimate(ds_hll_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Note, currently there is a known limitation when unioning sketches of
string types where some input sketches come from Impala and some from
Hive. In this case, if there is an overlap in the input data used by
Impala and by Hive, the overlapping data is still counted twice due to a
string representation difference between Impala and Hive.
For more details see:
https://issues.apache.org/jira/browse/IMPALA-9939
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_hll_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_hll_union() on those sketches.
Change-Id: I67cdbf6f3ebdb1296fea38465a15642bc9612d09
Reviewed-on: http://gerrit.cloudera.org:8080/16095
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Removed the support for dateless timestamps.
During dateless timestamp casts, if the format doesn't contain a
date part we get an error during tokenization of the format.
If the input string doesn't contain a date part then we get a NULL
result.
Examples:
select cast('01:02:59' as timestamp);
This will come back as a NULL value.
select to_timestamp('01:01:01', 'HH:mm:ss');
select cast('01:02:59' as timestamp format 'HH12:MI:SS');
select cast('12 AM' as timestamp FORMAT 'AM.HH12');
These will come back with parsing errors.
Casting from a table will generate similar results.
Testing:
Modified the previous tests related to dateless timestamps.
Added a test to read from tables which still contain dateless
timestamps, and covered the timestamp-to-string path when no date tokens
are requested in the output string.
Change-Id: I48c49bf027cc4b917849b3d58518facba372b322
Reviewed-on: http://gerrit.cloudera.org:8080/15866
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
These functions can be used to get cardinality estimates of data
using the HLL algorithm from Apache DataSketches. ds_hll_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized HLL sketch in string format. This can be written to a
table or be fed directly to ds_hll_estimate() that returns the
cardinality estimate for that sketch.
Compared to ndv() these functions bring more flexibility, as once we
have fed data to the sketch it can be written to a table, and next time
we can skip scanning through the dataset and simply return the estimate
using the sketch. This doesn't come for free, however, as performance
measurements show that ndv() is 2x-3.5x faster than sketching. On the
other hand, if we query the estimate from an existing sketch then the
runtime is negligible.
Another flexibility with these sketches is that they can be merged
together so e.g. if we had saved a sketch for each of the partitions
of a table then they can be combined with each other based on the
query without touching the actual data.
DataSketches HLL is sensitive to the order of the data fed to the
sketch, and as a result running these algorithms in Impala gives
non-deterministic results within the error bounds of the algorithm.
In terms of correctness DataSketches HLL is most of the time within 2%
of the correct result, but there are occasional spikes where the
difference is bigger; it never goes out of the range of 5%.
Even though the DataSketches HLL algorithm could be parameterized,
this implementation currently hard-codes these parameters and uses
HLL_4 and lg_k=12.
For more details about Apache DataSketches' HLL implementation see:
https://datasketches.apache.org/docs/HLL/HLL.html
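For example, a sketch can be created and its estimate retrieved in a
single query (hypothetical table and column names):
  SELECT ds_hll_estimate(ds_hll_sketch(col)) FROM tbl;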
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on TPCH25.lineitem to compare performance with
ndv(). Depending on data characteristics ndv() appears 2x-3.5x
faster. The lower the cardinality of the dataset, the bigger the
difference between the 2 algorithms is.
- Ran manual tests on TPCH25.lineitem and
functional_parquet.alltypes to compare correctness with ndv(). See
results above.
Change-Id: Ic602cb6eb2bfbeab37e5e4cba11fbf0ca40b03fe
Reviewed-on: http://gerrit.cloudera.org:8080/16000
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
"Original files" are files that don't have full ACID schema. We can see
such files if we upgrade a non-ACID table to full ACID. Also, the LOAD
DATA statement can load non-ACID files into full ACID tables. So such
files don't store special ACID columns, that means we need
to auto-generate their values. These are (operation,
originalTransaction, bucket, rowid, and currentTransaction).
With the exception of 'rowid', all of them can be calculated based on
the file path, so I add their values to the scanner's template tuple.
'rowid' is the ordinal number of the row inside a bucket inside a
directory. For now Impala only allows one file per bucket per
directory. Therefore we can generate row ids for each file
independently.
Multiple files in a single bucket in a directory can only be present if
the table was non-transactional earlier and we upgraded it to full ACID
table. After the first compaction we should only see one original file
per bucket per directory.
In HdfsOrcScanner we calculate the first row id for our split then
the OrcStructReader fills the rowid slot with the proper values.
Testing:
* added e2e tests to check if the generated values are correct
* added e2e test to reject tables that have multiple files per bucket
* added unit tests to the new auxiliary functions
Change-Id: I176497ef9873ed7589bd3dee07d048a42dfad953
Reviewed-on: http://gerrit.cloudera.org:8080/16001
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This adds support for reading Parquet files where the
DECIMAL is encoded as a FIXED_LEN_BYTE_ARRAY field
with extra padding. This requires loosening file
validation and fixing up the decoding so that it
no longer assumes that the in-memory value is at
least as large as the encoded representation.
The decimal decoding logic was reworked so that we
could add the extra condition handling without regressing
performance of the decoding logic in the common case. In
the end I was able to significantly speed up the decoding
logic. The bottleneck, revealed by perf record while running
the benchmark below, was CPU stalls: the bitshift instruction used for
sign extension was waiting on the load of the result of
ByteSwap(). I worked around this by doing the sign extension
before the ByteSwap().
Perf:
Ran a microbenchmark to check that scanning perf didn't regress
as a result of the change. The query scans a DECIMAL column
that is mostly plain-encoded, so as to maximally stress the
FIXED_LEN_BYTE_ARRAY decoding performance.
set mt_dop=1;
set num_nodes=1;
select min(l_extendedprice)
from tpch_parquet.lineitem
The SCAN time in the summary averaged out to 94ms before
the change and is reduced to 74ms after the change. The
actual speedup of the DECIMAL decoding is greater - it
went from ~20% of the time to ~6% of the time as measured by
perf.
Testing:
Added a couple of parquet files that were generated with a
hacked version of Impala to have extra padding. Sanity-checked
that hacked tables returned the same results on Hive. The
tests failed before this code change.
Ran exhaustive tests with the hacked version of Impala
(so that all decimal tables got extra padding).
Change-Id: I2700652eab8ba7f23ffa75800a1712d310d4e1ec
Reviewed-on: http://gerrit.cloudera.org:8080/16090
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Minor compactions can compact several delta directories into a single
delta directory. The current directory filtering algorithm had to be
modified to handle minor compacted directories and prefer those over
plain delta directories. This happens in the Frontend, mostly in
AcidUtils.java.
Hive Streaming Ingestion writes similar delta directories, but they
might contain rows Impala cannot see based on its valid write id list.
E.g. we can have the following delta directory:
full_acid/delta_0000001_0000010/0000 # minWriteId: 1
# maxWriteId: 10
This delta dir contains rows with write ids between 1 and 10. But maybe
we are only allowed to see write ids less than 5. Therefore we need to
check the ACID write id column (named originalTransaction) to determine
which rows are valid.
Delta directories written by Hive Streaming don't have a visibility txn
id, so we can recognize them based on the directory name. If there's
a visibilityTxnId and it is committed => every row is valid:
full_acid/delta_0000001_0000010_v01234 # has visibilityTxnId
# every row is valid
If there's no visibilityTxnId then it was created via Hive Streaming,
therefore we need to validate rows. Fortunately Hive Streaming writes
rows with different write ids into different ORC stripes, therefore we
don't need to validate the write id per row. If we had statistics,
we could validate per stripe, but since Hive Streaming doesn't write
statistics we validate the write id per ORC row batch (an alternative
could be to do a 2-pass read, first we'd read a single value from each
stripe's 'currentTransaction' field, then we'd read the stripe if the
write id is valid).
Testing
* the frontend logic is tested in AcidUtilsTest
* the backend row validation is tested in test_acid_row_validation
Change-Id: I5ed74585a2d73ebbcee763b0545be4412926299d
Reviewed-on: http://gerrit.cloudera.org:8080/15818
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In this patch, we add support for reading zstd-encoded text files.
This includes:
1. support for reading zstd files written by Hive, which uses streaming
compression.
2. support for reading zstd files compressed by the standard zstd
library, which uses block compression.
To support decompressing both formats, a function ProcessBlockStreaming
is added to the zstd decompressor.
Testing done:
Added two backend tests:
1. streaming decompress test.
2. large data test for both block and streaming decompress.
Added two end-to-end tests:
1. Hive and Impala integration: for four compression codecs, write in
Hive and read from Impala.
2. zstd library and Impala integration: copy a file compressed with the
zstd library to HDFS, and read it from Impala.
Change-Id: I2adce9fe00190558525fa5cd3d50cf5e0f0b0aa4
Reviewed-on: http://gerrit.cloudera.org:8080/15023
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
A Hudi Read Optimized Table contains multiple versions of Parquet
files. In order to load the table correctly, Impala needs to recognize
a Hudi Read Optimized Table as an HdfsTable and load the latest version
of the files using HoodieROTablePathFilter.
Tests
- Unit test for Hudi in FileMetadataLoader
- Create table tests in functional_schema_template.sql
- Query tests in hudi-parquet.test
Change-Id: I65e146b347714df32fe968409ef2dde1f6a25cdf
Reviewed-on: http://gerrit.cloudera.org:8080/14711
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Implements the read path for the date type in the ORC scanner. The
internal representation of a date is an int32 representing the number of
days since the Unix epoch in the proleptic Gregorian calendar.
Similarly to the Parquet implementation (IMPALA-7370) this
representation introduces an interoperability issue between Impala
and older versions of Hive (before 3.1). For more details see the
commit message of the mentioned Parquet implementation.
Change-Id: I672a2cdd2452a46b676e0e36942fd310f55c4956
Reviewed-on: http://gerrit.cloudera.org:8080/14982
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>