TestTpcdsQueryWithProcessingCost.test_tpcds_q51a and
TestTpcdsQuery.test_tpcds_q67a have been intermittently failing with
memory oversubscription errors. The fact that the test minicluster starts
3 impalads on a single host probably makes admission control less effective
in preventing these queries from running in parallel with others.
This patch keeps both tests, but reduces max_fragment_instances_per_node
from 4 to 2 to lower their memory requirements.
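For reference, the cap is an ordinary per-query option; a minimal sketch of
the reduced setting the tests now use (the statements are illustrative):
SET COMPUTE_PROCESSING_COST=1;
SET MAX_FRAGMENT_INSTANCES_PER_NODE=2;
-- the TPC-DS query then runs under the lower per-node parallelism cap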
Before patch:
q51a
Max Per-Host Resource Reservation: Memory=3.08GB Threads=129
Per-Host Resource Estimates: Memory=124.24GB
Per Host Min Memory Reservation: localhost:27001(2.93 GB) localhost:27002(1.97 GB) localhost:27000(2.82 GB)
Per Host Number of Fragment Instances: localhost:27001(115) localhost:27002(79) localhost:27000(119)
Admission result: Admitted immediately
Cluster Memory Admitted: 33.00 GB
Per Node Peak Memory Usage: localhost:27000(2.84 GB) localhost:27002(1.99 GB) localhost:27001(2.95 GB)
Per Node Bytes Read: localhost:27000(62.08 MB) localhost:27002(45.71 MB) localhost:27001(47.39 MB)
q67a
Max Per-Host Resource Reservation: Memory=2.15GB Threads=105
Per-Host Resource Estimates: Memory=4.48GB
Per Host Min Memory Reservation: localhost:27001(2.13 GB) localhost:27002(2.13 GB) localhost:27000(2.15 GB)
Per Host Number of Fragment Instances: localhost:27001(76) localhost:27002(76) localhost:27000(105)
Cluster Memory Admitted: 13.44 GB
Per Node Peak Memory Usage: localhost:27000(2.24 GB) localhost:27002(2.21 GB) localhost:27001(2.21 GB)
Per Node Bytes Read: localhost:27000(112.79 MB) localhost:27002(109.57 MB) localhost:27001(105.16 MB)
After patch:
q51a
Max Per-Host Resource Reservation: Memory=2.00GB Threads=79
Per-Host Resource Estimates: Memory=118.75GB
Per Host Min Memory Reservation: localhost:27001(1.84 GB) localhost:27002(1.28 GB) localhost:27000(1.86 GB)
Per Host Number of Fragment Instances: localhost:27001(65) localhost:27002(46) localhost:27000(74)
Cluster Memory Admitted: 33.00 GB
Per Node Peak Memory Usage: localhost:27000(1.88 GB) localhost:27002(1.31 GB) localhost:27001(1.88 GB)
Per Node Bytes Read: localhost:27000(62.08 MB) localhost:27002(45.71 MB) localhost:27001(47.39 MB)
q67a
Max Per-Host Resource Reservation: Memory=1.31GB Threads=85
Per-Host Resource Estimates: Memory=3.76GB
Per Host Min Memory Reservation: localhost:27001(1.29 GB) localhost:27002(1.29 GB) localhost:27000(1.31 GB)
Per Host Number of Fragment Instances: localhost:27001(56) localhost:27002(56) localhost:27000(85)
Cluster Memory Admitted: 11.28 GB
Per Node Peak Memory Usage: localhost:27000(1.35 GB) localhost:27002(1.32 GB) localhost:27001(1.33 GB)
Per Node Bytes Read: localhost:27000(112.79 MB) localhost:27002(109.57 MB) localhost:27001(105.16 MB)
Testing:
- Pass test_tpcds_queries.py in local machine.
Change-Id: I6ae5aeb97a8353d5eaa4d85e3f600513f42f7cf4
Reviewed-on: http://gerrit.cloudera.org:8080/20581
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-11068 added three new tests into
hdfs-scanner-thread-mem-scaling.test. The first one has been failing
intermittently, most likely because the fragment right above the scan does
not pull row batches fast enough. This patch attempts to deflake the
tests by replacing it with a simple count star query. The three test
cases are now contained in their own
test_hdfs_scanner_thread_non_reserved_bytes and will be skipped for
sanitized builds.
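A minimal sketch of the kind of count star query the flaky case was
replaced with (the exact table used by the test is an assumption):
SELECT count(*) FROM tpch_parquet.lineitem;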
Testing:
- Loop and pass test_hdfs_scanner_thread_non_reserved_bytes a hundred
times.
Change-Id: I7c99b2ef70b71e148cedb19037e2d99702966d6e
Reviewed-on: http://gerrit.cloudera.org:8080/20593
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-5741 added initial support for external JDBC data sources. But
the JDBC driver URL only works for HDFS for now; other schemes like Ozone
are still under development.
This patch temporarily turns off tests of the JDBC data source for
non-HDFS environments.
Testing:
- Manually ran tests/query_test/test_ext_data_sources.py
Change-Id: I4b162767afc567065ae61961ffb376c17c32ec29
Reviewed-on: http://gerrit.cloudera.org:8080/20563
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch uses the "external data source" mechanism in Impala to
implement data source for querying JDBC.
It has some limitations due to the restrictions of "external data
source":
- It is not distributed, e.g, fragment is unpartitioned. The queries
are executed on coordinator.
- Queries which read the following data types from external JDBC tables
are not supported:
BINARY, CHAR, DATETIME, and COMPLEX.
- Only binary predicates with the operators =, !=, <=, >=,
<, and > are supported for pushdown to the RDBMS.
- The following data types are not supported for predicates:
DECIMAL, TIMESTAMP, DATE, and BINARY.
- External tables with complex type columns are not supported.
- Support is limited to the following databases:
MySQL, Postgres, Oracle, MSSQL, H2, DB2, and JETHRO_DATA.
- Catalog V2 is not supported (IMPALA-7131).
- DataSource objects are not persistent (IMPALA-12375).
Additional fixes are planned on top of this patch.
Source files under jdbc/conf, jdbc/dao and jdbc/exception are
replicated from Hive JDBC Storage Handler.
In order to query the RDBMS tables, the following steps should be
followed (note that the existing data source table will be rebuilt):
1. Make sure the Impala cluster has been started.
2. Copy the jar files of JDBC drivers and the data source library into
HDFS.
${IMPALA_HOME}/testdata/bin/copy-ext-data-sources.sh
3. Create an `alltypes` table in the Postgres database.
${IMPALA_HOME}/testdata/bin/load-ext-data-sources.sh
4. Create data source tables (alltypes_jdbc_datasource and
alltypes_jdbc_datasource_2).
${IMPALA_HOME}/bin/impala-shell.sh -f\
${IMPALA_HOME}/testdata/bin/create-ext-data-source-table.sql
5. Queries can now access the data source tables created in the last
step; there is no need to restart the Impala cluster. See the sketch
below.
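For illustration, a minimal query against one of the tables created above
(restricted to count(*) so that no column names have to be assumed):
SELECT count(*) FROM alltypes_jdbc_datasource;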
Testing:
- Added unit-test for Postgres and ran unit-test with JDBC driver
postgresql-42.5.1.jar.
- Ran manual unit-test for MySQL with JDBC driver
mysql-connector-j-8.1.0.jar.
- Ran core tests successfully.
Change-Id: I8244e978c7717c6f1452f66f1630b6441392e7d2
Reviewed-on: http://gerrit.cloudera.org:8080/17842
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
If an Iceberg table is frequently updated/written to in small batches,
a lot of small files are created. This decreases read performance.
Similarly, frequent row-level deletes contribute to this problem
by creating delete files, which have to be merged on read.
So far INSERT OVERWRITE (rewriting the table with itself) has been used
to compact Iceberg tables.
However, it comes with some RESTRICTIONS:
- The table should not have multiple partition specs/partition evolution.
- The table should not contain complex types.
The OPTIMIZE statement offers a new syntax and a solution limited to
Iceberg tables to enhance read performance for subsequent operations.
See IMPALA-12293 for details.
Syntax: OPTIMIZE TABLE <table_name>;
This first patch introduces the new syntax, temporarily as an alias
for INSERT OVERWRITE.
Note that executing OPTIMIZE TABLE requires ALL privileges.
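Because the statement is currently an alias for INSERT OVERWRITE, the two
statements below behave the same for now (the table name is hypothetical):
OPTIMIZE TABLE ice_db.ice_tbl;
INSERT OVERWRITE TABLE ice_db.ice_tbl SELECT * FROM ice_db.ice_tbl;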
Testing:
- negative tests
- FE planner test
- Ranger test
- E2E tests
Change-Id: Ief42537499ffe64fafdefe25c8d175539234c4e7
Reviewed-on: http://gerrit.cloudera.org:8080/20405
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Until IMPALA-7368, Impala allowed comparing char and varchar slots as in
select * from functional.chars_tiny where cs = vc;
IMPALA-7368 added DATE type support, and as part of that changed
function call resolution to use a fit function based on the number of
arguments that match the call types. Previously the comparison above
would take the first matching function, which happened to be equality
between STRING and STRING; CHAR and VARCHAR can both be implicitly cast
to STRING, so this function worked. With the new function resolution,
the best fit is equality between VARCHAR and VARCHAR, however implicit
casting to VARCHAR(*) from CHAR wasn't allowed.
The behavior before IMPALA-7368 was somewhat accidental; it depended on
the order that builtin EQ functions are added via
BinaryPredicate.initBuiltins -> Type.getSupportedTypes. Supported types
happened to be ordered with STRING preceding VARCHAR and CHAR. The fit
function makes sense and changing its behavior may have other
consequences; it also makes sense that CHAR should be castable to
VARCHAR.
This change allows implicit cast between matching types. Functionally it
only changes how we handle char/varchar comparison with wildcard
char/varchar, because decimals are handled before checking for matching
types and other type matching is the same as equals. It now allows
casting to a compatible type when it is a char or varchar and the target
type is a wildcard version of the same.
Does not attempt to address differences from CHAR padding (IMPALA-1652).
Testing:
- Adds tests covering cast comparison and other implicit conversions.
- Passed exhaustive test run.
Change-Id: Ib89d0a391bc8f2152ecd9151c8872a01ba19c436
Reviewed-on: http://gerrit.cloudera.org:8080/20425
Reviewed-by: Peter Rozsa <prozsa@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-2581 added enforcement of the limit when adding entries to the
grouping aggregation. It would stop adding new entries if the number of
entries in the grouping aggregation was >= the limit. If the grouping
aggregation never contains more entries than the limit, then it would
not output more entries.
However, this limit was not enforced exactly when adding. It would add a
whole batch before checking the limit, so it can go past the limit. In
practice the exchange in a distributed aggregation would enforce limits,
so this would only show up when num_nodes=1. As a result, the following
query incorrectly returns 16 rows, not 10:
set num_nodes=1;
select distinct l_orderkey from tpch.lineitem limit 10;
One option is to be exact when adding items to the grouping aggregation,
which would require testing the limit on each row (we don't know which
are duplicates). This is awkward. Removing the limit on the output of
the aggregation is also not really needed for the original change
(stopping the children early once the limit is reached). Instead, we
restore the limit on the output of the grouping agg (which is already
known to work).
Testing:
- added a test case where we assert the number of rows returned by an
aggregation node (rather than an exchange or top-n).
- restores definition of ALL_CLUSTER_SIZES and makes it simpler to
enable for individual test suites. Filed IMPALA-12394 to generally
re-enable testing with ALL_CLUSTER_SIZES. Enables ALL_CLUSTER_SIZES
for aggregation tests.
Change-Id: Ic5eec1190e8e182152aa954897b79cc3f219c816
Reviewed-on: http://gerrit.cloudera.org:8080/20379
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
LZ4 compression doesn't seem useful when the RowBatch is sent to a
fragment instance within the same process instead of a remote host.
After this change KrpcDataStreamSender skips compression for channels
where the destination is in the same process.
Other changes:
- OutboundRowBatch is moved to a separate file to make the commonly
included row-batch.h lighter.
- TestObservability.test_global_exchange_counters had to be changed,
as skipping compression changed the ExchangeScanRatio metric. Also added
a sleep to the test query because it was flaky on my machine (it
doesn't seem flaky in Jenkins runs; probably my CPU is faster).
See the Jira for more details on tasks that could be skipped in
intra-process RowBatch transfers. Of these, compression is both
the most expensive and the easiest to avoid.
Note that it may also make sense to skip compression if the target
is not in the same process but resides on the same host. This setup is
not typical in production environments AFAIK, and it would complicate
testing compression, as impalad processes often run on the
same host during tests. For these reasons it seems better to only
skip compression when both the host and port are the same.
The TPCH benchmark shows a significant improvement, but it uses only 3
impalad processes, so 1/3 of exchanges are affected - in bigger
clusters the improvement should be much smaller.
+----------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+----------+-----------------------+---------+------------+------------+----------------+
| TPCH(42) | parquet / none / none | 3.59 | -4.95% | 2.37 | -2.51% |
+----------+-----------------------+---------+------------+------------+----------------+
Change-Id: I7ea23fd1f0f10f72f3dbd8594f3def3ee190230a
Reviewed-on: http://gerrit.cloudera.org:8080/20462
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Prototype of HdfsJsonScanner implemented based on rapidjson, which
supports scanning data from JSON files that are split into scan ranges.
The scanning of JSON data is mainly completed by two parts working
together. The first part is the JsonParser responsible for parsing the
JSON object, which is implemented based on the SAX-style API of
rapidjson. It reads data from the char stream, parses it, and calls the
corresponding callback function when encountering the corresponding JSON
element. See the comments of the JsonParser class for more details.
The other part is the HdfsJsonScanner, which inherits from HdfsScanner
and provides callback functions for the JsonParser. The callback
functions are responsible for providing data buffers to the Parser and
converting and materializing the Parser's parsing results into RowBatch.
It should be noted that the parser returns numeric values as strings to
the scanner. The scanner uses the TextConverter class to convert the
strings to the desired types, similar to how the HdfsTextScanner works.
This is an advantage compared to using the numeric values provided by
rapidjson directly, as it eliminates concerns about inconsistencies in
converting decimals (e.g. losing precision).
Added a startup flag, enable_json_scanner, to be able to disable this
feature if we hit critical bugs in production.
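For reference, the feature can be disabled by passing the new flag to each
impalad at startup (standard gflags boolean syntax assumed):
--enable_json_scanner=false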
Limitations
- Multiline JSON objects are not fully supported yet. It is ok when
each file has only one scan range. However, when a file has multiple
scan ranges, there is a small probability of incomplete scanning of
multiline JSON objects that span ScanRange boundaries (in such cases,
parsing errors may be reported). For more details, please refer to
the comments in the 'multiline_json.test'.
- Compressed JSON files are not supported yet.
- Complex types are not supported yet.
Tests
- Most of the existing end-to-end tests can run on JSON format.
- Add TestQueriesJsonTables in test_queries.py for testing multiline,
malformed, and overflow in JSON.
Change-Id: I31309cb8f2d04722a0508b3f9b8f1532ad49a569
Reviewed-on: http://gerrit.cloudera.org:8080/19699
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch forbids creating an EXTERNAL Iceberg table that points
to another Iceberg table in the Hive Catalog. I.e. the following should
be forbidden:
CREATE EXTERNAL TABLE ice_ext
STORED BY ICEBERG
TBLPROPERTIES ('iceberg.table_identifier'='db.tbl');
Loading such tables should also raise an error. Users need to query
the original Iceberg tables. Alternatively they can create VIEWs if
they want to query tables with a different name.
Testing:
* added e2e tests for CREATE EXTERNAL TABLE
* added e2e test about loading such table
Change-Id: Ifb0d7f0e7ec40fba356bd58b43f68d070432de71
Reviewed-on: http://gerrit.cloudera.org:8080/20429
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
DDLs/DMLs might spend a long time executing in catalogd.
Currently, the profiles just have a CatalogOpExecTimer counter. We
need more items to show the details of how they are executed in
catalogd.
As the first step, this patch adds the profile timeline for how the
createTable DDL is executed in Catalog Server. Also added more events in
the existing "Query Timeline" for when the CatalogDdlRequest finished
and when the catalog updates are applied.
To implement this, a 'profile' field is added to TDdlExecResponse to
carry the execution counters and timeline in catalogd. Currently, we
just use its timeline. We can add more counters in the future.
A timeline parameter is added to several methods to mark the progress in
them. Timeline events are added after each RPC finishes.
Here is an example where HMS hangs for 26s in a CTAS. I used gdb to
attach to HMS as the JIRA description mentioned. In the timeline, we can
see the time is spent in the first HMS RPC, which fetches the current HMS
event id:
Catalog Server Operation: 26s560ms
- Got metastoreDdlLock: 163.977us (163.977us)
- Got Metastore client: 166.339us (2.362us)
- Got current Metastore event id 8355270: 26s494ms (26s494ms)
- Created table in Metastore: 26s558ms (63.507ms)
- Fetched event batch from Metastore: 26s559ms (1.155ms)
- Created table in catalog cache: 26s560ms (1.164ms)
- DDL finished: 26s560ms (65.538us)
Query Compilation: 164.257ms
- Metadata of all 1 tables cached: 10.535ms (10.535ms)
- Analysis finished: 118.324ms (107.788ms)
- Authorization finished (noop): 118.489ms (164.626us)
- Value transfer graph computed: 118.830ms (341.792us)
- Single node plan created: 150.150ms (31.319ms)
- Runtime filters computed: 150.254ms (103.529us)
- Distributed plan created: 151.832ms (1.578ms)
- Planning finished: 164.257ms (12.425ms)
Query Timeline: 27s304ms
- Query submitted: 129.658us (129.658us)
- Planning finished: 170.224ms (170.095ms)
- CatalogDdlRequest finished: 26s731ms (26s561ms)
- Applied catalog updates from DDL: 26s740ms (8.752ms)
- Submit for admission: 26s740ms (22.233us)
- Completed admission: 26s740ms (286.295us)
- Ready to start on 3 backends: 26s740ms (155.916us)
- All 3 execution backends (3 fragment instances) started: 26s751ms (10.864ms)
- Last row fetched: 26s920ms (168.226ms)
- Released admission control resources: 26s920ms (27.635us)
- DML data written: 26s920ms (126.369us)
- Applied catalog updates from DDL: 26s985ms (65.354ms)
- DML Metastore update finished: 26s985ms (30.343us)
- Rows available: 26s985ms (27.050us)
- Unregister query: 27s304ms (318.661ms)
An example of creating a Kudu table:
Catalog Server Operation: 1s730ms
- Got Metastore client: 113.403us (113.403us)
- Got current Metastore event id 8355276: 974.500us (861.097us)
- Got Kudu client: 212.123ms (211.148ms)
- Got kuduDdlLock: 212.128ms (4.680us)
- Checked table existence in Kudu: 850.786ms (638.658ms)
- Created table in Kudu: 1s623ms (772.379ms)
- Got metastoreDdlLock: 1s623ms (397.305us)
- Got Metastore client: 1s623ms (7.813us)
- Checked table existence in Metastore: 1s648ms (25.154ms)
- Created table in Metastore: 1s725ms (76.348ms)
- Fetched event batch from Metastore: 1s728ms (3.004ms)
- Created table in catalog cache: 1s730ms (2.024ms)
- DDL finished: 1s730ms (84.448us)
An example of creating an Iceberg table:
Catalog Server Operation: 1s573ms
- Got Metastore client: 141.799us (141.799us)
- Checked table existence in Metastore: 2.957ms (2.815ms)
- Got current Metastore event id 8355277: 3.669ms (712.475us)
- Created table using Iceberg Catalog HIVE_CATALOG: 1s379ms (1s375ms)
- Fetched event batch from Metastore: 1s381ms (2.188ms)
- Created table in catalog cache: 1s382ms (1.556ms)
- Set Iceberg table owner in Metastore: 1s573ms (190.262ms)
- DDL finished: 1s573ms (59.176us)
Tests:
- Add e2e tests to verify the DDL timeline events exist in profile
Change-Id: I3ebf591625e71391a5b23f56ddca8f0ae97b1efa
Reviewed-on: http://gerrit.cloudera.org:8080/20368
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit addresses an issue in the CastExpr class where the clone
constructor was not properly preserving compatibility settings. The
clone constructor assigned the default compatibility regardless of the
source expression, causing substitution errors for partitioned tables.
Example:
'insert into unsafe_insert_partitioned(int_col, string_col)
values("1", null), (null, "1")'
Throws:
ERROR: IllegalStateException: Failed analysis after expr substitution.
CAUSED BY: IllegalStateException: cast STRING to INT
Tests:
- new test case added to insert-unsafe.test
Change-Id: Iff64ce02539651fcb3a90db678f74467f582648f
Reviewed-on: http://gerrit.cloudera.org:8080/20385
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The commit of an Iceberg transaction is done by the Iceberg catalog. In
the common case for Impala the Iceberg catalog is HiveCatalog, and the
actual commit is performed by HMS. This means the commit could fail
because of some activity outside of Impala. It is useful therefore to
be able to simulate what happens when an Iceberg commit fails.
Extend Java DebugAction to allow it to throw an exception. For now this
is limited to throwing unchecked exceptions, which is all that is
needed for this patch.
Add two DebugActions that can be used to throw Iceberg
CommitFailedExceptions at the point where the Iceberg transaction is
about to commit.
Add a new test that uses the new DebugActions to abort an insert and the
addition of a column.
Change-Id: Iafdacc3377a0a946ead9331ba6a63a1fbb11f0eb
Reviewed-on: http://gerrit.cloudera.org:8080/20306
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Originally there were two flavors of the test_drop_corrupt_table test.
One version (test_drop_corrupt_table_with_invalidate) did
an 'invalidate metadata' as part of the test after deleting metadata
files, and the other version did not. The version without invalidation
was depending on the catalog update resulting from adding the iceberg
table arriving at the coordinator before the deletion. This happens a
lot of the time but it can't be guaranteed. Fix the test by removing
test_drop_corrupt_table. Simplify the code a little by inlining a
method, and rename the remaining test to be test_drop_corrupt_table.
Change-Id: I4cbdf5646ed20bb8333e5557ed43226de993b7dd
Reviewed-on: http://gerrit.cloudera.org:8080/20289
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
UnnestExpr can be used in a sort tuple which won't have a resolved path.
In such a case, there is no parent tuple id, so we should not check it in
UnnestExpr.isBoundByTupleIds(). This fixes the NullPointerException.
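A sketch of the kind of query that previously hit the NullPointerException,
sorting on a value produced by unnest() (the table and column are
assumptions based on the standard test data):
SELECT unnest(t.int_array) AS item
FROM functional_parquet.complextypestbl t
ORDER BY item;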
Tests
- Added FE and e2e tests for sorting on columns that come from unnest().
- Moved test_zipping_unnest_from_view to a dedicated class that only has
the parquet file_format dimension to make sure it also runs in the "core"
exploration strategy. Before this it only ran in the "exhaustive"
strategy.
Change-Id: I43e1ef0467edfb8a4a23047a439426053c93ad72
Reviewed-on: http://gerrit.cloudera.org:8080/20274
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
DeflateCodec is an alias for DefaultCodec. Impala works with
DefaultCodec. Fixes reading files written with DeflateCodec.
DeflateCodec isn't an issue with text files because they don't include a
codec header. Sequence files do, and we check the codec on decompress.
Moves TestTextInterop to an E2E test, since it doesn't require any special
startup options, and refactors the test running logic to be format-agnostic.
Updates the text file test as IMPALA-8721 is fixed. Removes creating a table
in Impala for Hive to read, as it didn't test anything new. Adds tests
for sequence files; excludes reading zstd due to IMPALA-12276.
Testing:
- manual exhaustive run of updated tests
Change-Id: Id5ec1d0345ae35597f6aade9d8b9eef2257efeba
Reviewed-on: http://gerrit.cloudera.org:8080/20181
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
If, in a VALUES clause, for the same column all of the values are CHAR
types but not all are of the same length, the common type chosen is
CHAR(max(lengths)). This means that shorter values are padded with
spaces. If the destination column is not CHAR but VARCHAR or STRING,
this produces different results than if the values in the column are
inserted individually, in separate statements. This behaviour is
suboptimal because information is lost.
For example:
CREATE TABLE impala_char_insert (s STRING);
-- all values are CHAR(N) with different N, but all will use the biggest N
INSERT OVERWRITE impala_char_insert VALUES
(CAST("1" AS CHAR(1))),
(CAST("12" AS CHAR(2))),
(CAST("123" AS CHAR(3)));
SELECT length(s) FROM impala_char_insert;
3
3
3
-- if inserted individually, the result is
SELECT length(s) FROM impala_char_insert;
1
2
3
This patch adds the query option VALUES_STMT_AVOID_LOSSY_CHAR_PADDING
which, when set to true, fixes the problem by implicitly casting the
values to the VARCHAR type of the longest value if all values in a
column are CHAR types AND not all have the same length. This VARCHAR
type will be the common type of the column in the VALUES statement.
The new behaviour is not turned on by default because it is a breaking
change.
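With the option enabled, the example above keeps the original lengths; a
minimal sketch (output shown as expected per the description above):
SET VALUES_STMT_AVOID_LOSSY_CHAR_PADDING=true;
INSERT OVERWRITE impala_char_insert VALUES
(CAST("1" AS CHAR(1))),
(CAST("12" AS CHAR(2))),
(CAST("123" AS CHAR(3)));
SELECT length(s) FROM impala_char_insert;
1
2
3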
Note that the behaviour in Hive is different from both behaviours in
Impala: Hive (and PostgreSQL) implicitly remove trailing spaces from
CHAR values when they are cast to other types, which is also lossy.
We choose VARCHAR instead of STRING as the common type because VARCHAR
can be converted to any VARCHAR type of shorter or equal length and also
to STRING, while STRING cannot safely be converted to VARCHAR because
its length is not bounded - we would therefore run into problems if the
common type were STRING and the destination column were VARCHAR.
Note: although the VALUES statement is implemented as a special UNION
operation under the hood, this patch doesn't change the behaviour of
explicit UNION statements, it only applies to VALUES statements.
Note: the new VALUES_STMT_AVOID_LOSSY_CHAR_PADDING query option and
ALLOW_UNSAFE_CASTS are not allowed to be used at the same time: if both
are set to true and the query contains set operation(s), an error is
returned.
Testing:
- Added tests verifying that unneeded padding doesn't occur and the
queries succeed in various situations, e.g. different destination
column types and multi-column inserts. See
testdata/workloads/functional-query/queries/QueryTest/chars-values-clause.test
Change-Id: I9e9e189cb3c2be0e741ca3d15a7f97ec3a1b1a86
Reviewed-on: http://gerrit.cloudera.org:8080/18999
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IcebergDeleteNode and IcebergDeleteBuild classes are based on
PartitionedHashJoin counterparts. The actual "join" part of the node is
optimized, while other parts are kept very similar, to be able to integrate
features of PartitionedHashJoin if needed (partitioning, spilling).
ICEBERG_DELETE_JOIN is added as a join operator which is used only by
IcebergDeleteNode node.
IcebergDeleteBuild processes the data from the relevant delete files and
stores them in a {file_path: ordered row id vector} hash map.
IcebergDeleteNode tracks the file being processed and advances through the
row id vector in parallel with the probe batch to check whether a row is
deleted, or it hashes the probe row's file path and uses binary search to
find the closest row id if that is needed for the check.
Testing:
- Duplicated related planner tests to run both with new operator and
hash join
- Added a dimension for e2e tests to run both with new operator and
hash join
- Added new multiblock tests to verify assumptions used in new
operator to optimize probing
- Added new test with BATCH_SIZE=2 to verify in/out batch handling
with new operator
Change-Id: I024a61573c83bda5584f243c879d9ff39dd2dcfa
Reviewed-on: http://gerrit.cloudera.org:8080/19850
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch implements the migration from legacy Hive tables to Iceberg
tables. The target Iceberg tables inherit the location of the original
Hive tables. The Hive table has to be a non-transactional table.
To migrate a Hive format table stored in a distributed system or object
store to an Iceberg table use the command:
ALTER TABLE [dbname.]table_name CONVERT TO ICEBERG [TBLPROPERTIES(...)];
Currently only 'iceberg.catalog' is allowed as a table property.
For example
- ALTER TABLE hive_table CONVERT TO ICEBERG;
- ALTER TABLE hive_table CONVERT TO ICEBERG TBLPROPERTIES(
'iceberg.catalog' = 'hadoop.tables');
The HDFS table to be converted must meet these requirements:
- table is not a transactional table
- InputFormat must be either PARQUET, ORC, or AVRO
This is an in-place migration so the original data files of the legacy
Hive table are re-used and not moved, copied or re-created by this
operation. The new Iceberg table will have the 'external.table.purge'
property set to true after the migration.
The NUM_THREADS_FOR_TABLE_MIGRATION query option controls the maximum
number of threads used to execute the table conversion. The default value
is one, meaning that table conversion runs on one thread. It can be
configured in the range [0, 1024]. Zero means that the number of CPU
cores will be the degree of parallelism. A value greater than zero sets
the number of threads used for table conversion, capped at the number of
CPU cores as the highest degree of parallelism.
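For illustration, a parallel conversion of the example table above could be
requested like this (the thread count is arbitrary):
SET NUM_THREADS_FOR_TABLE_MIGRATION=4;
ALTER TABLE hive_table CONVERT TO ICEBERG;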
Process of migration:
- Step 1: Setting table properties,
e.g. 'external.table.purge'=false on the HDFS table.
- Step 2: Rename the HDFS table to a temporary table name using a name
format of "<original_table_name>_tmp_<random_ID>".
- Step 3: Refresh the renamed HDFS table.
- Step 4: Create an external Iceberg table via the Iceberg API using the
data of the HDFS table.
- Step 5 (Optional): For an Iceberg table in Hadoop Tables, run a
CREATE TABLE query to add the Iceberg table to HMS as well.
- Step 6 (Optional): For an Iceberg table in Hive catalog, run an
INVALIDATE METADATA to make the new table available for all
coordinators right after the conversion finished.
- Step 7 (Optional): For an Iceberg table in Hadoop Tables, set the
'external.table.purge' property to true in an ALTER TABLE
query.
- Step 8: Drop the temporary HDFS table.
Testing:
- Add e2e tests
- Add FE UTs
- Manually tested the runtime performance for a table that is
unpartitioned and has 10k data files. The runtime is around 10-13s.
Co-authored-by: lipenglin <lipenglin@apache.org>
Change-Id: Iacdad996d680fe545cc9a45e6bc64a348a64cd80
Reviewed-on: http://gerrit.cloudera.org:8080/20077
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Tamas Mate <tmater@apache.org>
This patch adds an experimental query option called ALLOW_UNSAFE_CASTS
which allows implicit casting between some numeric types and string
types. A new type of compatibility is introduced for this purpose, and
the compatibility rule handling is also refactored. The new approach
uses an enum to differentiate the compatibility levels and to make it
easier to pass them through methods. The unsafe compatibility is used
only in two cases: for set operations and for insert statements. The
insert statements and set operations accept unsafe implicitly cast
expressions only when the source expressions are constant.
The following implicit type casts are enabled in unsafe mode:
- String -> Float, Double
- String -> Tinyint, Smallint, Int, Bigint
- Float, Double -> String
- Tinyint, Smallint, Int, Bigint -> String
The patch also covers IMPALA-3217, and adds two more rules to handle
implicit casting in set operations and insert statements between string
types:
- String -> Char(n)
- String -> Varchar(n)
The unsafe implicit casting requires that the source expression must be
constant in this case as well.
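A minimal sketch of the option in use (the table is hypothetical; the
inserted value must be a constant expression):
SET ALLOW_UNSAFE_CASTS=true;
-- int_col is an INT column; the constant string '42' is implicitly cast
INSERT INTO unsafe_demo_tbl (int_col) VALUES ('42');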
Tests:
- tests added to AnalyzeExprsTest.java
- new test class added to test_insert.py
Change-Id: Iee5db2301216c2e088b4b3e4f6cb5a1fd10600f7
Reviewed-on: http://gerrit.cloudera.org:8080/19881
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for the DELETE operation for partitioned
Iceberg tables. It does so by writing position delete files
(merge-on-read strategy). The delete files contain the file path
and file position of the deleted records. The delete files must
reside in the same partition as the data files they are referring
to.
To execute the DELETE statement for a given table 'tbl', we are
basically doing an INSERT to the virtual DELETE table
'tbl-POSITION-DELETE':
from:
DELETE FROM ice_t WHERE id = 42;
to:
INSERT INTO ice_t-POSITION-DELETE
SELECT INPUT__FILE__NAME, FILE__POSITION
FROM ice_t
WHERE id = 42;
The above was true for unpartitioned Iceberg tables.
If the table is partitioned, we need to shuffle the rows across
executors based on the partitions they belong to, then sort the rows
based on the partitions (also based on 'file_path' and 'pos'), so
writers can work on partitions sequentially.
To do this, we need to select the partition columns as well from the
table. But in case of partition-evolution there are different sets
of partition columns in each partition spec of the table. To overcome
this, this patchset introduces two additional virtual columns:
* PARTITION__SPEC__ID
* ICEBERG__PARTITION__SERIALIZED
PARTITION__SPEC__ID is an INT column that contains the Iceberg spec_id
for each row. ICEBERG__PARTITION__SERIALIZED is a BINARY column that
contains all partition values base64-encoded and dot-separated. E.g.:
select PARTITION__SPEC__ID, ICEBERG__PARTITION__SERIALIZED, * FROM ice_t
+---------------------+--------------------------------+---+---+
| partition__spec__id | iceberg__partition__serialized | i | j |
+---------------------+--------------------------------+---+---+
| 0 | Mg== | 2 | 2 |
| 0 | Mg== | 2 | 2 |
+---------------------+--------------------------------+---+---+
So for the INSERT we are shuffling the rows between executors based on
HASH(partition__spec__id, iceberg__partition__serialized) then each
writer fragment sorts the rows based on (partition__spec__id,
iceberg__partition__serialized, file_path, pos) before writing them out
to delete files. The IcebergDeleteSink has been smartened up so that
it creates a new delete file whenever it sees a row with a new
(partition__spec__id, iceberg__partition__serialized) combination.
Some refactoring was also involved in implementing this patch set.
A lot of common code between IcebergDeleteSink and HdfsTableSink has
been moved to the common base class TableSinkBase. In the Frontend this
patch set also moves some common code of InsertStmt and ModifyStmt to a
new common base class DmlStatementBase.
Testing:
* planner tests
* e2e tests (including interop with Hive)
* Did manual stress test with a TPCDS_3000.store_sales
** Table had 8 Billion rows, partitioned by column (ss_sold_date_sk)
** Deleted 800 Million rows using 10 Impala hosts
** Operation was successful, finished under a minute
** Created minimum number of delete files, i.e. one per partition
Change-Id: I28b06f240c23c336a7c5b6ef22fe2ee0a21f7b60
Reviewed-on: http://gerrit.cloudera.org:8080/20078
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When inserting into non-partitioned tables, the catalog update
request could fail due to file-not-found exceptions. At that
point we have already reset (cleared) the partition map, so it
becomes empty after the failure, which is an illegal state and will
cause failures in later operations. Currently, users have to
manually invalidate the metadata of the table to recover. We
can improve this by making all the updates happen after all
the external loading succeeds, so any failure in loading the
file metadata won't leave the table metadata in a partially
updated state.
Testing:
1. Added a test which simulates a failure in a catalog update
request by throwing an exception through the debug action and
confirms that subsequent catalog update requests are not
affected by the failure.
Change-Id: I28e76a73b7905c24eb93b935124d20ea7abe8513
Reviewed-on: http://gerrit.cloudera.org:8080/19878
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds query progress to the Impala /queries
webpage. The query progress shows the completion progress of a
query's fragment instances. It helps users track the completion of
computation-intensive queries.
Testing:
- Added test cases to test_observability.py
- Added a new functional test to test_web_pages.py
Change-Id: Ic0e8695a8a8395c1364b4b249f83c4345d2cc53e
Reviewed-on: http://gerrit.cloudera.org:8080/19706
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
When using local catalog mode, if a runtime filter is being generated
for a time travel Iceberg table, then a query may fail with "ERROR:
IllegalArgumentException: null"
In the planner an Iceberg table that is being accessed with Time Travel
is represented by an IcebergTimeTravelTable object. This object
represents a time-based variation on a base table. The
IcebergTimeTravelTable may represent a different schema from the base
table, it does this by tracking its own set of Columns. As part of
generating a runtime filter the isClusteringColumn() method is called
on the table. IcebergTimeTravelTable was delegating this call to the
base object. In local catalog mode this method is implemented by
LocalTable which has a Preconditions check (an assertion) that the
column parameter matches the stored column. In this case the check
fails as the base table and time travel table have their own distinct
set of column objects.
The fix is to have IcebergTimeTravelTable provide its own
isClusteringColumn() method. For Iceberg there are no clustering
columns, so this method simply returns false.
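A sketch of the kind of query that hit the error: a join against a
time-travel read of an Iceberg table in local catalog mode, so that a
runtime filter targets the time-travel scan (table names and snapshot id
are hypothetical):
SELECT count(*)
FROM ice_part_tbl FOR SYSTEM_VERSION AS OF 1234567890
JOIN dim_tbl ON ice_part_tbl.part_col = dim_tbl.id;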
TESTING
- Ran all end-to-end tests.
- Added test case for query that failed.
Change-Id: I51d04c8757fb48bd417248492d4615ac58085632
Reviewed-on: http://gerrit.cloudera.org:8080/20034
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for DELETE statements on unpartitioned Iceberg
tables. Impala uses the 'merge-on-read' mode with position delete files.
The patch reuses the existing IcebergPositionDeleteTable as the target
table of the DELETE statements, because this table already has the same
schema as position delete files, even with correct Iceberg field IDs.
The patch basically rewrites DELETE statements to INSERT statements,
e.g.:
from:
DELETE FROM ice_t WHERE id = 42;
to:
INSERT INTO ice_t-POSITION-DELETE
SELECT INPUT__FILE__NAME, FILE__POSITION
FROM ice_t
WHERE id = 42;
Position delete files need to be ordered by (file_path, pos), so
we add an extra SORT node before the table sink operator.
In the backend the patch adds a new table sink operator, the
IcebergDeleteSink. It writes the incoming rows (file_path, position) to
delete files. It reuses a lot of code from HdfsTableSink, so this patch
moves the common code to the new common base class: TableSinkBase.
The coordinator then collects the written delete files and invokes
UpdateCatalog to finalize the DELETE statement.
The Catalog then uses Iceberg APIs to create a new snapshot with the
created delete files. It also validates that there was no conflicting
data files written since the operation started.
Testing:
* added planner test
* e2e tests
* interop test between Impala and Hive
Change-Id: Ic933b2295abe54b46d2a736961219988ff42915b
Reviewed-on: http://gerrit.cloudera.org:8080/19776
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
test_krpc_datastream_sender_shuffle has been failing with an OOM error in
the HDFS EC environment. This is due to the fix introduced in IMPALA-12106,
which reduces the number of instances of a union fragment (F06) from 3 to 2.
Running the test with BATCH_SIZE=8 helps the test pass while still holding
the assertion (KrpcDataStreamSender claiming megabytes of memory for
RowBatchSerialization).
Testing:
- test_krpc_datastream_sender_shuffle passes both in the regular
minicluster and in the HDFS EC setup.
Change-Id: I8c7961ad8dd489a4d62e738d364d4da1fa44d0cc
Reviewed-on: http://gerrit.cloudera.org:8080/20011
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
This patch makes multiple improvements to query profile and RPC metrics
to improve observability and allow more detailed analysis of where time
is being spent by client RPCs.
- A new CreateResultSetTime metric has been added to PLAN_ROOT_SINK node
in the query profile. This timer isolates the cost to convert fetched
rows to the client protocol.
- Read/Write time is now tracked during client RPC execution and added to
the rpcz JSON output. A checkbox in the /rpcz Web UI page enables
display of the Read/Write stats.
- Read and Write time are defined from Thrift callbacks defined in
apache::thrift::TProcessorEventHandler. Read time includes reading and
deserializing Thrift RPC args from the transport. Write time includes
serializing, writing, and flushing Thrift RPC args to the transport.
- Client RPC cost is tracked on a per-query basis and displayed in the
server profile as RPCCount, RPCReadTimer, and RPCWriteTimer.
- Accuracy of RPC histograms is changed from milliseconds to microseconds.
Testing:
tests added to test_fetch.py and test_web_pages.py
Change-Id: I986f3f2afac1775274895393969b270cf956b262
Reviewed-on: http://gerrit.cloudera.org:8080/19966
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change aims to decrease back-pressure in the sorter. It offers an
alternative for the in-memory run formation strategy and sorting
algorithm by introducing a new in-memory merge level between the
in-memory quicksort and the external merge phase.
Instead of forming one big run, it produces many smaller in-memory runs
(called miniruns), sorts those with quicksort, then merges them
in memory, before spilling or serving GetNext().
The external merge phase remains the same.
This works with the MAX_SORT_RUN_SIZE development query option, which
determines the maximum number of pages in a 'minirun'. The default value of
MAX_SORT_RUN_SIZE is 0, which keeps the original implementation of 1
big initial in-memory run. Other valid values are integers of 2 and above.
The recommended value is 10 or more, to avoid high fragmentation
in case of large workloads and variable-length data.
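For reference, a minimal sketch of enabling miniruns for a sort-heavy query
(the value and the query are illustrative):
SET MAX_SORT_RUN_SIZE=10;
SELECT * FROM tpch_parquet.lineitem ORDER BY l_shipdate;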
Testing:
- added MAX_SORT_RUN_SIZE as an additional test dimension to
test_sort.py with values [0, 2, 20]
- additional partial sort test case (inserting into partitioned
kudu table)
- manual E2E testing
Change-Id: I58c0ae112e279b93426752895ded7b1a3791865c
Reviewed-on: http://gerrit.cloudera.org:8080/18393
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Csaba Ringhofer <csringhofer@cloudera.com>
updateCatalog() invokes createTblTransaction() for transactional tables.
It's called after acquiring the table lock. The write lock of the catalog's
versionLock will also be acquired by the current thread. Whenever we hit
an exception, we should release those locks. This patch moves the code
calling createTblTransaction() into the exception handling scope.
Tests:
- Add a debug action to abort the transaction in updateCatalog() so
createTblTransaction() will fail.
- Add e2e test for the error handling.
Change-Id: I3a64764d0568fc1e6c6f4c52f9e220df3130bd84
Reviewed-on: http://gerrit.cloudera.org:8080/20020
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The new processing cost-based planner changes (IMPALA-11604,
IMPALA-12091) will impact output writer parallelism for insert queries,
with the potential for more small files if the processing cost-based
planning results in too many writer fragments. It can further exacerbate
a problem introduced by MT_DOP (see IMPALA-8125).
The MAX_FS_WRITERS query option can help mitigate this. But even without
the MAX_FS_WRITERS set, the default output writer parallelism should
avoid creating excessive writer parallelism for partitioned and
unpartitioned inserts.
This patch implements such a limit when using the cost-based planner. It
limits the number of writer fragments such that each writer fragment
writes at least 256MB of rows. This patch also allows CTAS (a kind of
DDL query) to be eligible for auto-scaling.
This patch also removes comments about NUM_SCANNER_THREADS added by
IMPALA-12029, since they no longer apply after IMPALA-12091.
Testing:
- Add test cases in test_query_cpu_count_divisor_default
- Add test_processing_cost_writer_limit in test_insert.py
- Pass test_insert.py::TestInsertHdfsWriterLimit
- Pass test_executor_groups.py
Change-Id: I289c6ffcd6d7b225179cc9fb2f926390325a27e0
Reviewed-on: http://gerrit.cloudera.org:8080/19880
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch the Parquet STRUCT reader didn't fill the
position slots: collection position, file position. When users
queried these virtual columns, Impala crashed or returned
incorrect results.
The ORC scanner already worked correctly, but there were no tests
written for it.
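A sketch of the kind of query affected, reading the file position virtual
column together with a struct column (table and column names are
assumptions):
SELECT file__position, s.f1
FROM some_db.tbl_with_struct;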
Test:
* e2e tests for both ORC / Parquet
Change-Id: I32a808a11f4543cd404ed9f3958e9b4e971ca1f4
Reviewed-on: http://gerrit.cloudera.org:8080/19911
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Fixes writing an empty parquet page when a page fills (or reaches
parquet_page_row_count_limit) at the same time that its dictionary
fills.
When a page filled (or reached parquet_page_row_count_limit) at the same
time that the dictionary filled, Impala would first detect the page was
full and create a new page. It would then detect the dictionary is full
and create another page, resulting in an empty page.
Parquet readers like Hive error if they encounter an empty page. This
patch attempts to make it impossible to generate an empty page by
reworking AppendRow and adding DCHECKs for empty pages. Dictionary size
is checked on FinalizeCurrentPage so whenever a page is written, we also
flush the dictionary if full.
Addresses clang-tidy by adding override in source files.
Testing:
- new test for full page size reached with full dictionary
- new test for parquet_page_row_count_limit with full dictionary
- new test for parquet_page_row_count_limit followed by large value.
This seems useful as a theoretical corner-case; it currently writes
the too-large value to the page anyway, but if we ever start checking
whether the first value will fit the page this could become an issue.
Change-Id: I90d30d958f07c6289a1beba1b5df1ab3d7213799
Reviewed-on: http://gerrit.cloudera.org:8080/19898
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
HDDS-7122 changed how Ozone prints chunk size from bytes to KB, as in
1024k rather than 1048576. That makes it consistent with HDFS reporting.
Our tests verify the chunk size reported in SHOW output. Updates the
expected erasure code policy string to match the new format.
Updates CDP_OZONE_VERSION to a build that includes HDDS-7122. However
this build includes two regressions that we work around for the moment:
- HDDS-8543: FSO layout reports incorrect replication config for
directories in EC buckets
- HDDS-8289: FSO layout listStatus operations get slower with lots of
files and filesystem operations
Testing:
- ran test suite with Ozone Erasure Coding
Change-Id: I5354de61bbc507931a1d5bc86f6466c0dd50fc30
Reviewed-on: http://gerrit.cloudera.org:8080/19870
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Laszlo Gaal <laszlo.gaal@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
This adds ClientFetchWaitTimeStats to the runtime profile
to track the min/max/# of samples for ClientFetchWaitTimer.
Here is some sample output:
- ClientFetchWaitTimeStats: (Avg: 161.554ms ; Min: 101.411ms ; Max: 461.728ms ; Number of samples: 6)
- ClientFetchWaitTimer: 969.326ms
This also fixes the definition of ClientFetchWaitTimer to avoid
including time after end of fetch. When the client is closing
the query, Finalize() gets called. The Finalize() call should
only add extra client wait time if fetch has not completed.
Testing:
- Added test cases in query_test/test_fetch.py with specific
numbers of fetches and verification of the statistics.
- The test cases make use of a new function for parsing
summary stats for timers, and this also gets its own test
case.
Change-Id: I9ca525285e03c7b51b04ac292f7b3531e6178218
Reviewed-on: http://gerrit.cloudera.org:8080/19897
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
As a first stage of IMPALA-10939, this change implements support for
including top-level collections that only contain fixed-length types
(including fixed-length structs) in the sorting tuple. For these types the
implementation is almost the same as the existing handling of strings.
Another limitation is that structs that contain any type of collection
are not yet allowed in the sorting tuple.
Also refactored the RawValue::Write*() functions to have a clearer
interface.
Testing:
- Added a new test table that contains many rows with arrays. This is
queried in a new test added in test_sort.py, to ensure that we handle
spilling correctly.
- Added tests that have arrays and/or maps in the sorting tuple in
test_queries.py::TestQueries::{test_sort,
test_top_n,test_partitioned_top_n}.
Change-Id: Ic7974ef392c1412e8c60231e3420367bd189677a
Reviewed-on: http://gerrit.cloudera.org:8080/19660
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
There are signs of flakiness in TestTpcdsQueryWithProcessingCost in the
dockerised environment. The flakiness seems to happen due to the tighter
per-process memory limit in the dockerised environment. This patch skips
TestTpcdsQueryWithProcessingCost in the dockerised environment.
Testing:
- Hack SkipIfDockerizedCluster.insufficient_mem_limit to return True if
IS_HDFS and confirm that the whole TestTpcdsQueryWithProcessingCost is
skipped.
Change-Id: Ibb6b2d4258a2c6613d1954552f21641b42cb3c38
Reviewed-on: http://gerrit.cloudera.org:8080/19892
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Parquet v2 means several changes in Parquet files compared to v1:
1. file version = 2 instead of 1
c185faf0c4/src/main/thrift/parquet.thrift (L1016)
Before this patch Impala rejected Parquet files with version!=1.
2. possible use of DataPageHeaderV2 instead of DataPageHeader
c185faf0c4/src/main/thrift/parquet.thrift (L561)
The main differences compared to V1 DataPageHeader:
a. rep/def levels are not compressed, so the compressed part contains
only the actual encoded values
b. rep/def levels must be RLE encoded (Impala only supports RLE encoded
levels even for V1 pages)
c. compression can be turned on/off per page (member is_compressed)
d. number of nulls (member num_nulls) is required - in v1 it was
included in statistics which is optional
e. number of rows is required (member num_rows) which can help with
matching collection items with the top level collection
The patch adds support for understanding v2 data pages but does not
implement some potential optimizations:
a. would allow an optimization for queries that need only the nullness
of a column but not the actual value: as the values are not needed the
decompression of the page data can be skipped. This optimization is not
implemented - currently Impala materializes both the null bit and the
value for all columns regardless of whether the value is actually
needed.
d. could also be used for optimizations / additional validity checks
but it is not used currently
e. could make skipping rows easier but is not used, as the existing
scanner has to be able to skip rows efficiently also in v1 files so
it can't rely on num_rows
3. possible use of new encodings (e.g. DELTA_BINARY_PACKED)
No new encoding is added - when an unsupported encoding is encountered
Impala returns an error.
parquet-mr uses new encodings (DELTA_BINARY_PACKED, DELTA_BYTE_ARRAY)
for most types if the file version is 2, so with this patch Impala is
not yet able to read all v2 Parquet tables written by Hive.
4. Encoding PLAIN_DICTIONARY is deprecated and RLE_DICTIONARY is used
instead. The semantics of the two encodings are exactly the same.
Additional changes:
Some responsibilities are moved from ParquetColumnReader to
ParquetColumnChunkReader:
- ParquetColumnChunkReader decodes rep/def level sizes to hide v1/v2
differences (see 2.a.)
- ParquetColumnChunkReader skips empty data pages in
ReadNextDataPageHeader
- the state machine of ParquetColumnChunkReader is simplified by
separating data page header reading / reading rest of the page
Testing:
- added 4 v2 Parquet test tables (written by Hive) to cover
compressed / uncompressed and scalar/complex cases
- added EE and fuzz tests for the test tables above
- manual tested v2 Parquet files written by pyarrow
- ran core tests
Note that no test is added where some pages are compressed while
some are not. It would be tricky to create such files with existing
writers. The code should handle this case and it is very unlikely that
files like this will be encountered.
Change-Id: I282962a6e4611e2b662c04a81592af83ecaf08ca
Reviewed-on: http://gerrit.cloudera.org:8080/19793
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.
Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.
Query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is taken into account to determine the
maximum fragment parallelism per node. For scan fragments, only the first
two are considered, to encourage the Frontend to choose a larger executor
group as needed.
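For illustration, a sketch of capping scan parallelism through the new
option for a cost-based planned query (the values are arbitrary):
SET COMPUTE_PROCESSING_COST=1;
SET MAX_FRAGMENT_INSTANCES_PER_NODE=4;
-- scan fragments now get at most 4 instances per node
SELECT count(*) FROM tpcds_parquet.store_sales;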
Two new pieces of static state are added to exec-node.h: is_mt_fragment_
and num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.
Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():
- If a fragment has a UnionNode, its parallelism is the maximum of its
  input fragments' parallelism and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as an interior fragment,
  but it will not be lowered further since it has no child fragment to
  compare against.
Admission control slots remain unchanged. This may cause a query to fail
admission if the Planner selects a scan parallelism higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than the configured
admission control slots value can help lower scan parallelism and pass
the admission controller.
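For example, a minimal sketch of aligning the option with the slot count
from a client session (the slot count of 4 and the table name are made up;
impyla is used only for illustration):
from impala.dbapi import connect
cur = connect(host='localhost', port=21050).cursor()
cur.execute('SET COMPUTE_PROCESSING_COST=1')
# Keep scan parallelism within the configured admission control slots.
cur.execute('SET MAX_FRAGMENT_INSTANCES_PER_NODE=4')
cur.execute('SELECT count(*) FROM tpcds_parquet.store_sales')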
The previous workaround for controlling scan parallelism from
IMPALA-12029 is now removed. This patch also disables the IMPALA-10287
optimization if COMPUTE_PROCESSING_COST=true, because IMPALA-10287
relies on a fixed number of fragment instances in
DistributedPlanner.java. However, the effective parallelism calculation
is done much later and may change the final number of instances of the
hash join fragment, rendering the DistributionMode selected by
IMPALA-10287 inaccurate.
This patch is benchmarked using single_node_perf_run.py with the
following parameters:
args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
--workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
--impalad_args="$args" \
--query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
"IMPALA-12091~1" IMPALA-12091
The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62 | 4.54 | +1.92% | 0.23% | 1.59% | 5 | +2.32% | 1.15 | 2.67 |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82 | 5.76 | +1.08% | 5.27% | 3.89% | 5 | +2.04% | 0.00 | 0.37 |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65 | 4.58 | +1.38% | 1.97% | 0.48% | 5 | +0.81% | 0.87 | 1.51 |
| TPCDS(10) | TPCDS-Q49 | parquet / none / none | 1.49 | 1.48 | +0.46% | * 36.02% * | * 34.95% * | 5 | +1.26% | 0.58 | 0.02 |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76 | 3.75 | +0.39% | 1.67% | 0.58% | 5 | -0.03% | -0.58 | 0.49 |
| TPCDS(10) | TPCDS-Q78 | parquet / none / none | 2.80 | 2.80 | -0.04% | 1.32% | 1.33% | 5 | -0.42% | -0.29 | -0.05 |
| TPCDS(10) | TPCDS-Q80A | parquet / none / none | 2.87 | 2.89 | -0.51% | 1.33% | 0.40% | 5 | -0.01% | -0.29 | -0.82 |
| TPCDS(10) | TPCDS-Q3 | parquet / none / none | 0.18 | 0.19 | -1.29% | * 15.26% * | * 15.87% * | 5 | -0.54% | -0.87 | -0.13 |
| TPCDS(10) | TPCDS-Q76 | parquet / none / none | 1.08 | 1.11 | -2.98% | 0.92% | 1.70% | 5 | -3.99% | -2.02 | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Re-enable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests
Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Reviewed-on: http://gerrit.cloudera.org:8080/19807
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The NumFileMetadataRead counter was lost with the revert of commit
f932d78ad0. This patch restores the NumFileMetadataRead counter and the
assertions in the impacted Iceberg test files. Other impacted test files
will be gradually restored along with the reimplementation of the
optimized count star for ORC.
Testing:
- Pass core tests.
Change-Id: Ib14576245d978a127f688e265cab2f4ff519600c
Reviewed-on: http://gerrit.cloudera.org:8080/19854
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This reverts commit f932d78ad0.
The commit is reverted because it causes a significant regression for
non-optimized count star queries in Parquet format.
There are several conflicts that need to be resolved manually:
- Removed assertions against the 'NumFileMetadataRead' counter, which
  is lost with the revert.
- Adjusted the assertions in test_plain_count_star_optimization,
  test_in_predicate_push_down, and test_partitioned_insert of
  test_iceberg.py due to the missing improvement in the Parquet
  optimized count star code path.
- Kept the "override" specifier in hdfs-parquet-scanner.h to pass
  clang-tidy.
- Kept the python3 style of RuntimeError instantiation in
  test_file_parser.py to pass check-python-syntax.sh.
Change-Id: Iefd8fd0838638f9db146f7b706e541fe2aaf01c1
Reviewed-on: http://gerrit.cloudera.org:8080/19843
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
IMPALA-9495 added support for struct types in SELECT lists but only with
codegen turned off. This commit implements codegen for struct types.
To facilitate this, code generation for reading and writing 'AnyVal's
has been refactored. A new class, 'CodegenAnyValReadWriteInfo', is
introduced. This class is an interface between sources and destinations,
one of which is an 'AnyVal' object: sources generate an instance of this
class and destinations take that instance and use it to write the value.
The other side can, for example, be tuples from which we read (in the
case of 'SlotRef') or tuples we write into (in the case of
materialisation, see Tuple::CodegenMaterializeExprs()). The main
advantage is that sources do not have to know how to write to their
destinations, only how to read the values (and vice versa).
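As a language-agnostic illustration of that decoupling (this is a sketch of
the idea only, not the actual C++ codegen classes):
from dataclasses import dataclass
@dataclass
class ReadWriteInfo:             # stand-in for CodegenAnyValReadWriteInfo
    value: object
    is_null: bool
class SlotRefSource:             # reads a tuple slot; knows nothing of writers
    def __init__(self, slot_idx):
        self.slot_idx = slot_idx
    def read(self, tuple_row):
        v = tuple_row[self.slot_idx]
        return ReadWriteInfo(value=v, is_null=v is None)
class TupleDestination:          # writes a tuple slot; knows nothing of readers
    def write(self, out_tuple, slot_idx, rwi):
        out_tuple[slot_idx] = None if rwi.is_null else rwi.value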
Before this change, many tests that involve structs ran only with
codegen turned off. Now that codegen is supported in these cases, these
tests are also run with codegen on.
Testing:
- enabled tests for structs in the select list with codegen on in
tests/query_test/test_nested_types.py
- enabled codegen in other tests where it used to be disabled because
it was not supported.
Change-Id: I5272c3f095fd9f07877104ee03c8e43d0c4ec0b6
Reviewed-on: http://gerrit.cloudera.org:8080/18526
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The end time in DML profiles is incorrect: it is actually the time
when admission control resources are released. This is correct for
normal queries, but for DMLs the coordinator still needs to invoke the
updateCatalog RPC of catalogd to finalize the HMS update. The end time
should be set after that request has finished.
This patch fixes the DML end time by not setting it when the admission
control resources are released. Instead, it is set after
ClientRequestState::WaitInternal() finishes, which makes sure the
updateCatalog RPC has finished.
Also adds a duration field to the profile.
For testing, this patch also adds a new debug action in catalogd
(catalogd_insert_finish_delay) to inject delays in updateCatalog.
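A hedged sketch of the kind of check the new e2e test performs (the field
names and helper below are illustrative, not copied from the actual test):
import re
def check_dml_profile(profile_text):
    # The DML profile should now report an end time that was set after the
    # updateCatalog RPC (which can be slowed down via the new
    # catalogd_insert_finish_delay debug action), plus the new duration field.
    assert re.search(r'End Time: \S+', profile_text), 'end time missing'
    assert re.search(r'Duration: \S+', profile_text), 'duration missing'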
Tests
- Added e2e test to verify the end time of a DML profile
Change-Id: I9c5dc92c2f8576ceed374d447c0ac05022a2dee6
Reviewed-on: http://gerrit.cloudera.org:8080/19644
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Fix an Ozone test failure where we were using 'hdfs_client', which is
set to None on non-HDFS systems. Instead, use 'filesystem_client',
which is a generic interface for filesystem operations that works
across all the filesystems that Impala supports.
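A hedged sketch of the substitution (method names follow the generic
client's conventions in Impala's pytests, but treat them as illustrative):
def cleanup_table_dir(suite, table_location):
    # Before: suite.hdfs_client.delete_file_dir(...), which breaks on Ozone
    # because hdfs_client is None on non-HDFS filesystems.
    if suite.filesystem_client.exists(table_location):
        suite.filesystem_client.delete_file_dir(table_location, recursive=True)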
TESTING:
- Ran the test on Ozone.
Change-Id: Ib6fe961cb659c3002321f26673293e6e25b902a4
Reviewed-on: http://gerrit.cloudera.org:8080/19672
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change extends table reference parsing to cover Iceberg metadata
tables. The TableName class has been extended with an extra vTbl field
which is filled when a virtual table reference is suspected. This
additional field helps keep the real table in the statement table
cache next to the virtual table; the real table needs to be loaded so
that Iceberg metadata tables can be created.
Iceberg provides a rich API to query metadata; these Iceberg API tables
are accessible through the MetadataTableUtils class. Using these table
schemas, it is possible to create an Impala table that can be queried
later on.
Querying a metadata table at this point is expected to throw a
NotImplementedException.
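A hypothetical example of the new reference form (database and table names
are made up; at this stage the query is expected to fail rather than return
rows, assuming the error surfaces the exception name):
from impala.dbapi import connect
cur = connect(host='localhost', port=21050).cursor()
try:
    # <db>.<iceberg table>.<metadata table>, here Iceberg's "history" table.
    cur.execute('SELECT * FROM ice_db.ice_tbl.history')
except Exception as e:
    assert 'NotImplementedException' in str(e)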
Testing:
- Added E2E test to test it for some tables.
Change-Id: I0b5db884b5f3fecbd132fcb2c2cbd6c622ff965b
Reviewed-on: http://gerrit.cloudera.org:8080/19483
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
There are some steps in the test that set the timezone and execute a
rollback to a given time. However, the snapshot creation and the query
for the current time use the local timezone. As a result, if the test
is run e.g. in the CET timezone, it fails when expecting an error in
the Icelandic timezone.
Tests:
- Re-ran TestIcebergTable.test_execute_rollback.
Change-Id: Iba9724f9b86cc508e6497eb33844a6480498b6e4
Reviewed-on: http://gerrit.cloudera.org:8080/19655
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Traditionally table metadata is loaded by the catalog and sent as thrift
to the Impala daemons. With Iceberg tables, some metadata, for example
the org.apache.iceberg.Table, is loaded in the Coordinator at the same
time as the thrift description is being deserialized. If the loading of
the org.apache.iceberg.Table fails, perhaps because of missing Iceberg
metadata, then the loading of the table fails. This can cause an
infinite loop as StmtMetadataLoader.loadTables() waits hopefully for
the catalog to send a new version of the table.
Change some Iceberg table loading methods to throw
IcebergTableLoadingException when a failure occurs. Prevent the hang by
substituting in an IncompleteTable if an IcebergTableLoadingException
occurs.
The test test_drop_incomplete_table had previously been disabled because
of IMPALA-11509. Re-enabling it required a second change. The way
that DROP TABLE is executed on an Iceberg table depends on which
Iceberg catalog is being used. If this Iceberg catalog is not a Hive
catalog, the execution happens in two parts: first the Iceberg table
is dropped, then the table is dropped in HMS. In this case, if the
drop fails in Iceberg, we should still continue on to perform the
drop in HMS.
TESTING
- Add a new test, originally developed for IMPALA-11330, which tests
failures after deleting Iceberg metadata.
- Re-enable test_drop_incomplete_table().
Change-Id: I695559e21c510615918a51a4b5057bc616ee5421
Reviewed-on: http://gerrit.cloudera.org:8080/19509
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Enables test_erasure_coding for Ozone. HDDS-7603 is planned for Ozone
1.4.0, but we test with a CDP build - 1.3.0.7.2.17.0-127 - that already
includes this fix. Since this is a testing-only change, it seems safe
to rely on that.
Testing:
- Ran test_erasure_coding on Ozone with EC
Change-Id: Iee57c008102db7fac89abcea9a140c867178bb08
Reviewed-on: http://gerrit.cloudera.org:8080/19578
Reviewed-by: Laszlo Gaal <laszlo.gaal@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
Iceberg table modifications cause new table snapshots to be created;
these snapshots represent earlier versions of the table. The Iceberg
API provides a way to roll back the table to a previous snapshot.
This change adds the ability to execute a rollback on Iceberg tables
using the following statements:
- ALTER TABLE <tbl> EXECUTE ROLLBACK(<snapshot id>)
- ALTER TABLE <tbl> EXECUTE ROLLBACK('<timestamp>')
The latter form of the command rolls back to the most recent snapshot
whose creation timestamp is older than the specified timestamp.
Note that when a table is rolled back to a snapshot, a new snapshot is
created with the same snapshot id, but with a new creation timestamp.
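A usage sketch of the two forms (table name, snapshot id and timestamp are
placeholders; impyla is used only for illustration):
from impala.dbapi import connect
cur = connect(host='localhost', port=21050).cursor()
# Roll back to an explicit snapshot id.
cur.execute('ALTER TABLE ice_db.ice_tbl EXECUTE ROLLBACK(4358131432119633048)')
# Roll back to the most recent snapshot older than the given timestamp.
cur.execute(
    "ALTER TABLE ice_db.ice_tbl EXECUTE ROLLBACK('2023-01-01 10:00:00')")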
Testing:
- Added analysis unit tests.
- Added e2e tests.
- Converted test_time_travel to use get_snapshots() from iceberg_util.
- Added a utility class to allow pytests to create tables with various
  Iceberg catalogs.
Change-Id: Ic74913d3b81103949ffb5eef7cc936303494f8b9
Reviewed-on: http://gerrit.cloudera.org:8080/19002
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>