This patch mainly implements querying of Paimon data tables
through a JNI-based scanner.
Features implemented:
- Support column pruning.
Partition pruning and predicate pushdown will be submitted
as the third part of the patch.
We implemented this by treating the Paimon table as a normal
unpartitioned table. When querying a Paimon table:
- PaimonScanNode decides which Paimon splits need to be scanned,
and then transfers the splits to the BE to perform the JNI-based scan.
- We also collect the columns that need to be scanned and pass
them to the scanner for column pruning. To support schema evolution,
this is implemented by passing the field IDs of the columns to the BE
instead of the column positions.
- In the original implementation, PaimonJniScanner passed the Paimon
row object directly to the BE, which called the corresponding Paimon
row field accessor, a Java method, to convert row fields to
Impala row batch tuples. We found this slow due to the overhead of
JVM method calls.
To minimize the overhead, we reworked the implementation:
PaimonJniScanner converts the Paimon row batches to an
Arrow record batch, which stores data in the off-heap region of the
Impala JVM, and passes the memory pointer of the off-heap Arrow
record batch to the BE.
The BE PaimonJniScanNode reads data directly from the JVM off-heap
region and converts the Arrow record batch to an Impala row batch.
The benchmark shows the latter implementation is 2.x better
than the original implementation.
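The reason for passing field IDs rather than column positions can be illustrated with a small sketch. It is not the patch's code: the `Field` class, the `project_by_field_id` helper, and the sample schemas are hypothetical, and the only assumption carried over from the description is that each column keeps a stable field ID across schema changes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical column descriptor: a stable field ID plus a name."""
    field_id: int
    name: str


def project_by_field_id(schema, required_ids):
    """Return the positions, in the given schema, of the requested field IDs.

    IDs that no longer exist in the schema are skipped.
    """
    position_of = {f.field_id: pos for pos, f in enumerate(schema)}
    return [position_of[fid] for fid in required_ids if fid in position_of]


# Schema as it looked when the query columns were first resolved.
old_schema = [Field(0, "id"), Field(1, "name"), Field(2, "price")]
# After evolution: 'name' was dropped and 'discount' was added in front.
new_schema = [Field(3, "discount"), Field(0, "id"), Field(2, "price")]

required = [0, 2]  # field IDs of 'id' and 'price'
old_positions = project_by_field_id(old_schema, required)
new_positions = project_by_field_id(new_schema, required)
```

The same pair of field IDs resolves to different positions in the two schemas, which is why a position-based projection would read the wrong columns after the schema changes.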
The lifecycle of an Arrow record batch is as follows:
the Arrow record batch is generated in the FE and passed to the BE.
After the record batch is imported into the BE successfully,
the BE is in charge of freeing it.
There are two free paths: the normal path and the
exceptional path. On the normal path, when the Arrow batch
is fully consumed by the BE, the BE calls JNI to fetch the next Arrow
batch. In this case, the consumed Arrow batch is freed automatically.
The exceptional path is taken when the query is cancelled or a memory
allocation fails. In these corner cases, the Arrow batch is freed in the
close method if it has not been fully consumed by the BE.
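The two free paths can be modelled with a short sketch. This is only an illustration of the ownership rule described above, not the BE implementation: the `ArrowBatchConsumer` class, its methods, and the `freed` list are hypothetical names, and plain strings stand in for Arrow record batches.

```python
class ArrowBatchConsumer:
    """Hypothetical model of the consumer that owns imported batches."""

    def __init__(self, batches):
        self._source = iter(batches)
        self._current = None  # batch currently owned by the consumer
        self.freed = []       # records which batches were released

    def _release_current(self):
        # Free the owned batch, if any. Safe to call more than once.
        if self._current is not None:
            self.freed.append(self._current)
            self._current = None

    def next_batch(self):
        # Normal path: fetching the next batch releases the consumed one.
        self._release_current()
        self._current = next(self._source, None)
        return self._current

    def close(self):
        # Exceptional path: release a batch that was not fully consumed,
        # for example after a cancellation or a failed allocation.
        self._release_current()


consumer = ArrowBatchConsumer(["batch0", "batch1", "batch2"])
consumer.next_batch()  # now owns batch0
consumer.next_batch()  # batch0 is released, now owns batch1
consumer.close()       # query cancelled: batch1 is released here
```

Keeping the release logic in one helper that both paths call makes a double free impossible in this model, since the owned batch is cleared as soon as it is released.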
Impala data types currently supported for queries:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE
TODO:
- Patches pending submission:
- Support tpcds/tpch data-loading
for paimon data table.
- Virtual Column query support for querying
paimon data table.
- Query support with time travel.
- Query support for paimon meta tables.
- WIP:
- Snapshot incremental read.
- Complex type query support.
- Native paimon table scanner, instead of
jni based.
Testing:
- Create test tables in functional_schema_template.sql
- Add TestPaimonScannerWithLimit in test_scanners.py
- Add test_paimon_query in test_paimon.py.
- The tpcds/tpch tests already pass for Paimon tables. The test
table data is currently generated by Spark, which is not supported
by Impala now; we have to do this because Hive doesn't support
generating Paimon tables for dynamic-partitioned tables. We plan
to submit a separate patch for tpcds/tpch data loading and the
associated tpcds/tpch query tests.
- JVM off-heap memory leak tests: ran looped tpch tests for
1 day; no obvious off-heap memory increase was observed, and
off-heap memory usage stayed within 10M.
Change-Id: Ie679a89a8cc21d52b583422336b9f747bdf37384
Reviewed-on: http://gerrit.cloudera.org:8080/23613
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
This directory contains Impala test data sets. The directory layout is structured as follows:
datasets/
<data set>/<data set>_schema_template.sql
<data set>/<data files SF1>/data files
<data set>/<data files SF2>/data files
Where SF is the scale factor controlling data size. This allows for scaling the same schema to
different sizes based on the target test environment.
The schema template SQL files have the following format:
The goal is to provide a single place to define a table + data files
and have the schema and data load statements generated for each combination of file
format, compression, etc. The way this works is by specifying how to create a
'base table'. The base table can be used to generate tables in other file formats
by performing the defined INSERT / SELECT INTO statement. Each new table using the
file format/compression combination needs to have a unique name, so all the
statements are parameterized on table name.
The template file is read in by the 'generate-schema-statements.py' script
to generate all the schema for the Impala benchmark tests.
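As a rough illustration of parameterizing a statement on table name, the sketch below expands one INSERT template for every file format/compression combination. The placeholder names, the statement text, and the `<base>_<format>_<compression>` naming scheme are assumptions made for this example; they are not taken from the actual generator script.

```python
from itertools import product

# Hypothetical template: both placeholders are illustrative names.
INSERT_TEMPLATE = (
    "INSERT OVERWRITE TABLE {table_name} SELECT * FROM {base_table_name};"
)


def generate_insert_statements(base_table_name, file_formats, compressions):
    """Build one INSERT statement per file format/compression combination."""
    statements = []
    for file_format, compression in product(file_formats, compressions):
        # Assumed naming scheme that keeps each derived table name unique.
        table_name = f"{base_table_name}_{file_format}_{compression}"
        statements.append(
            INSERT_TEMPLATE.format(
                table_name=table_name, base_table_name=base_table_name
            )
        )
    return statements


statements = generate_insert_statements("alltypes", ["parquet"], ["none", "snap"])
```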
Each table is defined as a new section in the file with the following format:
====
---- SECTION NAME
section contents
...
---- ANOTHER SECTION
... section contents
---- ... more sections...
Note that tables are delimited by '====' and that even the first table in the
file must include this header line.
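The layout above can be read with a few lines of Python. This is a minimal sketch based only on the format as described here ('====' starts a table, '---- NAME' starts a section); `parse_template` is a hypothetical helper and does not reproduce the parser used by the real loading scripts.

```python
def parse_template(text):
    """Split a schema template into one dict of sections per table."""
    tables = []
    current = None  # sections of the table being parsed
    section = None  # name of the section being filled
    for line in text.splitlines():
        if line.strip() == "====":
            # Table delimiter: start a new table, even for the first one.
            current = {}
            tables.append(current)
            section = None
        elif line.startswith("----"):
            if current is None:
                raise ValueError("section found before the first '====' line")
            section = line[4:].strip()
            current[section] = []
        elif section is not None:
            current[section].append(line)
    return [
        {name: "\n".join(body).strip() for name, body in table.items()}
        for table in tables
    ]


sample = """====
---- DATASET
functional
---- BASE_TABLE_NAME
alltypes
====
---- DATASET
tpch
"""
parsed = parse_template(sample)
```

Raising an error when a section appears before any '====' line mirrors the rule that the first table must also carry the header line.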
The supported section names are:
DATASET
Data set name - Used to group sets of tables together
BASE_TABLE_NAME
The name of the table within the database
CREATE
Explicit CREATE statement used to create the table (executed by Impala)
CREATE_HIVE
Same as the above, but will be executed by Hive instead. If specified,
'CREATE' must not be specified.
CREATE_KUDU
Customized CREATE TABLE statement used to create the table for Kudu-specific
syntax.
COLUMNS
PARTITION_COLUMNS
ROW_FORMAT
HBASE_COLUMN_FAMILIES
TABLE_PROPERTIES
HBASE_REGION_SPLITS
If no explicit CREATE statement is provided, a CREATE statement is generated
from these sections (see 'build_table_template' function in
'generate-schema-statements.py' for details)
ALTER
A set of ALTER statements to be executed after the table is created
(typically to add partitions, but may also be used for other settings that
cannot be specified directly in the CREATE TABLE statement).
These statements are ignored for HBase and Kudu tables.
LOAD
The statement used to load the base (text) form of the table. This is
typically a LOAD DATA statement.
DEPENDENT_LOAD
DEPENDENT_LOAD_KUDU
DEPENDENT_LOAD_HIVE
DEPENDENT_LOAD_ACID
DEPENDENT_LOAD_JSON
Statements to be executed during the "dependent load" phase. These statements
are run after the initial (base table) load is complete.
HIVE_MAJOR_VERSION
The required major version of Hive for this table. If the major version
of Hive at runtime does not exactly match the version specified in this section,
the table will be skipped.
NOTE: this is not a _minimum_ version -- if HIVE_MAJOR_VERSION specifies '2',
the table will _not_ be loaded/created on Hive 3.
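The exact-match rule for HIVE_MAJOR_VERSION can be expressed as a small check. The `should_load_table` function below is a hypothetical sketch; in particular, treating a missing section as "no restriction" is an assumption of this example rather than something the text above states.

```python
def should_load_table(sections, runtime_hive_major_version):
    """Decide whether a table should be loaded for the running Hive version.

    'sections' maps section names to their text contents.
    """
    required = sections.get("HIVE_MAJOR_VERSION")
    if required is None or not required.strip():
        # Assumption: no HIVE_MAJOR_VERSION section means no restriction.
        return True
    # Exact match, not a minimum: '2' does not match Hive 3.
    return int(required.strip()) == runtime_hive_major_version


hive2_table = {"BASE_TABLE_NAME": "example", "HIVE_MAJOR_VERSION": "2"}
loaded_on_hive2 = should_load_table(hive2_table, 2)
loaded_on_hive3 = should_load_table(hive2_table, 3)
```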