When Hive (and probably other engines as well) converts a legacy Hive
table to Iceberg it doesn't rewrite the data files. This means that the
data files have neither write ids nor partition column data. Currently
Impala expects the partition columns to be present in the data files,
so it is not able to read converted partitioned tables.
With this patch Impala loads partition values from the Iceberg metadata.
The extra metadata information is attached to the file descriptor
objects and propagated to the scanners. This metadata contains the
Iceberg data file format (later this could be used to handle mixed-format
tables) and the partition data.
We use the partition data in the HdfsScanner to create the template
tuple that contains the partition values of identity-partitioned
columns. This applies not only to migrated tables, but to all Iceberg
tables with identity partitions, which means we also save some IO
and CPU time for such columns. The partition information could also
be used for Dynamic Partition Pruning later.
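As an illustration (a hedged sketch with made-up names, assuming
Impala's PARTITIONED BY SPEC syntax for identity partitioning):
  CREATE TABLE ice_events (id BIGINT, event_day DATE)
  PARTITIONED BY SPEC (event_day)
  STORED AS ICEBERG;
  -- The event_day values come from the Iceberg partition metadata
  -- attached to the file descriptors, not from the data files.
  SELECT event_day, count(*) FROM ice_events GROUP BY event_day;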
We use the (human-readable) string representation of the partition data
when storing it in the flat buffers. This helps debugging and also
provides the needed flexibility when the partition columns
evolve (e.g. INT -> BIGINT, DECIMAL(4,2) -> DECIMAL(6,2)).
Testing:
* e2e test for all data types that can be used to partition a table
* e2e test for migrated partitioned table + schema evolution (without
renaming columns)
* e2e for table where all columns are used as identity-partitions
Change-Id: Iac11a02de709d43532056f71359c49d20c1be2b8
Reviewed-on: http://gerrit.cloudera.org:8080/18240
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Added a default of 0 for the scale if it is not set, to comply with the
Parquet spec.
Wrapped reading scale and precision in a function to support reading
LogicalType.DecimalType if it is set, falling back to the old fields if
it is not, for backward compatibility.
Regenerated the bad_parquet_decimals table with DecimalType filled in,
and moved the missing-scale test, as it is no longer a bad table.
Added a no_scale.parquet table to test reading a table without the scale
set.
Checked it with parquet-tools:
message schema {
optional fixed_len_byte_array(2) d1 (DECIMAL(4,0));
}
Change-Id: I003220b6e2ef39d25d1c33df62c8432803fdc6eb
Reviewed-on: http://gerrit.cloudera.org:8080/18224
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala crashes on a Parquet file that contains the partition columns.
Data files usually don't contain the partition columns, so Impala doesn't
expect to find such columns in the data files. Unfortunately min/max
filtering generates a SEGFAULT when the partition column is present in
the data files.
It happens when FindSkipRangesForPagesWithMinMaxFilters() tries to
retrieve the Parquet schema element for a given slot descriptor. When
the slot descriptor refers to a partition column, we usually don't find
a schema element so we don't try to skip pages.
But when the partition column is present in the data file, the code
tries to calculate the filtered pages for the column. It uses the column
reader object corresponding to the column, but this is NULL for
partition columns, hence we get a SEGFAULT.
The code shouldn't do anything at the page level for partition columns,
as the data in such columns is the same for the whole file and is
already filtered at a higher level.
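For illustration, a hedged sketch of the kind of query that can hit this
path (hypothetical tables): a join that generates a min/max runtime
filter on the partition column of a table whose data files also
physically contain that column:
  SELECT count(*)
  FROM part_tbl p JOIN dim_tbl d ON p.part_col = d.id;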
Testing:
* added e2e test
Change-Id: I17eff4467da3fd67a21353ba2d52d3bec405acd2
Reviewed-on: http://gerrit.cloudera.org:8080/18265
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala returns a "Couldn't skip rows in file" error for old Parquet
files written by an old Impala version (e.g. Impala 2.5, 2.6). In DEBUG
builds Impala crashes with a DCHECK:
Check failed: num_buffered_values_ > 0 (-1 vs. 0)
The problem is that in some old Parquet files there can be a mismatch
between 'num_values' in a page and the encoded def/rep levels.
There is usually one extra def/rep level encoded in these files.
In SkipTopLevelRows() we skipped values based on how many def levels are
read (see be/src/exec/parquet/parquet-column-readers.cc, lines 1308-1314
at commit 92ce6fe48e).
Since there are more def levels than values in some old files,
num_buffered_values_ could become negative.
This patch also takes the value of num_buffered_values_ into account
when calculating 'read_count', so we can deal with such files. With
this patch we also include the column name in the "Couldn't skip rows"
error message, so in the future it'll be easier to identify the
problematic columns.
Testing:
* added Parquet file written by Impala 2.5 and e2e test for it
Change-Id: I568fe59df720ea040be4926812412ba4c1510a26
Reviewed-on: http://gerrit.cloudera.org:8080/18257
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds an end-to-end test to validate and characterize HMS'
behavior with respect to external table creation after HIVE-25569 via
which a user is allowed to create an external table associated with a
single file.
Change-Id: Ia4f57f07a9f543c660b102ebf307a6cf590a6784
Reviewed-on: http://gerrit.cloudera.org:8080/18033
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Aman Sinha <amsinha@cloudera.com>
This patch provides an unnest implementation for arrays where unnesting
multiple arrays in one query results in the items of the arrays being
zipped together instead of joined. There are two different syntaxes
introduced for this purpose:
1: ISO SQL:2016 compliant syntax:
SELECT a1.item, a2.item
FROM complextypes_arrays t, UNNEST(t.arr1, t.arr2) AS (a1, a2);
2: Postgres compatible syntax:
SELECT UNNEST(arr1), UNNEST(arr2) FROM complextypes_arrays;
Let me show the expected behaviour through the following example:
Inputs: arr1: {1,2,3}, arr2: {11, 12}
After running any of the above queries we expect the following output:
===============
| arr1 | arr2 |
===============
| 1    | 11   |
| 2    | 12   |
| 3    | NULL |
===============
Expected behaviour:
- When unnesting multiple arrays with zipping unnest, the 'i'th
item of one array will be put next to the 'i'th item of the other
arrays in the results.
- If the arrays have different sizes then the shorter arrays are
padded with NULL values up to the size of the longest array.
As a side note, UNNEST is added to Impala's SQL language as a new
keyword. This might interfere with use cases where a resource (db,
table, column, etc.) is named "UNNEST".
Restrictions:
- It is not allowed to have WHERE filters on an unnested item of an
array in the same SELECT query. E.g. this is not allowed:
SELECT arr1.item
FROM complextypes_arrays t, UNNEST(t.arr1) WHERE arr1.item < 5;
Note that it is allowed to have an outer SELECT around the one
doing the unnests and have a filter there on the unnested items
(see the sketch after this list).
- If there is an outer SELECT filtering on the unnested array's items
from the inner SELECT then these predicates won't be pushed down to
the SCAN node. They are rather evaluated in the UNNEST node to
guarantee result correctness after unnesting.
Note, this restriction is only active when there are multiple arrays
being unnested, or in other words when zipping unnest logic is
required to produce results.
- It's not allowed to do a zipping and a (traditional) joining unnest
together in one SELECT query.
- It's not allowed to perform zipping unnests on arrays from different
tables.
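As referenced above, a hedged sketch of the allowed outer-filter pattern
(reusing the table from the earlier examples; the single-array alias
syntax is assumed):
  SELECT v.item FROM (
    SELECT a1.item FROM complextypes_arrays t, UNNEST(t.arr1) AS (a1)
  ) v WHERE v.item < 5;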
Testing:
- Added a bunch of E2E tests to the test suite to cover both syntaxes.
- Did a manual test run on a table with 1000 rows and 3 array columns,
with around 5000 items in each array. I did an unnest on all
three arrays in one query to see if there are any crashes or
suspicious slowness when running on this scale.
Change-Id: Ic58ff6579ecff03962e7a8698edfbe0684ce6cf7
Reviewed-on: http://gerrit.cloudera.org:8080/17983
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When a regular Parquet/ORC table is converted to Iceberg via Hive,
only the Iceberg metadata files need to be created. The data files
can stay in place.
This causes problems when the data files don't have field ids for
the schema elements. Currently Impala resolves columns in data
files based on Iceberg field ids, but since they are missing,
Impala raises an error or returns NULLs.
With this patch Impala falls back to the default column resolution
strategy when the data files lack field ids.
Testing:
* added e2e tests both for Parquet and ORC
Change-Id: I85881b09891c7bd101e7a96e92561b70bbe5af41
Reviewed-on: http://gerrit.cloudera.org:8080/17953
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-7087 is about reading Parquet decimal columns with lower
precision/scale than table metadata.
IMPALA-8131 is about reading Parquet decimal columns with higher
scale than table metadata.
Both are resolved by this patch. It reuses some parts from an
earlier change request from Sahil Takiar:
https://gerrit.cloudera.org/#/c/12163/
A new utility class, ParquetDataConverter, has been introduced to do
the data conversion. It also helps to decide whether data conversion
is needed or not.
NULL values are returned in case of overflows. This behavior is
consistent with Hive.
The Parquet column stats reader is also updated to convert the decimal
values. The stats reader is used to evaluate min/max conjuncts. It
works well because later we also evaluate the conjuncts on the
converted values anyway.
The status of different filterings:
* dictionary filtering: disabled for columns that need conversion
* runtime bloom filters: work on the converted values
* runtime min/max filters: work on the converted values
This patch also enables schema evolution of decimal columns of Iceberg
tables.
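A hedged sketch of the kind of decimal schema evolution this enables
(hypothetical table and types; the exact ALTER syntax/engine may vary,
and the change could equally be issued from Hive or Spark):
  ALTER TABLE ice_prices CHANGE COLUMN price price DECIMAL(12,4);
  -- Older data files may still store e.g. DECIMAL(9,2) values; the
  -- scanner converts them, returning NULL on overflow.
  SELECT min(price), max(price) FROM ice_prices;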
Testing:
* added e2e tests
Change-Id: Icefa7e545ca9f7df1741a2d1225375ecf54434da
Reviewed-on: http://gerrit.cloudera.org:8080/17678
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
This change adds read support for Parquet Bloom filters for types that
can reasonably be supported in Impala. Other types, such as CHAR(N),
would be very difficult to support because the length may be different
in Parquet and in Impala, which results in truncation or padding; that
changes the hash, which makes using the Bloom filter impossible.
Write support will be added in a later change.
The supported Parquet type - Impala type pairs are the following:
-----------------------------------------
| Parquet type | Impala type            |
|---------------------------------------|
| INT32        | TINYINT, SMALLINT, INT |
| INT64        | BIGINT                 |
| FLOAT        | FLOAT                  |
| DOUBLE       | DOUBLE                 |
| BYTE_ARRAY   | STRING                 |
-----------------------------------------
The following types are not supported for the given reasons:
-------------------------------------------------------------------
| Impala type | Problem                                            |
|-----------------------------------------------------------------|
| VARCHAR(N)  | truncation can change hash                         |
| CHAR(N)     | padding / truncation can change hash               |
| DECIMAL     | multiple encodings supported                       |
| TIMESTAMP   | multiple encodings supported, timezone conversion  |
| DATE        | not considered yet                                 |
-------------------------------------------------------------------
Support may be added for these types later, see IMPALA-10641.
If a Bloom filter is available for a column that is fully dictionary
encoded, the Bloom filter is not used as the dictionary can give exact
results in filtering.
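A hedged sketch of a query that can benefit (hypothetical table and
column): an equality predicate on a high-cardinality STRING column,
where row groups whose Bloom filter rules out the value can be skipped:
  SELECT count(*) FROM events WHERE session_id = 'e58c-42aa';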
Testing:
- Added tests/query_test/test_parquet_bloom_filter.py that tests
whether Parquet Bloom filtering works for the supported types and
that we do not incorrectly discard row groups for the unsupported
type VARCHAR. The Parquet file used in the test was generated with
an external tool.
- Added unit tests for ParquetBloomFilter in file
be/src/util/parquet-bloom-filter-test.cc
- A minor, unrelated change was done in
be/src/util/bloom-filter-test.cc: the MakeRandom() function had
return type uint64_t, the documentation claimed it returned a 64 bit
random number, but the actual number of random bits is 32, which is
what is intended in the tests. The return type and documentation
have been corrected to use 32 bits.
Change-Id: I7119c7161fa3658e561fc1265430cb90079d8287
Reviewed-on: http://gerrit.cloudera.org:8080/17026
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Csaba Ringhofer <csringhofer@cloudera.com>
Currently the ORC scanner only supports position-based column
resolution. This patch adds Iceberg field-id based column resolution
which will be the default for Iceberg tables. It is needed to support
schema evolution in the future, i.e. ALTER TABLE DROP/RENAME COLUMNS.
(The Parquet scanner already supports Iceberg field-id based column
resolution)
Testing:
* added e2e test 'iceberg-orc-field-id.test' by copying the contents of
nested-types-scanner-basic,
nested-types-scanner-array-materialization,
nested-types-scanner-position,
nested-types-scanner-maps,
and executing the queries on an Iceberg table with ORC data files
Change-Id: Ia2b1abcc25ad2268aa96dff032328e8951dbfb9d
Reviewed-on: http://gerrit.cloudera.org:8080/17398
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of serialized Apache DataSketches CPC
sketches produced by ds_cpc_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, union the relevant sketches
together to get an estimate. E.g.:
SELECT
ds_cpc_estimate(ds_cpc_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_cpc_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_cpc_union() on those sketches.
Change-Id: Ib94b45ae79efcc11adc077dd9df9b9868ae82cb6
Reviewed-on: http://gerrit.cloudera.org:8080/17372
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
These functions can be used to get cardinality estimates of data
using CPC algorithm from Apache DataSketches. ds_cpc_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized CPC sketch in string format. This can be written to a
table or be fed directly to ds_cpc_estimate() that returns the
cardinality estimate for that sketch.
Similar to the HLL sketch, the primary use-case for the CPC sketch
is for counting distinct values as a stream, and then merging
multiple sketches together for a total distinct count.
For more details about Apache DataSketches' CPC see:
http://datasketches.apache.org/docs/CPC/CPC.html
For a Figures-of-Merit comparison of the HLL and CPC sketches see:
https://datasketches.apache.org/docs/DistinctCountMeritComparisons.html
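A minimal usage sketch (the table is the one used in the Testing section
below; the exact query is only illustrative):
  SELECT ds_cpc_estimate(ds_cpc_sketch(l_orderkey))
  FROM tpch_parquet.lineitem;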
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on tpch_parquet.lineitem to compare performance
with ndv(). Depending on data characteristics ndv() appears 2x-3x
faster. CPC gives a closer estimate than the current ndv(). CPC is more
accurate than HLL in some cases.
Change-Id: I731e66fbadc74bc339c973f4d9337db9b7dd715a
Reviewed-on: http://gerrit.cloudera.org:8080/16656
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
ORC-189 and ORC-666 added support for a new timestamp type
'TIMESTAMP WITH LOCAL TIMEZONE' to the ORC library.
This patch adds support for reading such timestamps with Impala.
These are UTC-normalized timestamps, therefore we convert them
to local timezone during scanning.
Testing:
* added test for CREATE TABLE LIKE ORC
* added scanner tests to test_scanners.py
Change-Id: Icb0c6a43ebea21f1cba5b8f304db7c4bd43967d9
Reviewed-on: http://gerrit.cloudera.org:8080/17347
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of serialized Apache DataSketches Theta
sketches produced by ds_theta_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, union the relevant sketches
together to get an estimate. E.g.:
SELECT
ds_theta_estimate(ds_theta_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_theta_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_theta_union() on those sketches.
Change-Id: I91baf58c76eb43748acd5245047edac8c66761b2
Reviewed-on: http://gerrit.cloudera.org:8080/17048
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
These functions can be used to get cardinality estimates of data
using Theta algorithm from Apache DataSketches. ds_theta_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized Theta sketch in string format. This can be written to a
table or be fed directly to ds_theta_estimate() that returns the
cardinality estimate for that sketch.
Similar to the HLL sketch, the primary use-case for the Theta sketch
is for counting distinct values as a stream, and then merging
multiple sketches together for a total distinct count.
For more details about Apache DataSketches' Theta see:
https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on tpch25_parquet.lineitem to compare performance
with ds_hll_*. ds_theta_* is faster than ds_hll_* on the original
data, the difference is around 1%-10%. ds_hll_estimate() is faster
than ds_theta_estimate() on an existing sketch. HLL and Theta give
similarly close estimates, except for strings; see IMPALA-10464.
Change-Id: I14f24c16b815eec75cf90bb92c8b8b0363dcbfbc
Reviewed-on: http://gerrit.cloudera.org:8080/17008
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for resolving columns by field id for Iceberg
tables. We now use field ids to resolve columns for Iceberg
tables, which means 'PARQUET_FALLBACK_SCHEMA_RESOLUTION' has no effect
for Iceberg tables.
Change-Id: I057bdc6ab2859cc4d40de5ed428d0c20028b8435
Reviewed-on: http://gerrit.cloudera.org:8080/16788
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
The DESCRIBE HISTORY statement works for Iceberg tables and displays the
snapshot history of the table.
An example output:
DESCRIBE HISTORY iceberg_multi_snapshots;
+----------------------------+---------------------+---------------------+---------------------+
| creation_time | snapshot_id | parent_id | is_current_ancestor |
+----------------------------+---------------------+---------------------+---------------------+
| 2020-10-13 14:01:07.234000 | 4400379706200951771 | NULL | TRUE |
| 2020-10-13 14:01:19.307000 | 4221472712544505868 | 4400379706200951771 | TRUE |
+----------------------------+---------------------+---------------------+---------------------+
The purpose was to have this new feature produce output similar to
what Spark SQL returns for "SELECT * FROM tablename.history".
See "History" section of
https://iceberg.apache.org/spark/#inspecting-tables
Testing:
- iceberg-negative.test was extended to check that DESCRIBE HISTORY
is not applicable for non-Iceberg tables.
- iceberg-table-history.test: Covers basic usage of DESCRIBE
HISTORY. Tests on tables created with Impala and also with Spark.
Change-Id: I56a4b92c27e8e4a79109696cbae62735a00750e5
Reviewed-on: http://gerrit.cloudera.org:8080/16599
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: wangsheng <skyyws@163.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In practice we recommend that the HDFS block size align with the Parquet
row group size. But in some compute engines like Spark the default
Parquet row group size is 128MB, and if the ETL user doesn't change the
default property, Spark will generate row groups that are smaller than
the HDFS block size. The result is that a single HDFS block may contain
multiple Parquet row groups.
In the planner stage, the length of an Impala-generated scan range may
be bigger than the row group size, thus a single scan range can contain
multiple row groups. In the current Parquet scanner, when moving to the
next row group, some internal state in the Parquet column readers needs
to be reset, e.g. num_buffered_values_ and the column chunk metadata of
the column chunk readers. But the current_row_range_ offset is currently
not reset, which causes errors like
"Couldn't skip rows in file hdfs://xxx" as IMPALA-10310 points out.
This patch simply resets current_row_range_ to 0 when moving to the next
row group in the Parquet column readers, fixing IMPALA-10310.
Testing:
* Added e2e test for Parquet files with multiple blocks per file and
multiple pages per block
* Ran all core tests offline.
* Manually tested all cases encountered in my production environment.
Change-Id: I964695cd53f5d5fdb6485a85cd82e7a72ca6092c
Reviewed-on: http://gerrit.cloudera.org:8080/16697
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for querying Iceberg tables with the ORC
file format. We can use the following SQL to create a table with
the ORC file format:
CREATE TABLE default.iceberg_test (
level string,
event_time timestamp,
message string
)
STORED AS ICEBERG
LOCATION 'hdfs://xxx'
TBLPROPERTIES ('iceberg.file_format'='orc', 'iceberg.catalog'='hadoop.tables');
Note that there are still some problems when scanning ORC files
with Timestamp columns; for more details please refer to IMPALA-9967. We
may add new tests with the Timestamp type after this JIRA is fixed.
Testing:
- Create table tests in functional_schema_template.sql
- Iceberg table create test in test_iceberg.py
- Iceberg table query test in test_scanners.py
Change-Id: Ib579461aa57348c9893a6d26a003a0d812346c4d
Reviewed-on: http://gerrit.cloudera.org:8080/16568
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
As IMPALA-4371 and IMPALA-10186 point out, Impala might write
empty data pages. It usually does that when it has to write a page
bigger than the current page size. Whether we really need to write empty
data pages is a different question, but we need to handle them correctly
as there are already such files out there.
The Parquet offset index entries corresponding to empty data pages
are invalid PageLocation objects with 'compressed_page_size' = 0.
Before this commit Impala didn't ignore the empty page locations, but
generated a warning. Since an invalid page index doesn't fail a scan
by default, Impala continued scanning the file with semi-initialized
page filtering. This resulted in a 'Top level rows aren't in sync'
error, or a crash in DEBUG builds.
With this commit Impala ignores empty data pages and is still able to
filter the rest of the pages. Also, if the page index is corrupt
for some other reason, Impala correctly resets the page filtering
logic and falls back to regular scanning.
Testing:
* Added unit test for empty data pages
* Added e2e test for empty data pages
* Added e2e test for invalid page index
Change-Id: I4db493fc7c383ed5ef492da29c9b15eeb3d17bb0
Reviewed-on: http://gerrit.cloudera.org:8080/16503
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for creating Iceberg tables with HadoopCatalog.
We only supported the HadoopTables API before this patch, but now we can
also use HadoopCatalog to create Iceberg tables. When creating a managed
table, we can use SQL like this:
CREATE TABLE default.iceberg_test (
level string,
event_time timestamp,
message string
)
STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='hdfs://test-warehouse/iceberg_test');
We now support two values ('hadoop.catalog', 'hadoop.tables') for
'iceberg.catalog'. If you don't specify this property in your SQL, the
default catalog type is 'hadoop.catalog'.
As for external Iceberg tables, you can use SQL like this:
CREATE EXTERNAL TABLE default.iceberg_test_external
STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='hdfs://test-warehouse/iceberg_test',
'iceberg.table_identifier'='default.iceberg_test');
We cannot set the table location for either managed or external Iceberg
tables with 'hadoop.catalog', and 'SHOW CREATE TABLE' will not display
the table location yet. We need to use 'DESCRIBE FORMATTED/EXTENDED' to
get this location info.
'iceberg.catalog_location' is mandatory for 'hadoop.catalog' tables; it
is the location that stores the Iceberg table metadata and data, and we
use this location to load the table metadata from Iceberg.
'iceberg.table_identifier' is used for the Iceberg TableIdentifier. If
this property is not specified in the SQL, Impala will use the database
and table name to load the Iceberg table, which is
'default.iceberg_test_external' in the SQL above. The property value is
split by '.', so you can also set a value like 'org.my_db.my_tbl'. This
property is valid for both managed and external tables.
Testing:
- Create table tests in functional_schema_template.sql
- Iceberg table create test in test_iceberg.py
- Iceberg table query test in test_scanners.py
- Iceberg table show create table test in test_show_create_table.py
Change-Id: Ic1893c50a633ca22d4bca6726c9937b026f5d5ef
Reviewed-on: http://gerrit.cloudera.org:8080/16446
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Some tests (e.g. AnalyzeDDLTest.TestCreateTableLikeFileOrc) depend on
hard-coded file paths of managed tables, assuming that there is always a
file named 'base_0000001/bucket_00000_0' under the table dir. However,
the file name is in the form of bucket_${bucket-id}_${attempt-id}. The
last part of the file name is not guaranteed to be 0. If the first
attempt fails and the second attempt succeeds, the file name will be
bucket_00000_1.
This patch replaces these hard-coded file paths with paths to
corresponding files that are uploaded to HDFS by commands. For tests
that do need to use the file paths of managed table files, we do a
listing on the table dir to get the file names, instead of hard-coding
the file paths.
Updated chars-formats.orc to contain column names in the file so it can
be used in more tests. The original one only has names like col0, col1,
col2.
Tests:
- Run CORE tests
Change-Id: Ie3136ee90e2444c4a12f0f2e1470fca1d5deaba0
Reviewed-on: http://gerrit.cloudera.org:8080/16441
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for querying Iceberg tables through Impala.
We can use the following SQL to create an external Iceberg table:
CREATE EXTERNAL TABLE default.iceberg_test (
level string,
event_time timestamp,
message string
)
STORED AS ICEBERG
LOCATION 'hdfs://xxx'
TBLPROPERTIES ('iceberg_file_format'='parquet');
Or just including table name and location like this:
CREATE EXTERNAL TABLE default.iceberg_test
STORED AS ICEBERG
LOCATION 'hdfs://xxx'
TBLPROPERTIES ('iceberg_file_format'='parquet');
'iceberg_file_format' is the file format in Iceberg; currently only
PARQUET is supported, other formats will be supported in the future. If
you don't specify this property in your SQL, the default file format
is PARQUET.
We achieve this by treating the Iceberg table as a normal
unpartitioned HDFS table. When querying an Iceberg table, we push down
partition column predicates to Iceberg to decide which data files
need to be scanned, and then transfer this information to the BE to
do the real scan operation.
Testing:
- Unit test for Iceberg in FileMetadataLoaderTest
- Create table tests in functional_schema_template.sql
- Iceberg table query test in test_scanners.py
Change-Id: I856cfee4f3397d1a89cf17650e8d4fbfe1f2b006
Reviewed-on: http://gerrit.cloudera.org:8080/16143
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Currently Impala checks the file metadata field 'hive.acid.version' to
decide whether a file has the full ACID schema. There are cases when
Hive forgets to set this value for full ACID files, e.g. query-based
compactions.
So it's more robust to check the schema elements instead of the metadata
field. Also, Hive sometimes writes the schema with different character
cases, e.g. originalTransaction vs originaltransaction, so we should
rather compare the column names in a case-insensitive way.
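For reference, the full ACID schema being checked looks roughly like
this (a sketch; the 'row' struct holds the user columns):
  struct<
    operation: int,
    originalTransaction: bigint,
    bucket: int,
    rowId: bigint,
    currentTransaction: bigint,
    row: struct<...>
  >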
Testing:
* added test for full ACID compaction
* added test_full_acid_schema_without_file_metadata_tag to test full
ACID file without metadata 'hive.acid.version'
Change-Id: I52642c1755599efd28fa2c90f13396cfe0f5fa14
Reviewed-on: http://gerrit.cloudera.org:8080/16383
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of serialized Apache DataSketches KLL
sketches produced by ds_kll_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, union the relevant sketches
together to get an estimate. E.g.:
SELECT
ds_kll_quantile(ds_kll_union(sketch_col), 0.5)
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_kll_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_kll_union() on those sketches.
Change-Id: I020aea28d36f9b6ef9fb57c08411f2170f5c24bf
Reviewed-on: http://gerrit.cloudera.org:8080/16267
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
ds_kll_sketch() is an aggregate function that receives a float
parameter (e.g. a float column of a table) and returns a serialized
Apache DataSketches KLL sketch of the input data set wrapped into
STRING type. This sketch can be saved into a table or view and later
used for quantile approximations. ds_kll_quantile() receives two
parameters: a STRING parameter that contains a serialized KLL sketch
and a DOUBLE that represents the rank of the quantile in the range of
[0,1]. E.g. rank=0.1 means the approximate value in the sketch where
10% of the sketched items are less than or equal to this value.
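A minimal usage sketch (hypothetical table and column names), computing
the approximate median of a float column:
  SELECT ds_kll_quantile(ds_kll_sketch(price), 0.5) FROM sales;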
Testing:
- Added automated tests on small data sets to check the basic
functionality of sketching and getting a quantile approximate.
- Tested on TPCH25_parquet.lineitem to check that sketching and
approximating work on a bigger scale as well, where serialize/merge
phases are also required. On this scale the error range of the
quantile approximation is within 1-1.5%.
Change-Id: I11de5fe10bb5d0dd42fb4ee45c4f21cb31963e52
Reviewed-on: http://gerrit.cloudera.org:8080/16235
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This function receives a set of sketches produced by ds_hll_sketch()
and merges them into a single sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table, and then, depending on which
partitions the user is interested in, union the relevant sketches
together to get an estimate. E.g.:
SELECT
ds_hll_estimate(ds_hll_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Note, currently there is a known limitation when unioning sketches of
string types where some input sketches come from Impala and some from
Hive. In this case, if there is an overlap in the input data used by
Impala and by Hive, this overlapping data is still counted twice due to
a string representation difference between Impala and Hive.
For more details see:
https://issues.apache.org/jira/browse/IMPALA-9939
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_hll_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_hll_union() on those sketches.
Change-Id: I67cdbf6f3ebdb1296fea38465a15642bc9612d09
Reviewed-on: http://gerrit.cloudera.org:8080/16095
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Removed the support for dateless timestamps.
During dateless timestamp casts, if the format doesn't contain a
date part we get an error during tokenization of the format.
If the input string doesn't contain a date part then we get a NULL
result.
Examples:
select cast('01:02:59' as timestamp);
This will come back as a NULL value.
select to_timestamp('01:01:01', 'HH:mm:ss');
select cast('01:02:59' as timestamp format 'HH12:MI:SS');
select cast('12 AM' as timestamp FORMAT 'AM.HH12');
These will come back with parsing errors.
Casting from a table will generate similar results.
Testing:
Modified the previous tests related to dateless timestamps.
Added tests to read from tables which still contain dateless
timestamps and covered the timestamp-to-string path when no date tokens
are requested in the output string.
Change-Id: I48c49bf027cc4b917849b3d58518facba372b322
Reviewed-on: http://gerrit.cloudera.org:8080/15866
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
These functions can be used to get cardinality estimates of data
using HLL algorithm from Apache DataSketches. ds_hll_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized HLL sketch in string format. This can be written to a
table or be fed directly to ds_hll_estimate() that returns the
cardinality estimate for that sketch.
Compared to ndv(), these functions bring more flexibility: once we have
fed data to the sketch, it can be written to a table, and next time we
can skip scanning through the dataset and simply return the estimate
using the sketch. This doesn't come for free, however, as performance
measurements show that ndv() is 2x-3.5x faster than sketching. On the
other hand, if we query the estimate from an existing sketch then the
runtime is negligible.
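A hedged sketch of that workflow (hypothetical table and column names):
  -- Compute and store the sketch once.
  CREATE TABLE sketch_tbl AS
    SELECT ds_hll_sketch(user_id) AS user_sketch FROM events;
  -- Later, get the estimate without rescanning the original data.
  SELECT ds_hll_estimate(user_sketch) FROM sketch_tbl;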
Another flexibility with these sketches is that they can be merged
together so e.g. if we had saved a sketch for each of the partitions
of a table then they can be combined with each other based on the
query without touching the actual data.
DataSketches HLL is sensitive to the order of the data fed to the
sketch, and as a result running these algorithms in Impala gives
non-deterministic results within the error bounds of the algorithm.
In terms of correctness, DataSketches HLL is most of the time within 2%
of the correct result, with occasional spikes where the difference is
bigger, but it never goes out of the 5% range.
Even though the DataSketches HLL algorithm could be parameterized,
this implementation currently hard-codes the parameters and uses
HLL_4 and lg_k=12.
For more details about Apache DataSketches' HLL implementation see:
https://datasketches.apache.org/docs/HLL/HLL.html
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on TPCH25.lineitem to compare performance with
ndv(). Depending on data characteristics ndv() appears 2x-3.5x
faster. The lower the cardinality of the dataset the bigger the
difference between the 2 algorithms is.
- Ran manual tests on TPCH25.lineitem and
functional_parquet.alltypes to compare correctness with ndv(). See
results above.
Change-Id: Ic602cb6eb2bfbeab37e5e4cba11fbf0ca40b03fe
Reviewed-on: http://gerrit.cloudera.org:8080/16000
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
"Original files" are files that don't have full ACID schema. We can see
such files if we upgrade a non-ACID table to full ACID. Also, the LOAD
DATA statement can load non-ACID files into full ACID tables. So such
files don't store the special ACID columns, which means we need
to auto-generate their values. These are operation,
originalTransaction, bucket, rowid, and currentTransaction.
With the exception of 'rowid', all of them can be calculated based on
the file path, so I add their values to the scanner's template tuple.
'rowid' is the ordinal number of the row inside a bucket inside a
directory. For now Impala only allows one file per bucket per
directory. Therefore we can generate row ids for each file
independently.
Multiple files in a single bucket in a directory can only be present if
the table was non-transactional earlier and we upgraded it to full ACID
table. After the first compaction we should only see one original file
per bucket per directory.
In HdfsOrcScanner we calculate the first row id for our split then
the OrcStructReader fills the rowid slot with the proper values.
Testing:
* added e2e tests to check if the generated values are correct
* added e2e test to reject tables that have multiple files per bucket
* added unit tests to the new auxiliary functions
Change-Id: I176497ef9873ed7589bd3dee07d048a42dfad953
Reviewed-on: http://gerrit.cloudera.org:8080/16001
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This adds support for reading Parquet files where the
DECIMAL is encoded as a FIXED_LEN_BYTE_ARRAY field
with extra padding. This requires loosening file
validation and fixing up the decoding so that it
no longer assumes that the in-memory value is at
least as large as the encoded representation.
The decimal decoding logic was reworked so that we
could add the extra condition handling without regressing
performance of the decoding logic in the common case. In
the end I was able to significantly speed up the decoding
logic. The bottleneck, revealed by perf record while running
the benchmark below, was CPU stalls on the bitshift instruction used
for sign extension, waiting on the result of ByteSwap() to be loaded.
I worked around this by doing the sign extension
before the ByteSwap().
Perf:
Ran a microbenchmark to check that scanning perf didn't regress
as a result of the change. The query scans a DECIMAL column
that is mostly plain-encoded, so as to maximally stress the
FIXED_LEN_BYTE_ARRAY decoding performance.
set mt_dop=1;
set num_nodes=1;
select min(l_extendedprice)
from tpch_parquet.lineitem
The SCAN time in the summary averaged out to 94ms before
the change and is reduced to 74ms after the change. The
actual speedup of the DECIMAL decoding is greater - it
went from ~20% of the time to ~6% of the time as measured by
perf.
Testing:
Added a couple of parquet files that were generated with a
hacked version of Impala to have extra padding. Sanity-checked
that hacked tables returned the same results on Hive. The
tests failed before this code change.
Ran exhaustive tests with the hacked version of Impala
(so that all decimal tables got extra padding).
Change-Id: I2700652eab8ba7f23ffa75800a1712d310d4e1ec
Reviewed-on: http://gerrit.cloudera.org:8080/16090
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Minor compactions can compact several delta directories into a single
delta directory. The current directory filtering algorithm had to be
modified to handle minor compacted directories and prefer those over
plain delta directories. This happens in the Frontend, mostly in
AcidUtils.java.
Hive Streaming Ingestion writes similar delta directories, but they
might contain rows Impala cannot see based on its valid write id list.
E.g. we can have the following delta directory:
full_acid/delta_0000001_0000010/0000 # minWriteId: 1
# maxWriteId: 10
This delta dir contains rows with write ids between 1 and 10. But maybe
we are only allowed to see write ids less than 5. Therefore we need to
check the ACID write id column (named originalTransaction) to determine
which rows are valid.
Delta directories written by Hive Streaming don't have a visibility txn
id, so we can recognize them based on the directory name. If there's
a visibilityTxnId and it is committed => every row is valid:
full_acid/delta_0000001_0000010_v01234 # has visibilityTxnId
# every row is valid
If there's no visibilityTxnId then it was created via Hive Streaming,
therefore we need to validate rows. Fortunately Hive Streaming writes
rows with different write ids into different ORC stripes, therefore we
don't need to validate the write id per row. If we had statistics,
we could validate per stripe, but since Hive Streaming doesn't write
statistics we validate the write id per ORC row batch (an alternative
could be to do a 2-pass read, first we'd read a single value from each
stripe's 'currentTransaction' field, then we'd read the stripe if the
write id is valid).
Testing:
* the frontend logic is tested in AcidUtilsTest
* the backend row validation is tested in test_acid_row_validation
Change-Id: I5ed74585a2d73ebbcee763b0545be4412926299d
Reviewed-on: http://gerrit.cloudera.org:8080/15818
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In this patch, we add support for reading zstd encoded text files.
This includes:
1. support for reading zstd files written by Hive, which uses streaming
compression.
2. support for reading zstd files compressed by the standard zstd
library, which uses block compression.
To support decompressing both formats, a function ProcessBlockStreaming
is added to the zstd decompressor.
Testing done:
Added two backend tests:
1. streaming decompress test.
2. large data test for both block and streaming decompress.
Added two end-to-end tests:
1. Hive and Impala integration: for four compression codecs, write in
Hive and read from Impala.
2. zstd library and Impala integration: copy a zstd lib compressed file
to HDFS, and read from Impala.
Change-Id: I2adce9fe00190558525fa5cd3d50cf5e0f0b0aa4
Reviewed-on: http://gerrit.cloudera.org:8080/15023
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
A Hudi Read Optimized Table contains multiple versions of Parquet files.
In order to load the table correctly, Impala needs to recognize a Hudi
Read Optimized Table as an HdfsTable and load the latest version of the
files using HoodieROTablePathFilter.
Tests
- Unit test for Hudi in FileMetadataLoader
- Create table tests in functional_schema_template.sql
- Query tests in hudi-parquet.test
Change-Id: I65e146b347714df32fe968409ef2dde1f6a25cdf
Reviewed-on: http://gerrit.cloudera.org:8080/14711
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Implements the read path for the DATE type in the ORC scanner. The
internal representation of a date is an int32 holding the number of days
since the Unix epoch, using the proleptic Gregorian calendar.
Similarly to the Parquet implementation (IMPALA-7370) this
representation introduces an interoperability issue between Impala
and older versions of Hive (before 3.1). For more details see the
commit message of the mentioned Parquet implementation.
Change-Id: I672a2cdd2452a46b676e0e36942fd310f55c4956
Reviewed-on: http://gerrit.cloudera.org:8080/14982
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The root type should be struct as far as I know, and this was
checked with a DCHECK, leading to crashes in fuzz tests. This
change replaced the DCHECK with returning an error message.
Testing:
- added corrupt ORC file and e2e test
Change-Id: I7fba8cffbcdf8f647e27e2d5ee9e6716a4492b9b
Reviewed-on: http://gerrit.cloudera.org:8080/15021
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
orc::ColumnSelector::updateSelectedByTypeId can throw an exception on
malformed ORC files. The exception wasn't caught by Impala therefore it
caused program termination.
The fix is to simply catch the exception and return with a parse error
instead.
Testing:
* added corrupt ORC file and e2e test
Change-Id: I2f706bc832298cb5089e539b7a818cb86d02199f
Reviewed-on: http://gerrit.cloudera.org:8080/14994
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Hive can write timestamps that are outside Impala's valid
range (Impala: 1400-9999, Hive: 0001-9999). This change adds
validation logic to ORC reading that replaces out-of-range
timestamps with NULLs and adds a warning to the query.
The logic is very similar to the existing validation in
Parquet. Some differences:
- "time of day" is not checked separately as it doesn't make
sense with ORC's encoding
- instead of column name only column id is added to the warning
Testing:
- added a simple EE test that scans an existing ORC file
Change-Id: I8ee2ba83a54f93d37e8832e064f2c8418b503490
Reviewed-on: http://gerrit.cloudera.org:8080/14832
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The goal is to let JDBC clients get constraint information
from Impala tables. We implement two new metadata operations in
impala-hs2-server, GetPrimaryKeys and GetCrossReference, which are
already implemented in Hive's HS2. The thrift
definitions are copied from Hive's TCLIService.thrift. In FE, these
two operations are implemented to get the information from tables
in the catalog.
Much like for GetColumns(), tables need to be loaded in order to be able
to get PK/FK information. We wait for the PK table/FK table to load.
In the implementation, PK/FK information is returned
ONLY if the user has access to ALL the columns involved in the PK/FK
relationship.
Testing:
- Added three test tables to our test datasets since most of our FE tests
relied on dummy tables or testdata. It was difficult to test PK/FK with
these methods. Also, we can build on this testdata in future when we make
optimizer improvements.
- Added unit tests in AuthorizationTest and JDBCtest.
- Added e2e test in test_hs2.py
- This patch modifies AnalyzeDDLTests and ToSqlTests to rely on the newly
added dataset instead of dummy tables for pk/fk tests.
Caveats:
- Ranger needs OWNER user information for authorization. Since this is HMS
metadata that we do not aggresively load, this information is not available
for IncompleteTables. Some foreign key tables (fact tables for example)
might have FK/PK relationships with several PK tables some of which might
not be loaded in catalog. Currently we have no way to check column
previleges without owner user information tables. We do not return keys
involving such columns. Therefore, when Ranger is used, there maybe missing
PK/FK relationships for parent tables that are not loaded. This can be
tracked in IMPALA-9172.
- Retrieval of constraints is not yet supported in LocalCatalog mode. See
IMPALA-9158.
Change-Id: I8942dfbbd4a3be244eed1c61ac2ce17069960477
Reviewed-on: http://gerrit.cloudera.org:8080/14720
Reviewed-by: Vihang Karajgaonkar <vihang@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch the supported year range for DATE type started with
year 0. This contradicts the ANSI SQL standard that defines the valid
DATE value range to be 0001-01-01 to 9999-12-31.
Change-Id: Iefdf1c036834763f52d44d0c39a25a1f04e41e07
Reviewed-on: http://gerrit.cloudera.org:8080/14349
Reviewed-by: Attila Jeges <attilaj@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change is a follow-up to IMPALA-7368 and adds support for DATE
type to the avro scanner.
Similarly to Parquet, Avro uses the DATE logical type for dates. The DATE
logical type annotates an INT32 that stores the number of days since
the Unix epoch, 1 January 1970.
This representation introduces an avro interoperability issue between
Impala and older versions of Hive:
- Before version 3.1, Hive used Julian calendar to represent dates
up to 1582-10-05 and Gregorian calendar for dates starting with
1582-10-15. Dates between 1582-10-05 and 1582-10-15 were lost.
- Impala uses proleptic Gregorian calendar, extending the Gregorian
calendar backward to dates preceding its official introduction in
1582-10-15.
This means that pre-1582-10-15 dates written to an avro table by Hive
will be read back incorrectly by Impala.
Note that Hive 3.1 switched to proleptic Gregorian calendar too, so
for Hive 3.1+ this is no longer an issue.
Dependency changes:
- BE uses avro 1.7.4-p5 from native-toolchain.
Change-Id: I7a9d5b93a22cf3a00244037e187f8c145cacc959
Reviewed-on: http://gerrit.cloudera.org:8080/13944
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit implements page filtering based on the Parquet page index.
The read and evaluation of the page index is done by the
HdfsParquetScanner. At first, we determine the row ranges we are
interested in, and based on the row ranges we determine the candidate
pages for each column that we are reading.
We still issue one ScanRange per column chunk, but we specify
sub-ranges that store the candidate pages, i.e. we don't read
the whole column chunk, but only fractions of it.
Pages are not aligned across column chunks, i.e. page #2 of column A
might store completely different rows than page #2 of column B.
It means we need to implement some kind of row-skipping logic
when we read the data pages. This logic is implemented in
BaseScalarColumnReader and ScalarColumnReader. Collection column
readers know nothing about page filtering.
Page filtering can be turned off by setting the query option
'read_parquet_page_index' to false.
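A sketch of how this typically surfaces (hypothetical, suitably sorted
table; the option is the one named above):
  -- Selective predicate on a column the data is sorted by; only the
  -- candidate pages of each column chunk are read.
  SELECT * FROM sales_sorted WHERE sale_id = 1234567;
  -- Page filtering can be turned off if needed:
  SET read_parquet_page_index=false;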
Testing:
* added some unit tests for the row range and
page selection logic
* generated various Parquet files with Parquet-MR
* enabled Page index writing and wrote selective queries against
tables written by Impala. Current tests are likely to use page
filtering transparently.
Performance:
* Measured locally, observed 3x to 20x speedup for selective queries.
The speedup was proportional to the IO operations need to be done.
* The TPCH benchmark didn't show a significant performance change. It
is not a surprise since the data is not sorted in any useful
way. So the main goal was to not introduce a perf regression.
TODO:
* measure performance for remote reads
Change-Id: I0cc99f129f2048dbafbe7f5a51d1ea3a5005731a
Reviewed-on: http://gerrit.cloudera.org:8080/12065
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change is a follow-up to IMPALA-7368 and adds support for DATE
type to the parquet scanner/writer. CREATE TABLE LIKE PARQUET
statements associated with data files that contain dates are also
supported.
Parquet uses DATE logical type for dates. DATE logical type annotates
an INT32 that stores the number of days from the Unix epoch, 1 January
1970.
This representation introduces a parquet interoperability issue
between Impala and older versions of Hive:
- Before version 3.1, Hive used Julian calendar to represent dates
up to 1582-10-05 and Gregorian calendar for dates starting with
1582-10-15. Dates between 1582-10-05 and 1582-10-15 were lost.
- Impala uses proleptic Gregorian calendar, extending the Gregorian
calendar backward to dates preceding its official introduction in
1582-10-15.
This means that pre-1582-10-15 dates written to a parquet table by
Hive will be read back incorrectly by Impala and vice versa.
Note that Hive 3.1 switched to proleptic Gregorian calendar too, so
for Hive 3.1+ this is no longer an issue.
Change-Id: I67da03754531660bc8de3b6935580d46deae1814
Reviewed-on: http://gerrit.cloudera.org:8080/13189
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
DATE values describe a particular year/month/day in the form
yyyy-MM-dd. For example: DATE '2019-02-15'. DATE values do not have a
time of day component. The range of values supported for the DATE type
is 0000-01-01 to 9999-12-31.
This initial DATE type support covers TEXT and HBASE fileformats only.
'DateValue' is used as the internal type to represent DATE values.
The changes are as follows:
- Support for DATE literal syntax.
- Explicit casting between DATE and other types (note that invalid
casts will fail with an error just like invalid DECIMAL_V2 casts,
while failed casts to other types do not lead to a warning or error;
see the cast examples after this list):
- from STRING to DATE. The string value must be formatted as
yyyy-MM-dd HH:mm:ss.SSSSSSSSS. The date component is mandatory,
the time component is optional. If the time component is
present, it will be truncated silently.
- from DATE to STRING. The resulting string value is formatted as
yyyy-MM-dd.
- from TIMESTAMP to DATE. The source timestamp's time of day
component is ignored.
- from DATE to TIMESTAMP. The target timestamp's time of day
component is set to 00:00:00.
- Implicit casting between DATE and other types:
- from STRING to DATE if the source string value is used in a
context where a DATE value is expected.
- from DATE to TIMESTAMP if the source date value is used in a
context where a TIMESTAMP value is expected.
- Since STRING -> DATE, STRING -> TIMESTAMP and DATE -> TIMESTAMP
implicit conversions are now all possible, the existing function
overload resolution logic is not adequate anymore.
For example, it resolves the
if(false, '2011-01-01', DATE '1499-02-02') function call to the
if(BOOLEAN, TIMESTAMP, TIMESTAMP) version of the overloaded
function, instead of the if(BOOLEAN, DATE, DATE) version.
This is clearly wrong, so the function overload resolution logic had
to be changed to resolve function calls to the best-fit overloaded
function definition if there are multiple applicable candidates.
An overloaded function definition is an applicable candidate for a
function call if each actual parameter in the function call either
matches the corresponding formal parameter's type (without casting)
or is implicitly castable to that type.
When looking for the best-fit applicable candidate, a parameter
match score (i.e. the number of actual parameters in the function
call that match their corresponding formal parameter's type without
casting) is calculated and the applicable candidate with the highest
parameter match score is chosen.
There's one more issue that the new resolution logic has to address:
if two applicable candidates have the same parameter match score and
the only difference between the two is that the first one requires a
STRING -> TIMESTAMP implicit cast for some of its parameters while
the second one requires a STRING -> DATE implicit cast for the same
parameters then the first candidate has to be chosen not to break
backward compatibility.
E.g. the year('2019-02-15') function call must resolve to
year(TIMESTAMP) instead of year(DATE). Note that year(DATE) is not
implemented yet, so this is not an issue at the moment, but it will
be in the future.
When the resolution algorithm considers overloaded function
definitions, first it orders them lexicographically by the types in
their parameter lists. To ensure the backward-compatible behavior,
the PrimitiveType.DATE enum value has to come after
PrimitiveType.TIMESTAMP.
- Codegen infrastructure changes for expression evaluation.
- 'IS [NOT] NULL' and '[NOT] IN' predicates.
- Common comparison operators (including the 'BETWEEN' operator).
- Infrastructure changes for built-in functions.
- Some built-in functions: conditional, aggregate, analytical and
math functions.
- C++ UDF/UDA support.
- Support partitioning and grouping by DATE.
- Beeswax, HiveServer2 support.
These items are tightly coupled and it makes sense to implement them
in one change-set.
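To make the cast semantics above concrete, here is a minimal,
self-contained sketch. It is not Impala's DateValue/cast implementation;
the types and helper names are made up purely to illustrate the described
behavior (mandatory date component, silently truncated time component,
TIMESTAMP -> DATE dropping the time of day, DATE -> TIMESTAMP using
00:00:00).

// Illustrative sketch only; not Impala code. Range/calendar validation
// is omitted for brevity.
#include <cstdio>
#include <optional>
#include <string>

struct Date { int year = 0, month = 0, day = 0; };
struct Timestamp { Date date; int hour = 0, minute = 0, second = 0; };

// STRING -> DATE: the date part is mandatory, a trailing time part
// (" HH:mm:ss.SSSSSSSSS") is silently truncated. A malformed string
// yields no value here; in Impala the cast fails with an error.
std::optional<Date> CastStringToDate(const std::string& s) {
  Date d;
  if (std::sscanf(s.c_str(), "%4d-%2d-%2d", &d.year, &d.month, &d.day) != 3) {
    return std::nullopt;
  }
  return d;
}

// TIMESTAMP -> DATE: the time-of-day component is ignored.
Date CastTimestampToDate(const Timestamp& ts) { return ts.date; }

// DATE -> TIMESTAMP: the time of day is set to 00:00:00.
Timestamp CastDateToTimestamp(const Date& d) {
  Timestamp ts;
  ts.date = d;  // hour/minute/second already default to 00:00:00
  return ts;
}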
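And a rough sketch of the best-fit resolution logic itself (the real
implementation lives in the Java frontend; the names and types below are
illustrative only): candidates are pre-sorted so that TIMESTAMP-taking
overloads come before DATE-taking ones, and a tie in the match score is
then resolved in favor of the earlier candidate.

#include <cstddef>
#include <optional>
#include <vector>

// DATE deliberately sorts after TIMESTAMP, mirroring the enum ordering
// requirement described above.
enum class Type { BOOLEAN, STRING, TIMESTAMP, DATE };

bool IsImplicitlyCastable(Type from, Type to) {
  if (from == to) return true;
  if (from == Type::STRING && (to == Type::TIMESTAMP || to == Type::DATE)) return true;
  if (from == Type::DATE && to == Type::TIMESTAMP) return true;
  return false;
}

struct Overload { std::vector<Type> params; };

// Returns the index of the best-fit applicable candidate, if any.
// 'candidates' is assumed to be ordered lexicographically by parameter
// types, so year(TIMESTAMP) precedes year(DATE) and wins a tie for
// year('2019-02-15').
std::optional<std::size_t> ResolveCall(const std::vector<Overload>& candidates,
                                       const std::vector<Type>& args) {
  std::optional<std::size_t> best;
  int best_score = -1;
  for (std::size_t i = 0; i < candidates.size(); ++i) {
    const std::vector<Type>& params = candidates[i].params;
    if (params.size() != args.size()) continue;
    int score = 0;
    bool applicable = true;
    for (std::size_t j = 0; j < args.size(); ++j) {
      if (args[j] == params[j]) {
        ++score;  // exact match, no cast needed
      } else if (!IsImplicitlyCastable(args[j], params[j])) {
        applicable = false;  // not a candidate for this call
        break;
      }
    }
    // Only a strictly higher score replaces the current best, so ties go
    // to the earlier (TIMESTAMP-before-DATE) candidate.
    if (applicable && score > best_score) { best = i; best_score = score; }
  }
  return best;
}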
Testing:
- A new partitioned TEXT table 'functional.date_tbl' (and the
corresponding HBASE table 'functional_hbase.date_tbl') was
introduced for DATE-related tests.
- BE and FE tests were extended to cover DATE type.
- E2E tests:
- since DATE type is supported for TEXT and HBASE fileformats
only, most DATE tests were implemented separately in
tests/query_test/test_date_queries.py.
Note that this change-set is not a complete DATE type implementation,
but it lays the foundation for future work:
- Add date support to the random query generator.
- Implement a complete set of built-in functions.
- Add Parquet support.
- Add Kudu support.
- Optionally support Avro and ORC.
For further details, see IMPALA-6169.
Change-Id: Iea8155ef09557e0afa2f8b2d0b2dc9d0896dc30f
Reviewed-on: http://gerrit.cloudera.org:8080/12481
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This is a fix for the following issue:
1. Some BE tests (e.g. ExprTest.TimestampFunctions) use the system's
local timezone but run against a test timezone db (instead of the
system's timezone db).
2. On some Linux installations /usr/share/zoneinfo contains symlinks
to files in the /usr/share/zoneinfo/SystemV directory
(e.g. /usr/share/zoneinfo/America/Los_Angeles is a symlink to
../SystemV/PST8PDT).
3. The 'SystemV' directory is not part of the test timezone db, since
it is obsolete and excluded by default.
Consequently, if the system's local timezone is set to
America/Los_Angeles, BE tests won't find the corresponding timezone
file in the test timezone db. BE tests will default to UTC, which will
break some of them.
This change sets local timezone explicitly for failing BE tests, so
they don't depend on the system's local timezone.
It also adds the 'SystemV' directory to the test timezone db to avoid
similar issues in the future.
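As an illustration only (this is not the code added by this change; the
helper below is hypothetical), pinning the local timezone for a test can
be done with a small POSIX RAII wrapper around the TZ environment
variable:

#include <cstdlib>
#include <ctime>
#include <string>

// Sets TZ for the lifetime of the object and restores the old value on
// destruction, so a test does not depend on the machine's local
// timezone. POSIX-only (setenv/unsetenv/tzset).
class ScopedTimezone {
 public:
  explicit ScopedTimezone(const char* tz) {
    const char* old = std::getenv("TZ");
    had_old_ = (old != nullptr);
    if (had_old_) old_ = old;
    setenv("TZ", tz, /*overwrite=*/1);
    tzset();
  }
  ~ScopedTimezone() {
    if (had_old_) {
      setenv("TZ", old_.c_str(), 1);
    } else {
      unsetenv("TZ");
    }
    tzset();
  }
 private:
  std::string old_;
  bool had_old_ = false;
};

// Usage in a test body:
//   ScopedTimezone tz("America/Los_Angeles");
//   ... evaluate expressions that depend on the local timezone ...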
Change-Id: I9288cd24c8af0c059e55d47c86bd92eaf0075681
Reviewed-on: http://gerrit.cloudera.org:8080/12199
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The code mimics the code written for the other min-max filters. Decimal
data can be stored using 4, 8 or 16 bytes, and the code handles all
three storage configurations. The column definition states the
precision, and the precision determines the storage size.
The minimum and maximum values are stored in a union. The column's
precision comes in as an input; the storage size is derived from the
precision, and the union member matching that size is used (see the
sketch below).
The code in min-max-filter* follows the general convention of the file,
hence uses macros.
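A condensed sketch of the idea (names are illustrative, not the actual
min-max-filter classes): the filter stores min/max in a union and picks
the member to use from the storage size implied by the column's
precision.

#include <cstdint>

// Illustrative only. __int128 is a GCC/Clang extension standing in for
// the 16-byte decimal representation.
struct DecimalMinMaxFilterSketch {
  union DecimalUnion {
    int32_t d4;    // precision 1..9   -> 4 bytes
    int64_t d8;    // precision 10..18 -> 8 bytes
    __int128 d16;  // precision 19..38 -> 16 bytes
  };
  DecimalUnion min_;
  DecimalUnion max_;
  int size_;  // 4, 8 or 16, derived from the column's precision

  static int StorageSize(int precision) {
    if (precision <= 9) return 4;
    if (precision <= 18) return 8;
    return 16;
  }
};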
The test includes 24 decimal columns (as listed below) with the following joins:
1. Inner Join with broadcast (2 tables)
1a. 1 predicate
1b. 4 predicates - all result in a decimal min-max filter
1c. 4 predicates - 3 result in a decimal min-max filter; 1 doesn't
2. Inner Join with Shuffle (3 tables)
3. Right outer join (2 tables)
4. Left Semi join (2 tables)
5. Right Semi join (2 tables)
Decimal Columns:
4 bytes:
(5,0), (5,1), (5,3), (5,5)
(9,0), (9,1), (9,5), (9,9)
8 bytes:
(14,0), (14,1), (14,7), (14,14)
(18,0), (18,1), (18,9), (18,18)
16 bytes:
(28,0), (28,1), (28,14), (28,28)
(38,0), (38,1), (38,19), (38,38)
The test aggregates the count of probe rows. This shows that the min-max filter
is exercised, because the number of probe rows is less than the total number
of rows in the probe-side table. The probe row count is considered
deterministic, but it is worth watching for changes in Kudu that alter the way
data is partitioned: such a change could change the probe row count, in which
case the test will have to be updated.
impala_test_suite.py and test_result_verifier.py are enhanced to support saving
aggregated results via update_results.
Change-Id: Ib7e7278e902160d7060f8097290bc172d9031f94
Reviewed-on: http://gerrit.cloudera.org:8080/12113
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
PARQUET-1387 added an int64 timestamp type with nanosecond precision
that stores timestamps as nanoseconds since the Unix epoch.
As 64 bits are not enough to represent the whole 1400..9999 range
of Impala timestamps, this new type works with a limited range:
1677-09-21 00:12:43.145224192 .. 2262-04-11 23:47:16.854775807 UTC
The benefit of the reduced range is that no validation is necessary
during scanning, as every possible 64-bit value represents a valid
timestamp in Impala. This means the new type has the potential to be
the fastest way to store timestamps in Impala + Parquet.
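As a minimal sketch (the type and function names are made up, not the
actual scanner code), reading such a value is just a split into seconds
and a nanosecond remainder, with no range check because every 64-bit
input falls inside the supported range:

#include <cstdint>

struct SplitTimestamp { int64_t seconds; int32_t nanos; };

SplitTimestamp FromInt64Nanos(int64_t nanos_since_epoch) {
  int64_t secs = nanos_since_epoch / 1000000000;
  int32_t nanos = static_cast<int32_t>(nanos_since_epoch % 1000000000);
  if (nanos < 0) {  // keep the nanosecond part in [0, 1e9) for pre-epoch values
    secs -= 1;
    nanos += 1000000000;
  }
  return SplitTimestamp{secs, nanos};
}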
Another way NANO differs from MICRO and MILLI is that NANO can
only be described with the new logical types in Parquet; it has no
converted type equivalent. This made implementing CREATE TABLE
LIKE PARQUET less trivial than it was for MICRO/MILLI: the type
conversion logic in ParquetHelper.java had to be rewritten to
use LogicalTypeAnnotation instead of ConvertedType.
The changes on the Java side also made bumping CDH_BUILD_NUMBER
necessary.
Testing:
- added a new testfile with int64 nano timestamps
- ran core tests
Change-Id: I932396d8646f43c0b9ca4a6359f164c4d8349d8f
Reviewed-on: http://gerrit.cloudera.org:8080/11984
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The idea is to optimise the common case where there are long runs of
NULL or non-NULL values (i.e. the def level is repeated). We can
detect this cheaply by keying the decoding loop in the column reader
off the state of the def level RLE decoder - if there's a long run
of repeated levels, we can skip checking the def level for every
value. We still fall back to decoding and caching a batch of def levels
and reading them value by value whenever the next def level is not part
of a repeated run (see the sketch below). We still use the old approach
for decoding rep
levels. There might be some benefit to using the same approach for rep
levels *if* repeated def and rep level runs line up.
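A very rough sketch of that loop (the decoder interface and callbacks
below are hypothetical stand-ins, declarations only, for the real column
reader and RLE def-level decoder):

#include <algorithm>
#include <cstdint>

struct DefLevelDecoder {            // hypothetical stand-in for the RLE decoder
  int RepeatedRunLength() const;    // remaining length of the current repeated run, 0 if none
  uint8_t RepeatedValue() const;    // the def level repeated in that run
  void ConsumeRun(int n);           // advance n positions within the repeated run
  bool DecodeOne(uint8_t* level);   // slow path: decode a single def level
};

void MaterializeValues(int n);      // decode n non-NULL values in one batch
void MaterializeNulls(int n);       // write n NULL slots in one batch
void MaterializeOne(bool is_null);  // pre-existing value-by-value path

void DecodeBatch(DefLevelDecoder* defs, int num_values, uint8_t max_def_level) {
  while (num_values > 0) {
    int run = defs->RepeatedRunLength();
    if (run > 0) {
      // Fast path: the def level is constant for the whole run, so we know
      // up front whether the values are all non-NULL or all NULL (for a
      // non-nested column) without checking each one.
      int n = std::min(run, num_values);
      if (defs->RepeatedValue() == max_def_level) {
        MaterializeValues(n);
      } else {
        MaterializeNulls(n);
      }
      defs->ConsumeRun(n);
      num_values -= n;
    } else {
      // Slow path: not in a repeated run, fall back to per-value def levels.
      uint8_t level;
      if (!defs->DecodeOne(&level)) return;  // out of levels
      MaterializeOne(level != max_def_level);
      --num_values;
    }
  }
}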
These changes should unlock further optimizations because more time is
spent in simple kernel functions, e.g. UnpackAndDecode32Values() for
dictionary decompression, which is very optimisable using SIMD etc.
Snappy decompression now seems to be the main CPU bottleneck for
decoding snappy-compressed Parquet.
Perf:
Running TPC-H scale factor 60 on uncompressed and snappy parquet
both showed a ~4% speedup overall.
Microbenchmarks show that scans doing only dictionary decoding on
uncompressed Parquet are ~75% faster:
set mt_dop=1;
select min(l_returnflag) from lineitem;
Testing:
We already have alltypesagg with a mix of null and non-null values.
Many tables have long runs of non-null values.
Added new test data and coverage:
* a test table manynulls with long runs of null values.
* a large CHAR test table
* missing coverage for materialising pos slot in flattened nested types
scan.
* Extended dict test to test longer runs.
* A larger version of complextypestbl with interesting collection
shapes - NULL collections, empty collections, etc., particularly runs
of collections with the same shape.
* Test interaction of timestamp validation with conversion
* Ran code coverage build to confirm all code paths are tested
* ASAN and exhaustive runs.
Change-Id: I8c03006981c46ef0dae30602f2b73c253d9b49ef
Reviewed-on: http://gerrit.cloudera.org:8080/8319
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Changes:
- parquet.thrift is updated to a newer version which contains the
timestamp logical type.
- INT64 columns with converted types TIMESTAMP_MILLIS and
TIMESTAMP_MICROS can be read as TIMESTAMP.
- If the logical type is timestamp, then the type contains information
about whether the UTC->local conversion is necessary (see the sketch
after this list). This feature is only supported for the new timestamp
types, so INT96 timestamps must still use the
convert_legacy_hive_parquet_utc_timestamps flag.
- Min/max stat filtering is enabled again for columns that need
UTC->local conversion. This was disabled in IMPALA-7559 because
it could incorrectly drop column chunks.
- CREATE TABLE LIKE PARQUET converts these columns to
TIMESTAMP - before the change, an error was returned instead.
- Bulk of the Parquet column stat logic was moved to a new class
called "ColumnStatsReader".
Testing:
- Added unit tests for timezone conversion (this needed a new public
function in timezone_db.h and adding CET to tzdb_tiny).
- Added parquet files (created with parquet-mr) with int64 timestamp
columns.
Change-Id: I4c7c01fffa31b3d2ca3480adf6ff851137dadac3
Reviewed-on: http://gerrit.cloudera.org:8080/11057
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>