We optimize plain count(*) queries on Iceberg tables the following way:
            AGGREGATE
             COUNT(*)
                |
            UNION ALL
            /        \
     SCAN all        ANTI JOIN
     datafiles       /       \
     without      SCAN       SCAN
     deletes    datafiles   deletes
                ||
              rewrite
                ||
                \/
     ArithmeticExpr: LHS + RHS
        /               \
  record_count        AGGREGATE
  of all               COUNT(*)
  datafiles               |
  without             ANTI JOIN
  deletes             /       \
                   SCAN       SCAN
                 datafiles   deletes
This optimization consists of two parts:
1. Rewriting the count(*) expression to count(*) + "record_count" (of data
   files without deletes)
2. In IcebergScanPlanner we only need to construct the right side of
   the original UNION ALL operator, i.e.:
         ANTI JOIN
         /       \
      SCAN       SCAN
    datafiles   deletes
SelectStmt decides whether we can do the count(*) optimization, and if
so, does the following:
1: SelectStmt sets 'TotalRecordsNumV2' in the analyzer, then during the
expression rewrite phase the CountStarToConstRule rewrites the
count(*) to count(*) + record_count
2: SelectStmt sets "OptimizeCountStarForIcebergV2" in the query context,
then IcebergScanPlanner creates the plan accordingly.
This mechanism works for simple queries, but in complex queries it can turn
on the count(*) optimization in IcebergScanPlanner for all Iceberg V2
tables, even if only one subquery enabled the count(*) optimization during
analysis.
With this patch the following changes are made:
1: We introduce IcebergV2CountStarAccumulator which we use instead of
the ArithmeticExpr. So after the rewrite we still know whether the count(*)
optimization should be enabled for the planner.
2: Instead of using the query context, we pass the information to the
IcebergScanPlanner via the TableRef object.
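As an illustration of the first point, a minimal, self-contained sketch of
the idea (these are not the real Impala classes; Expr and
IcebergV2CountStarAccumulator look different in the actual code):
```
// Hypothetical, simplified expression classes for illustration only.
interface Expr {}

final class CountStar implements Expr {}

// Dedicated node: "count(*) over the anti-joined rows" + precomputed record count.
final class CountStarAccumulator implements Expr {
  final Expr countOverAntiJoin;
  final long recordCountWithoutDeletes;
  CountStarAccumulator(Expr countOverAntiJoin, long recordCount) {
    this.countOverAntiJoin = countOverAntiJoin;
    this.recordCountWithoutDeletes = recordCount;
  }
}

final class CountStarRewrite {
  // Rewrites count(*) into an accumulator instead of a generic LHS + RHS
  // expression, so the planner can later tell that only the ANTI JOIN branch
  // of this particular table reference needs to be constructed.
  static Expr rewrite(Expr e, long recordCountWithoutDeletes) {
    if (e instanceof CountStar) {
      return new CountStarAccumulator(e, recordCountWithoutDeletes);
    }
    return e;
  }
}
```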
Testing
* e2e tests
Change-Id: I1940031298eb634aa82c3d32bbbf16bce8eaf874
Reviewed-on: http://gerrit.cloudera.org:8080/23705
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
In some cases users delete files directly from storage without
going through the Iceberg API, e.g. they remove old partitions.
This corrupts the table, and makes queries that try to read the
missing files fail.
This change introduces a repair statement that deletes the
dangling references of missing files from the metadata.
Note that the table cannot be repaired if there are missing
delete files, because Iceberg's DeleteFiles API, which is used
to execute the operation, allows removing only data files.
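For illustration, a rough sketch of such a repair using the Iceberg Java API
(URI normalization, snapshot handling and the Impala statement plumbing are
omitted; this is not Impala's actual implementation):
```
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class MissingFileRepairSketch {
  // Removes references to data files that no longer exist on storage.
  // Missing delete files cannot be repaired this way: DeleteFiles only
  // removes data files.
  static void dropDanglingDataFiles(Table table) throws Exception {
    DeleteFiles delete = table.newDelete();
    boolean foundMissing = false;
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        DataFile file = task.file();
        String path = file.path().toString();
        if (!table.io().newInputFile(path).exists()) {
          delete.deleteFile(path);  // drop the dangling reference from metadata
          foundMissing = true;
        }
      }
    }
    if (foundMissing) delete.commit();  // single metadata-only commit
  }
}
```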
Testing:
- E2E
- HDFS
- S3, Ozone
- analysis
Change-Id: I514403acaa3b8c0a7b2581d676b82474d846d38e
Reviewed-on: http://gerrit.cloudera.org:8080/23512
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch implements partition filtering support for the SHOW FILES
statement on Iceberg tables, based on the functionality added in
IMPALA-12243. Prior to this change, the syntax resulted in a
NullPointerException.
Key changes:
- Added ShowFilesStmt.analyzeIceberg() to validate and transform
partition expressions using IcebergPartitionExpressionRewriter and
IcebergPartitionPredicateConverter. After that, it collects matching
file paths using IcebergUtil.planFiles().
- Added FeIcebergTable.Utils.getIcebergTableFilesFromPaths() to
accept pre-filtered file lists from the analysis phase.
- Enhanced TShowFilesParams thrift struct with optional selected_files
field to pass pre-filtered file paths from frontend to backend.
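The planFiles()-based filtering from the first bullet can be sketched with
the Iceberg Java API roughly as follows (the column name and predicate are
made up; the real code goes through IcebergPartitionExpressionRewriter and
IcebergPartitionPredicateConverter first):
```
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class ShowFilesFilterSketch {
  // Collect data file paths that match a partition predicate, e.g. p = 42.
  static List<String> matchingFilePaths(Table table) throws Exception {
    Expression partitionPredicate = Expressions.equal("p", 42);  // hypothetical column
    List<String> paths = new ArrayList<>();
    try (CloseableIterable<FileScanTask> tasks =
             table.newScan().filter(partitionPredicate).planFiles()) {
      for (FileScanTask task : tasks) {
        paths.add(task.file().path().toString());
      }
    }
    return paths;
  }
}
```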
Testing:
- Analyzer tests for negative cases: non-existent partitions, invalid
expressions, non-partition columns, unsupported transforms.
- Analyzer tests for positive cases: all transform types, complex
expressions.
- Authorization tests for non-filtered and filtered syntaxes.
- E2E tests covering every partition transform type with various
predicates.
- Schema evolution and rollback scenarios.
The implementation follows AlterTableDropPartition's pattern where the
analysis phase performs validation/metadata retrieval and the execution
phase handles result formatting and display.
Change-Id: Ibb9913e078e6842861bdbb004ed5d67286bd3152
Reviewed-on: http://gerrit.cloudera.org:8080/23455
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala crashes when it needs to write multiple delete files per
partition in a single DELETE operation. This is because
IcebergBufferedDeleteSink has its own DmlExecState object, but
sometimes the methods in TableSinkBase use the RuntimeState's
DmlExecState object. That is, it can happen that we add a partition
to the IcebergBufferedDeleteSink's DmlExecState, but later we
expect to find it in the RuntimeState's DmlExecState.
This patch adds new methods to TableSinkBase that are specific
for writing delete files, and they always take a DmlExecState
object as a parameter. They are now used by IcebergBufferedDeleteSink.
Testing
* added e2e tests
Change-Id: I46266007a6356e9ff3b63369dd855aff1396bb72
Reviewed-on: http://gerrit.cloudera.org:8080/23537
Reviewed-by: Mihaly Szjatinya <mszjat@pm.me>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this change, query plans and profiles reported only a single
partition even for partitioned Iceberg tables, which was misleading
for users.
Now we can display the number of scanned partitions correctly for
both partitioned and unpartitioned Iceberg tables. This is achieved by
extracting the partition values from the file descriptors and storing
them in the IcebergContentFileStore. Instead of storing this information
redundantly in all file descriptors, we store it in one place and
reference the partition metadata from the FDs with an id.
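A simplified sketch of the de-duplication idea, with made-up class and field
names (the real structures are IcebergContentFileStore and the file
descriptor metadata):
```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: partition metadata lives once in a shared store and
// each file descriptor only carries an integer id into that store.
class PartitionMetaStore {
  private final Map<String, Integer> idsByPartitionValues = new HashMap<>();
  private final List<String> partitionValuesById = new ArrayList<>();

  // Returns a stable id for the given partition values, adding them if needed.
  int intern(String partitionValues) {
    return idsByPartitionValues.computeIfAbsent(partitionValues, pv -> {
      partitionValuesById.add(pv);
      return partitionValuesById.size() - 1;
    });
  }

  String partitionValues(int id) { return partitionValuesById.get(id); }

  // The number of distinct entries gives "num partitions" for plans/profiles.
  int numPartitions() { return partitionValuesById.size(); }
}

class FileDescriptorSketch {
  final String path;
  final int partitionId;  // reference into PartitionMetaStore instead of a copy
  FileDescriptorSketch(String path, int partitionId) {
    this.path = path;
    this.partitionId = partitionId;
  }
}
```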
This also gives the opportunity to optimize memory consumption in the
Catalog and Coordinator as well as reduce network traffic between them
in the future.
Time travel is handled similarly to oldFileDescMap. In that case
we don't know the total number of partitions in the old snapshot,
so the output is [Num scanned partitions]/unknown.
Testing:
- Planner tests
- E2E tests
- partition transforms
- partition evolution
- DROP PARTITION
- time travel
Change-Id: Ifb2f654bc6c9bdf9cfafc27b38b5ca2f7b6b4872
Reviewed-on: http://gerrit.cloudera.org:8080/23113
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Running exhaustive tests with env var IMPALA_USE_PYTHON3_TESTS=true
reveals some tests that require adjustment. This patch makes those
adjustments, which mostly revolve around encoding differences and the
str vs bytes distinction in Python3. This patch also switches the default
to run pytest with Python3 by setting IMPALA_USE_PYTHON3_TESTS=true. The
following are the details:
Change hash() function in conftest.py to crc32() to produce
deterministic hash. Hash randomization is enabled by default since
Python 3.3 (see
https://docs.python.org/3/reference/datamodel.html#object.__hash__).
This causes test sharding (like --shard_tests=1/2) to produce an
inconsistent set of tests per shard. Always restart the minicluster during
custom cluster tests if the --shard_tests argument is set, because the test
order may change and affect test correctness, depending on whether the
tests run on a fresh minicluster or not.
Moved one test case from delimited-latin-text.test to
test_delimited_text.py for easier binary comparison.
Add bytes_to_str() as a utility function to decode bytes in Python3.
This is often needed when inspecting the return value of
subprocess.check_output() as a string.
Implement DataTypeMetaclass.__lt__ to substitute
DataTypeMetaclass.__cmp__ that is ignored in Python3 (see
https://peps.python.org/pep-0207/).
Fix WEB_CERT_ERR difference in test_ipv6.py.
Fix trivial integer parsing in test_restart_services.py.
Fix various encoding issues in test_saml2_sso.py,
test_shell_commandline.py, and test_shell_interactive.py.
Change timeout in Impala.for_each_impalad() from sys.maxsize to 2^31-1.
Switch to binary comparison in test_iceberg.py where needed.
Specify text mode when calling tempfile.NamedTemporaryFile().
Simplify create_impala_shell_executable_dimension to skip testing dev
and python2 impala-shell when IMPALA_USE_PYTHON3_TESTS=true. The reason
is that several UTF-8 related tests in test_shell_commandline.py break
in the Python3 pytest + Python2 impala-shell combination. This skipping
already happens automatically on build OSes without system Python2
available, like RHEL9 (IMPALA_SYSTEM_PYTHON2 env var is empty).
Removed unused vector argument and fixed some trivial flake8 issues.
Several tests require logic modifications due to intermittent issues in
Python3 pytest. These include:
Add _run_query_with_client() in test_ranger.py to allow reusing a single
Impala client for running several queries. Ensure clients are closed
when the test is done. Mark several tests in test_ranger.py with
SkipIfFS.hive because they run queries through beeline + HiveServer2,
but Ozone and S3 build environment does not start HiveServer2 by
default.
Increase the sleep period from 0.1 to 0.5 seconds per iteration in
test_statestore.py and mark TestStatestore to execute serially. This is
because TServer appears to shut down more slowly when run concurrently
with other tests. Handle the deprecation of Thread.setDaemon() as well.
Always set force_restart=True for each test method in TestLoggingCore,
TestShellInteractiveReconnect, and TestQueryRetries to prevent them from
reusing the minicluster from a previous test method. Some of these tests
are destructive to the minicluster (they kill impalad) and will produce a
minidump if the metrics verifier for the next tests fails to detect a
healthy minicluster state.
Testing:
Pass exhaustive tests with IMPALA_USE_PYTHON3_TESTS=true.
Change-Id: I401a93b6cc7bcd17f41d24e7a310e0c882a550d4
Reviewed-on: http://gerrit.cloudera.org:8080/23319
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
test_execute_remove_orphan_files failed in the Ozone build because it did
not get the correct table path.
This patch fixes the issue by defining
TABLE_PATH = '{0}/{1}.db/{2}'.format(
WAREHOUSE, unique_database, tbl_name)
Other path assertions are also modified to use os.path.join.
Testing:
Run and pass test_execute_remove_orphan_files at Ozone.
Change-Id: I2aba4f464ef472ab8f9d8ff7db7b5fb31a735478
Reviewed-on: http://gerrit.cloudera.org:8080/23114
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch implements the delete orphan files query for Iceberg tables.
The following statement becomes available for Iceberg tables:
- ALTER TABLE <tbl> EXECUTE remove_orphan_files(<timestamp>)
The bulk of the implementation copies Hive's implementation of the
org.apache.iceberg.actions.DeleteOrphanFiles interface (HIVE-27906,
6b2e21a93ef3c1776b689a7953fc59dbf52e4be4), which this patch renames to
ImpalaIcebergDeleteOrphanFiles.java. Upon execute(), the
ImpalaIcebergDeleteOrphanFiles instance gathers the URIs of all valid data
files and Iceberg metadata files using the Iceberg API. These valid URIs
are then compared to a recursive file listing obtained through the Hadoop
FileSystem API under the table's 'data' and 'metadata' directories,
respectively. Any unmatched URI from the FileSystem listing whose
modification time is less than the 'olderThanTimestamp' parameter is then
removed via the Iceberg FileIO API of the given Iceberg table. Note that
this is a destructive query that will wipe out any files within the Iceberg
table's 'data' and 'metadata' directories that are not addressable by any
valid snapshot.
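A heavily condensed sketch of that comparison (single directory, current
snapshot only, no URI normalization and no parallelism; the real
ImpalaIcebergDeleteOrphanFiles gathers URIs for all valid snapshots and the
metadata files too):
```
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class OrphanFileSketch {
  // Deletes files under 'dir' that are not referenced as data files by the
  // table and are older than 'olderThanTimestampMillis'.
  static void removeOrphanDataFiles(Table table, FileSystem fs, Path dir,
                                    long olderThanTimestampMillis) throws Exception {
    Set<String> validUris = new HashSet<>();
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) validUris.add(task.file().path().toString());
    }
    RemoteIterator<LocatedFileStatus> listing = fs.listFiles(dir, /*recursive=*/true);
    while (listing.hasNext()) {
      LocatedFileStatus status = listing.next();
      String uri = status.getPath().toString();
      if (!validUris.contains(uri)
          && status.getModificationTime() < olderThanTimestampMillis) {
        table.io().deleteFile(uri);  // orphan: not addressable by a valid snapshot
      }
    }
  }
}
```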
The execution happens in CatalogD via
IcebergCatalogOpExecutor.alterTableExecuteRemoveOrphanFiles(). CatalogD
supplies CatalogOpExecutor.icebergExecutorService_ as the executor service
used to run the Iceberg planFiles API and the FileIO deletions.
Also fixed toSql() implementation for all ALTER TABLE EXECUTE queries.
Testing:
- Add FE and EE tests.
Change-Id: I5979cdf15048d5a2c4784918533f65f32e888de0
Reviewed-on: http://gerrit.cloudera.org:8080/23042
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Since IMPALA-11591 Impala tries to avoid pushing down predicates to
Iceberg unless it is necessary (time travel) or is likely to be useful
(at least 1 partition column is involved in predicates). While this
makes planning faster, it may miss opportunities to skip files during
planning.
This patch adds the table property impala.iceberg.push_down_hint, which
expects a comma-separated list of column names and leads to pushdown
to Iceberg when there is a predicate on any of these columns.
Users can set this manually, while in the future Impala or other tools
may be able to set it automatically, e.g. during COMPUTE STATS if
there are many files with non-overlapping min/max stats for a given
column.
Note that in most cases when Iceberg can skip files the Parquet/ORC
scanner would also skip most of the data based on stat filtering. The
benefit of doing it during planning is reading fewer footers and a
"smaller" query plan.
Change-Id: I8eb4ab5204c20b3991fdf305d7317f4023904a0f
Reviewed-on: http://gerrit.cloudera.org:8080/22995
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
TestIcebergV2Table.test_compute_stats_table_sampling was failing in
ARM release builds. However, COMPUTE STATS with TABLESAMPLE is
inherently non-deterministic due to its use of SAMPLED_NDV().
This patch completely rewrites the tests and moves them to
test_stats_extrapolation.py to test Iceberg tables similarly to
legacy tables.
The 'diff_perc' argument of the appx_equals() method was also updated in
the tests, as with the previous value (1.0) it only reported errors
for negative estimates.
Change-Id: I98b07b156aad300827c9e1b7970b8dfacfc6d251
Reviewed-on: http://gerrit.cloudera.org:8080/23044
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
COMPUTE STATS with TABLESAMPLE clause did a full scan on Iceberg
tables since IMPALA-13737, because before this patch ComputeStatsStmt
used FeFsTable.Utils.getFilesSample() which only works correctly on
FS tables that have the file descriptors loaded. Since IMPALA-13737
the internal FS table of an Iceberg table doesn't have file descriptor
information, therefore FeFsTable.Utils.getFilesSample() returned an
empty map which turned off table sampling for COMPUTE STATS.
We did not have proper testing for COMPUTE STATS with table sampling,
therefore we did not catch the regression.
This patch adds proper table sampling logic for Iceberg tables that
can be used for COMPUTE STATS. The algorithm previously found in
IcebergScanNode.getFilesSample() has been moved to
FeIcebergTable.Utils.getFilesSample().
Testing
* added e2e tests
Change-Id: Ie59d5fc1374ab69209a74f2488bcb9a7d510b782
Reviewed-on: http://gerrit.cloudera.org:8080/22873
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch, Impala executed the EXPIRE_SNAPSHOTS operation on a
single thread. This can be really slow on cloud storage systems,
especially if the operation needs to remove lots of files.
This patch adds CatalogOpExecutor.icebergExecutorService_ to parallelize
Iceberg API calls that support passing an ExecutorService, such as
ExpireSnapshots.executeDeleteWith(). The number of threads for this
executor service is controlled by the CatalogD flag
--iceberg_catalog_num_threads. It defaults to 16, the same as the
--num_metadata_loading_threads default value.
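The Iceberg call shape is roughly the following (the executor wiring inside
CatalogOpExecutor is simplified; ExpireSnapshots.executeDeleteWith() is the
real API hook):
```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.Table;

public class ExpireSnapshotsSketch {
  // Expire snapshots older than the given timestamp, deleting unreachable
  // files with a thread pool instead of a single thread.
  static void expire(Table table, long olderThanMillis, int numThreads) {
    ExecutorService deleteExecutor = Executors.newFixedThreadPool(numThreads);
    try {
      table.expireSnapshots()
          .expireOlderThan(olderThanMillis)
          .executeDeleteWith(deleteExecutor)  // parallelizes the file deletions
          .commit();
    } finally {
      deleteExecutor.shutdown();
    }
  }
}
```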
Rename ValidateMinProcessingPerThread to ValidatePositiveInt64 to match
the other validators in backend-gflag-util.cc.
Testing:
- Lower sleep time between insert queries from 5s to 1s in
test_expire_snapshots and test_describe_history_params to speed up
tests.
- Manually verify that 'IcebergCatalogThread' threads are visible in
/jvm-threadz page of CatalogD.
- Pass test_iceberg.py.
Change-Id: I6dcbf1e406e1732ef8829eb0cd627d932291d485
Reviewed-on: http://gerrit.cloudera.org:8080/22980
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
test_scan_metrics_in_profile was querying pre-written Iceberg V2
tables. The position delete files of such tables contain hard-coded
URIs of data files, i.e. URIs that start with "hdfs://localhost...".
Therefore the test only worked well in HDFS builds.
This patch splits the test into two parts:
* test_scan_metrics_in_profile_basic: it runs on all storage systems
as it only uses Iceberg tables that don't have delete files.
* test_scan_metrics_in_profile_with_deletes: uses Iceberg tables
that have delete files, therefore it is only executed on HDFS.
Change-Id: I80a7c6469a7f56b58254e1327a05ef7b3dc9c9ff
Reviewed-on: http://gerrit.cloudera.org:8080/22931
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When calling planFiles() on an Iceberg table, it can give us some
metrics like total planning time, number of data/delete files and
manifests, how many of these could be skipped etc.
This change integrates these metrics into the query profile, under the
"Frontend" section. These metrics are per-table, so if multiple tables
are scanned for the query there will be multiple sections in the
profile.
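For a feel of the kind of per-table information planFiles() exposes, a rough
manual tally over the returned tasks could look like the sketch below.
Impala obtains the real metrics (including skipped manifest counts and
planning durations) from Iceberg's scan reporting rather than by counting
tasks like this:
```
import java.util.HashSet;
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class PlanFilesTallySketch {
  static void tally(Table table) throws Exception {
    long start = System.nanoTime();
    Set<String> dataFiles = new HashSet<>();
    Set<String> deleteFiles = new HashSet<>();
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        dataFiles.add(task.file().path().toString());
        for (DeleteFile delete : task.deletes()) {
          deleteFiles.add(delete.path().toString());
        }
      }
    }
    System.out.printf("planning took %d ms, %d data files, %d delete files%n",
        (System.nanoTime() - start) / 1_000_000, dataFiles.size(), deleteFiles.size());
  }
}
```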
Note that we only have these metrics for a table if Iceberg needs to be
used for planning for that table, e.g. if a predicate is pushed down to
Iceberg or if there is time travel. For tables where Iceberg was not
used in planning, the profile will contain a short note describing this.
To facilitate pairing the metrics with scans, the metrics header
references the plan node responsible for the scan. This will always be
the top level node for the scan, so it can be a SCAN node, a JOIN node
or a UNION node depending on whether the table has delete files.
Testing:
- added EE tests in iceberg-scan-metrics.tests
- added a test in PlannerTest.java that asserts on the number of
metrics; if it changes in a new Iceberg release, the test will fail
and we can update our reporting
Change-Id: I080ee8eafc459dad4d21356ac9042b72d0570219
Reviewed-on: http://gerrit.cloudera.org:8080/22501
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
This puts all of the thrift-generated python code into the
impala_thrift_gen package. This is similar to what Impyla
does for its thrift-generated python code, except that it
uses the impala_thrift_gen package rather than impala._thrift_gen.
This is a preparatory patch for fixing the absolute import
issues.
This patches all of the thrift files to add the Python namespace, and
includes code to apply the same patching to the third-party thrift
files (hive_metastore.thrift, fb303.thrift).
Putting all the generated python into a package makes it easier
to understand where the imports are getting code. When the
subsequent change rearranges the shell code, the thrift generated
code can stay in a separate directory.
This uses isort to sort the imports for the affected Python files
with the provided .isort.cfg file. This also adds an impala-isort
shell script to make it easy to run.
Testing:
- Ran a core job
Change-Id: Ie2927f22c7257aa38a78084efe5bd76d566493c0
Reviewed-on: http://gerrit.cloudera.org:8080/20169
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
This change adds get_workload() to ImpalaTestSuite and removes it
from all test suites that already returned 'functional-query'.
get_workload() is also removed from CustomClusterTestSuite which
used to return 'tpch'.
All other changes besides impala_test_suite.py and
custom_cluster_test_suite.py are just mass removals of
get_workload() functions.
The behavior is only changed in custom cluster tests that didn't
override get_workload(). By returning 'functional-query' instead
of 'tpch', exploration_strategy() will no longer return 'core' in
'exhaustive' test runs. See IMPALA-3947 on why workload affected
exploration_strategy. An example of an affected test is
TestCatalogHMSFailures, which was skipped in both core and exhaustive
runs before this change.
get_workload() functions that return a different workload than
'functional-query' are not changed - it is possible that some of
these also don't handle exploration_strategy() as expected, but
individually checking these tests is out of scope in this patch.
Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115
Reviewed-on: http://gerrit.cloudera.org:8080/22726
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IcebergPositionDeleteChannel has used an incorrect capacity since
IMPALA-13509. It is set to -1, which means it collects delete records until
it runs out of memory. This patch moves the channel's capacity calculation
from the Init() function to the constructor.
Testing
* e2e test added
Change-Id: I207869c97a699d2706227285595ec7d7dbe1e249
Reviewed-on: http://gerrit.cloudera.org:8080/22616
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In migrated Iceberg tables we can have data files with missing field
IDs. We assume that their schema corresponds to the table schema at the
point when the table migration happened. This means during runtime we
can generate the field ids. The logic is more complicated when there are
complex types in the table and the table is partitioned. In such cases
we need to do some adjustments during field ID generation, in which case
we verify that the file schema corresponds to the table schema.
These adjustments are not needed when the table doesn't have complex
types, hence we can be a bit more relaxed and skip schema verification,
because field ID generation for top-level columns is not affected.
This means Impala would still be able to read the table if there were
trivial schema changes before migration.
With this change we allow all data files that have a compatible schema
with the table schema, which was the case before IMPALA-13364. This
behavior is also aligned with Hive.
Testing:
* e2e tests added for both Parquet and ORC files
Change-Id: Ib1f1d0cf36792d0400de346c83e999fa50c0fa67
Reviewed-on: http://gerrit.cloudera.org:8080/22610
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change fixes the delete expression calculation for
IcebergMergeImpl. When an Iceberg table contains equality deletes, the
merge implementation now includes the data sequence number in the result
expressions, as the underlying tuple descriptor also includes it
implicitly. Without this field, the row evaluation fails
because of the mismatching number of evaluators and slot descriptors.
Tests:
- manually validated on an Iceberg table that contains equality delete
- e2e test added
Change-Id: I60e48e2731a59520373dbb75104d75aae39a94c1
Reviewed-on: http://gerrit.cloudera.org:8080/22423
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch we got a TableLoadingException for missing data files.
This means the IcebergTable will be in an incomplete state in Impala's
memory, therefore we won't be able to do any operation on it.
We should continue table loading in such cases, and only throw exception
for queries that are about to read the missing data files.
This way ROLLBACK / DROP PARTITION, and some SELECT statements should
still work.
If Impala is running in strict mode via CatalogD flag
--iceberg_allow_datafiles_in_table_location_only, and an Iceberg table
has data files outside of table location, we still raise an exception
and leave the table in an unloaded state. To retain this behavior, the
IOException we threw is replaced with TableLoadingException, which fits
logic errors better anyway.
Testing
* added e2e tests
Change-Id: If753619d8ee1b30f018e90157ff7bdbe5d7f1525
Reviewed-on: http://gerrit.cloudera.org:8080/22367
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When UPDATEing an Iceberg or Kudu table, we should change as few rows
as possible. In case of Iceberg tables it means writing as few new
data records and delete records as possible.
Therefore, if rows already have the new values we should just ignore
them. One way to achieve this is to add extra predicates, e.g.:
UPDATE tbl SET k = 3 WHERE i > 4;
==>
UPDATE tbl SET k = 3 WHERE i > 4 AND k IS DISTINCT FROM 3;
So we won't write new data/delete records for the rows that already have
the desired value.
Explanation on how to create extra predicates to filter out these rows:
If there are multiple assignments in the SET list, we can only skip
updating a row if all the mentioned values are already equal.
If any of the values needs to be updated, the entire row does.
Therefore we can think of the SET list as predicates connected with AND
and all of them need to be taken into consideration.
To negate this SET list, we have to negate the individual SET
assignments and connect them with OR.
Then add this new compound predicate to the original where predicates
with an AND (if there were none, just create a WHERE predicate from it).
                 AND
                /   \
        original     OR
 WHERE predicate    /  \
                  !a    OR
                       /  \
                     !b    !c
This simple graph illustrates how the where predicate is rewritten.
(Considering an UPDATE statement that sets 3 columns.)
'!a', '!b' and '!c' are the negations of the individual assignments in
the SET list. So the extended WHERE predicate is:
(original WHERE predicate) AND (!a OR !b OR !c)
To handle NULL values correctly, we use IS DISTINCT FROM instead of
simply negating the assignment with operator '!='.
If the assignments contain UDFs, the result might be inconsistent
because of possible non-deterministic values or state in the UDFs,
therefore we should not rewrite the WHERE predicate at all.
Evaluating expressions can be expensive, therefore this optimization
can be limited or switched off entirely using the Query Option
SKIP_UNNEEDED_UPDATES_COL_LIMIT. By default, there is no filtering
if more than 10 assignments are in the SET list.
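A minimal sketch of the WHERE-clause extension described above, built as SQL
text for readability (the real implementation manipulates analyzed Expr
trees, not strings):
```
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkipUnneededUpdatesSketch {
  // Given SET assignments col -> value-expression, builds
  //   (originalWhere) AND (c1 IS DISTINCT FROM v1 OR c2 IS DISTINCT FROM v2 ...)
  // IS DISTINCT FROM is used instead of '!=' so NULLs are handled correctly.
  static String extendWhere(String originalWhere, Map<String, String> setAssignments) {
    List<String> negated = new ArrayList<>();
    for (Map.Entry<String, String> a : setAssignments.entrySet()) {
      negated.add(a.getKey() + " IS DISTINCT FROM " + a.getValue());
    }
    String skipPredicate = "(" + String.join(" OR ", negated) + ")";
    if (originalWhere == null || originalWhere.isEmpty()) return skipPredicate;
    return "(" + originalWhere + ") AND " + skipPredicate;
  }

  public static void main(String[] args) {
    Map<String, String> set = new LinkedHashMap<>();
    set.put("k", "3");
    // UPDATE tbl SET k = 3 WHERE i > 4
    // becomes ... WHERE (i > 4) AND (k IS DISTINCT FROM 3)
    System.out.println(extendWhere("i > 4", set));
  }
}
```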
-------------------------------------------------------------------
Some performance measurements on a tpch lineitem table:
- predicates in HASH join, all updates can be skipped
Q1/[Q2] (Similar, but Q2 adds extra 4 items to the SET list):
update t set t.l_suppkey = s.l_suppkey,
[ t.l_partkey=s.l_partkey,
t.l_quantity=s.l_quantity,
t.l_returnflag=s.l_returnflag,
t.l_shipmode=s.l_shipmode ]
from ice_lineitem t join ice_lineitem s
on t.l_orderkey=s.l_orderkey and t.l_linenumber=s.l_linenumber;
- predicates in HASH join, all rows need to be updated
Q3: update t set
t.l_suppkey = s.l_suppkey,
t.l_partkey=s.l_partkey,
t.l_quantity=s.l_quantity,
t.l_returnflag=s.l_returnflag,
t.l_shipmode=concat(s.l_shipmode,' ')
from ice_lineitem t join ice_lineitem s
on t.l_orderkey=s.l_orderkey and t.l_linenumber=s.l_linenumber;
- predicates pushed down to the scanner, all rows updated
Q4/[Q5] (Similar, but Q5 adds extra 8 items to the SET list):
update ice_lineitem set
[ l_suppkey = l_suppkey + 0,
l_partkey=l_partkey + 0,
l_quantity=l_quantity,
l_returnflag=l_returnflag,
l_tax = l_tax,
l_discount= l_discount,
l_comment = l_comment,
l_receiptdate = l_receiptdate, ]
l_shipmode=concat(l_shipmode,' ');
+=======+============+==========+======+
| Query | unfiltered | filtered | diff |
+=======+============+==========+======+
| Q1 | 4.1s | 1.9s | -54% |
+-------+------------+----------+------+
| Q2 | 4.2s | 2.1s | -50% |
+-------+------------+----------+------+
| Q3 | 4.3s | 4.7s | +10% |
+-------+------------+----------+------+
| Q4 | 3.0s | 3.0s | +0% |
+-------+------------+----------+------+
| Q5 | 3.1s | 3.1s | +0% |
+-------+------------+----------+------+
The results show that in the best case (we can skip all rows)
this change can cause significant perf improvement ~50%, since
0 rows were written. See Q1 and Q2.
If the predicates are evaluated in the join operator but no rows can be
skipped (worst-case scenario), we can lose about 10% (Q3).
If all the predicates can be pushed down to the scanners, the change
does not seem to cause significant difference (~0% in Q4 and Q5)
even if all rows have to be updated.
Testing:
- Analysis
- Planner
- E2E
- Kudu
- Iceberg
- testing the new query option: SKIP_UNNEEDED_UPDATES_COL_LIMIT
Change-Id: I926c80e8110de5a4615a3624a81a330f54317c8b
Reviewed-on: http://gerrit.cloudera.org:8080/22407
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Previously, error_msg_expected() only accepted error messages starting
with the following error prompt:
```
Query <query_id> failed:\n
```
However, for some tests using the Beeswax protocol, the error prompt may
appear in the middle of the error message instead of at its beginning.
Therefore, this patch adapts error_msg_expected() to accept error
messages not starting with the error prompt.
The error_msg_expected() function is renamed to error_msg_startswith()
to better describe its behavior.
Change-Id: Iac3e68bcc36776f7fd6cc9c838dd8da9c3ecf58b
Reviewed-on: http://gerrit.cloudera.org:8080/22468
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
When using a native UDF in the target value of an UPDATE statement or in
a filter predicate or target value of a MERGE statement, Impala crashes
with the following DCHECK:
be/src/exprs/expr.cc:47 47 DCHECK(cache_entry_ == nullptr);
This DCHECK is in the destructor of Expr, and it fires because Close()
has not been called for the expression. In the UPDATE case this is
caused by MultiTableSinkConfig: it creates child DataSinkConfig objects
but does not call Close() on them, and consequently these child sink
configs do not call Close() on their output expressions.
In the MERGE case it is because various expressions are not closed in
IcebergMergeCase and IcebergMergeNode.
This patch fixes the issue by overriding Close() in MultiTableSinkConfig,
calling Close() on the child sinks as well as closing the expressions in
IcebergMergeCase and IcebergMergeNode.
Testing:
- Added EE regression tests for the UPDATE and MERGE cases in
iceberg-update-basic.test and iceberg-merge.test
Change-Id: Id86638c8d6d86062c68cc9d708ec9c7b0a4e95eb
Reviewed-on: http://gerrit.cloudera.org:8080/22508
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds the __reset_impala_clients() method in ImpalaConnection.
__reset_impala_clients() simply clears the configuration. It is called
in each setup_method() to ensure that each EE test uses a clean test
client. All subclasses of ImpalaTestSuite that declare a setup() method
are refactored to declare setup_method() instead, to match the newer
py.test convention. Also implement teardown_method() to complement
setup_method(). See "Method and function level setup/teardown" at
https://docs.pytest.org/en/stable/how-to/xunit_setup.html.
CustomClusterTestSuite fully overrides setup_method() and
teardown_method() because its subclasses can be destructive. The custom
cluster test methods often restart the whole Impala cluster, rendering the
default Impala clients initialized at setup_class() unusable. Each
subclass of CustomClusterTestSuite is responsible for ensuring that the
Impala client it uses is in a good state.
This patch improves BeeswaxConnection and ImpylaHS2Connection to only
consider non-REMOVED options as their default options. They look up
valid (not REMOVED) query options in their own appropriate way,
memorizing the option names as lowercase strings and the values as strings.
List values are wrapped with double quotes. The logging in
ImpalaConnection.set_configuration_option() is differentiated from how
a SET query looks.
Note that ImpalaTestSuite.run_test_case() modifies and restores query
options written in .test files by issuing SET queries, not by calling
ImpalaConnection.set_configuration_option(). That remains unchanged.
Query options are now consistently lowercased everywhere in the Impala
test infrastructure. Fixed several tests that had been unknowingly
overriding the 'exec_option' vector dimension due to a case-sensitivity
mismatch. Also fixed some flake8 issues.
Added convenience methods execute_query_using_vector() and
create_impala_client_from_vector() in ImpalaTestSuite.
Testing:
- Pass core tests.
Change-Id: Ieb47fec9f384cb58b19fdbd10ff7aa0850ad6277
Reviewed-on: http://gerrit.cloudera.org:8080/22404
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Reviewed-by: Jason Fehr <jfehr@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Iceberg supports multiple writers with optimistic concurrency.
Each writer can write new files which are then added to the table
after a validation check to ensure that the commit does not conflict
with other modifications made during the execution.
When there is a conflicting change that cannot be resolved, the newly
written files cannot be committed to the table, so they used to
become orphan files on the file system. Orphan files
can accumulate over time, taking up a lot of storage space. They do
not belong to the table because they are not referenced by any snapshot
and therefore they can't be removed by expiring snapshots.
This change introduces automatic cleanup of uncommitted files
after an unsuccessful DML operation to prevent creating orphan files.
No cleanup is done if Iceberg throws CommitStateUnknownException
because the update success or failure is unknown in this case.
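Roughly, the commit-side handling can be pictured like the sketch below (the
file tracking and Impala wiring are simplified; the Iceberg exception types
and FileIO.deleteFile() are the real APIs):
```
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;

public class DmlCleanupSketch {
  // newFilePaths: data/delete files written by the executors for this DML.
  static void commitOrCleanUp(Table table, Transaction txn, List<String> newFilePaths) {
    try {
      txn.commitTransaction();
    } catch (CommitStateUnknownException e) {
      // Success or failure of the commit is unknown: deleting the files
      // could corrupt the table, so no cleanup is done.
      throw e;
    } catch (ValidationException e) {
      // A conflicting change could not be resolved; the files will never be
      // part of any snapshot, so delete them to avoid creating orphans.
      for (String path : newFilePaths) table.io().deleteFile(path);
      throw e;
    }
  }
}
```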
Testing:
- E2E test: Injected ValidationException with debug option.
- stress test: Added a method to check that no orphan files were
created after failed conflicting commits.
Change-Id: Ibe59546ebf3c639b75b53dfa1daba37cef50eb21
Reviewed-on: http://gerrit.cloudera.org:8080/22189
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change adds INSERT * and UPDATE SET * language elements for
WHEN NOT MATCHED and WHEN MATCHED clauses. INSERT * enumerates all
source expressions from source table/subquery and analyzes the clause
similarly to the regular WHEN NOT MATCHED THEN INSERT case. UPDATE SET
* creates assignments for each target table column by enumerating the
table columns and assigning source expressions by index.
If the target column count and the source expression count mismatch, or
the types mismatch, both clauses report analysis errors.
Tests:
- parser tests added
- analyzer tests added
- E2E tests added
Change-Id: I31cb771f2355ba4acb0f3b9f570ec44fdececdf3
Reviewed-on: http://gerrit.cloudera.org:8080/22051
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change adds support for a new MERGE clause that covers the
condition when the source statement's rows do not match the target
table's rows. Example: MERGE INTO target t using source s on t.id = s.id
WHEN NOT MATCHED BY SOURCE THEN UPDATE set t.column = "a";
This change also adds support for using WHEN NOT MATCHED BY TARGET
explicitly; this is equivalent to WHEN NOT MATCHED.
Tests:
- Parser tests for the new language elements.
- Analyzer and planner test for WHEN NOT MATCHED BY SOURCE/TARGET
clauses.
- E2E tests for WHEN NOT MATCHED BY SOURCE clause.
Change-Id: Ia0e0607682a616ef6ad9eccf499dc0c5c9278c5f
Reviewed-on: http://gerrit.cloudera.org:8080/21988
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change adds support for reading NDV statistics from Puffin files
when they are available for the current snapshot. Puffin files or blobs
that were written for other snapshots than the current one are ignored.
Because this behaviour is different from what we have for HMS stats and
may therefore be unintuitive for users, reading Puffin stats is disabled
by default; set the "--disable_reading_puffin_stats" startup flag to
false to enable it.
When Puffin stats reading is enabled, the NDV values read from Puffin
files take precedence over NDV values stored in the HMS. This is because
we only read Puffin stats for the current snapshot, so these values are
always up-to-date, while the values in the HMS may be stale.
Note that it is currently not possible to drop Puffin stats from Impala.
For this reason, this patch also introduces two ways of disabling the
reading of Puffin stats:
- globally, with the aforementioned "--disable_reading_puffin_stats"
startup flag: when it is set to true, Impala will never read Puffin
stats
- for specific tables, by setting the
"impala.iceberg_disable_reading_puffin_stats" table property to
true.
Note that this change is only about reading Puffin files, Impala does
not yet support writing them.
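A sketch of this gating logic (the helper and flag plumbing are
hypothetical; Table.statisticsFiles() and StatisticsFile.snapshotId() are
the Iceberg APIs that expose the Puffin statistics metadata):
```
import java.util.Map;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;

public class PuffinStatsGateSketch {
  // disableReadingPuffinStats mirrors the --disable_reading_puffin_stats flag.
  static boolean shouldReadPuffinStats(Table table, Map<String, String> tblProperties,
                                       boolean disableReadingPuffinStats) {
    if (disableReadingPuffinStats) return false;
    if (Boolean.parseBoolean(
        tblProperties.get("impala.iceberg_disable_reading_puffin_stats"))) {
      return false;
    }
    if (table.currentSnapshot() == null) return false;
    long currentSnapshotId = table.currentSnapshot().snapshotId();
    // Only stats written for the current snapshot are considered;
    // stale blobs written for older snapshots are ignored.
    for (StatisticsFile stats : table.statisticsFiles()) {
      if (stats.snapshotId() == currentSnapshotId) return true;
    }
    return false;
  }
}
```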
Testing:
- created the PuffinDataGenerator tool which can generate Puffin files
and metadata.json files for different scenarios (e.g. all stats are
in the same Puffin file; stats for different columns are in different
Puffin files; some Puffin files are corrupt etc.). The generated
files are under the "testdata/ice_puffin/generated" directory.
- The new custom cluster test class
'test_iceberg_with_puffin.py::TestIcebergTableWithPuffinStats' uses
the generated data to test various scenarios.
- Added custom cluster tests that test the
'disable_reading_puffin_stats' startup flag.
Change-Id: I50c1228988960a686d08a9b2942e01e366678866
Reviewed-on: http://gerrit.cloudera.org:8080/21605
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Schema is case insensitive in Impala. Via Spark it's possible to create
schema elements with upper/lower case letters and store them in the
metadata JSON files of Iceberg, e.g.:
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "ID",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "OWNERID",
"required" : false,
"type" : "string"
} ]
} ],
This can cause problems in Impala during predicate pushdown, as we can
get a ValidationException from the Iceberg library (as Impala pushes
down predicates with lower case column names, while Iceberg sees upper
case names).
With this patch Impala invokes Scan.caseSensitive(boolean caseSensitive)
on the TableScan object to set case insensitivity.
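With the Iceberg Java API the call looks roughly like this (column and
value are made up; Impala applies this on its TableScan before planFiles()):
```
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class CaseInsensitiveScanSketch {
  static long countMatchingFiles(Table table) throws Exception {
    long n = 0;
    // Without caseSensitive(false), pushing down "id" against a schema that
    // stores the column as "ID" raises a ValidationException.
    try (CloseableIterable<FileScanTask> tasks = table.newScan()
             .caseSensitive(false)
             .filter(Expressions.equal("id", "abc"))  // lower-case column name
             .planFiles()) {
      for (FileScanTask ignored : tasks) n++;
    }
    return n;
  }
}
```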
Testing:
* added e2e test
Change-Id: Iedaf152d8a0c02a124c3dcf8acb59b4ba4e81cf4
Reviewed-on: http://gerrit.cloudera.org:8080/21950
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
test_read_position_deletes modifies a table (COMPUTE STATS / DROP STATS)
during its execution. So when the test executes in parallel we can see
inconsistent results.
This change moves the COMPUTE/DROP STATS operations to a separate test
that is executed serially. Also adds SHOW COLUMN STATS which was not
tested before.
Change-Id: Iee8cdac9c631a17436250dc60528a8545e668e2d
Reviewed-on: http://gerrit.cloudera.org:8080/21906
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Schema resolution doesn't work correctly for migrated partitioned
Iceberg tables that have complex types. When we face a Parquet/ORC file
in an Iceberg table that doesn't have field IDs in the file metadata, we
assume that it is an old data file before migration, and the schema is
the very first one, hence we can mimic Iceberg's field ID generation to
assign field IDs to the file schema elements.
This process didn't take the partition columns into account. Partition
columns are not part of the data file but they still get field IDs. This
only matters when there are complex types in the table, as partition
columns are always the last columns in legacy Hive tables, and field IDs
are assigned via a "BFS-like" traversal. I.e. if there are only primitive
types in the table we don't have any problems, but the children of
complex types columns are assigned incorrectly.
This patch fixes field ID generation by taking the number of partition columns
into account. If none of the partition columns are included in the data
file (common case) we adjust the file-level field IDs accordingly. It is
also OK to have all the partition columns in the data files (it is not
common, but we've seen such data files). We raise an error in other
cases (some partition columns are in the data file, while others
aren't).
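A strongly simplified sketch of the adjustment (the real code handles
structs, maps, arrays and the error cases above): the original migration
assigned IDs to all top-level columns, including the trailing partition
columns, before descending into nested fields, so nested IDs must be
shifted by the number of partition columns missing from the data file:
```
import java.util.ArrayList;
import java.util.List;

public class MigratedFieldIdSketch {
  // A file-schema column with an optional list of nested children.
  static class Column {
    final String name;
    final List<Column> children = new ArrayList<>();
    int fieldId;
    Column(String name) { this.name = name; }
  }

  // Assigns field IDs the way table migration did: all top-level columns
  // first (the partition columns were the last ones and also consumed IDs),
  // then children. 'fileColumns' comes from a data file that does NOT
  // contain the partition columns, so the first nested ID is shifted by
  // numPartitionCols.
  static void assignFieldIds(List<Column> fileColumns, int numPartitionCols) {
    int nextId = 1;
    for (Column c : fileColumns) c.fieldId = nextId++;
    nextId += numPartitionCols;           // IDs consumed by the partition columns
    List<Column> queue = new ArrayList<>(fileColumns);
    while (!queue.isEmpty()) {            // "BFS-like" traversal into complex types
      Column c = queue.remove(0);
      for (Column child : c.children) {
        child.fieldId = nextId++;
        queue.add(child);
      }
    }
  }
}
```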
Testing:
* e2e tests added
* added negative tests
Change-Id: Ie32952021b63d6b55b8820489e434bfc2a91580b
Reviewed-on: http://gerrit.cloudera.org:8080/21761
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
MERGE statement is a DML command that allows users to perform
conditional insert, update, or delete operations on a target table based
on the results of a join with a source table. This change adds MERGE
statement parsing and an Iceberg-specific semantic analysis, planning,
and execution. The parsing grammar follows the SQL standard, it accepts
the same syntax as Hive, Spark, and Trino by supporting arbitrary number
of WHEN clauses, with conditions or without and accepting inline views
as source.
Example:
'MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED AND t.id < 100 THEN UPDATE SET column1 = s.column1
WHEN MATCHED AND t.id > 100 THEN DELETE
WHEN MATCHED THEN UPDATE SET column1 = "value"
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.column1);'
The Iceberg-specific analysis, planning, and execution are based on a
concept that was previously used for UPDATE: The analyzer creates a
SELECT statement with all target and source columns (including
Iceberg's virtual columns) and a 'row_present' column that defines
whether the source, the target, or both rows are present in the result
set after joining the two table references by the ON clause. The join
condition should be an equi-join, as it is a FULL OUTER JOIN, and Impala
currently supports only equi-joins in this case. The joining order is
forced by a query hint; this guarantees that the target table is always
on the left side.
A new node, IcebergMergeNode, is added at the planning phase; this node
does the row-level filtering for the MATCHED / NOT MATCHED cases. The
'row_present' column decides which case group will be evaluated: if
both sides are present, the matched cases; if only the source side
matches, the not matched cases and their filter expressions
are evaluated over the row. If one of the cases matches, the
execution evaluates the result expressions into the output row batch,
and an auxiliary tuple stores the merge action. The merge action is
a flag for the newly added IcebergMergeSink; this sink will route each
incoming row from IcebergMergeNode to their respective destination. Each
row could go to the delete sink, insert sink, or to both sinks.
Target-side duplicate records are checked for during IcebergMergeNode's
execution; if a target-side duplicate is detected, the whole
statement's execution is stopped and the error is reported back to the
user.
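A bare-bones sketch of the routing decision (the names and the enum are
invented; the real IcebergMergeNode and IcebergMergeSink work on row
batches and analyzed expressions):
```
import java.util.List;
import java.util.function.Predicate;

public class MergeRoutingSketch {
  enum RowPresent { BOTH, SOURCE_ONLY, TARGET_ONLY }
  enum MergeAction { DELETE, INSERT, UPDATE, NONE }  // UPDATE = delete + insert

  static class MergeCase<R> {
    final Predicate<R> filter;
    final MergeAction action;
    MergeCase(Predicate<R> filter, MergeAction action) {
      this.filter = filter;
      this.action = action;
    }
  }

  // Picks the first case whose filter matches; the case group is chosen by
  // the row_present value produced by the FULL OUTER JOIN.
  static <R> MergeAction route(R row, RowPresent rowPresent,
                               List<MergeCase<R>> matchedCases,
                               List<MergeCase<R>> notMatchedCases) {
    List<MergeCase<R>> cases =
        rowPresent == RowPresent.BOTH ? matchedCases : notMatchedCases;
    for (MergeCase<R> c : cases) {
      if (c.filter.test(row)) return c.action;  // emit row + action to the sink
    }
    return MergeAction.NONE;
  }
}
```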
Added tests:
- Parser tests
- Analyzer tests
- Unit test for WHEN NOT MATCHED INSERT column collation
- Planner tests for partitioned/sorted cases
- Authorization tests
- E2E tests
Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Reviewed-on: http://gerrit.cloudera.org:8080/21423
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The OPTIMIZE TABLE statement is currently used to rewrite the entire
Iceberg table. With the 'FILE_SIZE_THRESHOLD_MB' option, the user can
specify a file size limit to rewrite only small files.
Syntax: OPTIMIZE TABLE <table_name> [(FILE_SIZE_THRESHOLD_MB=<value>)];
The value of the threshold is the file size in MBs. It must be a
non-negative integer. Data files larger than the given limit will only
be rewritten if they are referenced from delete files.
If only 1 file is selected in a partition, it will not be rewritten.
If the threshold is 0, only the delete files and the referenced data
files will be rewritten.
IMPALA-12839: 'Optimizing empty table should be no-op' is also
resolved in this patch.
With the file selection option, the OPTIMIZE operation can operate
in 3 different modes:
- REWRITE_ALL: The entire table is rewritten, either because the
compaction was triggered by a simple 'OPTIMIZE TABLE' command
without a specified 'FILE_SIZE_THRESHOLD_MB' parameter, or
because all files of the table are delete files, are referenced by
deletes, or are smaller than the limit.
- PARTIAL: If the value of 'FILE_SIZE_THRESHOLD_MB' parameter is
specified then only the small data files without deletes are selected
and the delete files are merged. Large data files without deletes
are kept to avoid unnecessary resource consuming writes.
- NOOP: When no files qualify for the selection criteria, there is
no need to rewrite any files. This is a no-operation.
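A rough sketch of the per-file selection rule behind these modes (names are
invented, sizes are in bytes, and the one-file-per-partition rule is
omitted):
```
import java.util.ArrayList;
import java.util.List;

public class OptimizeSelectionSketch {
  static class DataFileInfo {
    final long sizeInBytes;
    final boolean hasDeletes;  // referenced from delete files
    DataFileInfo(long sizeInBytes, boolean hasDeletes) {
      this.sizeInBytes = sizeInBytes;
      this.hasDeletes = hasDeletes;
    }
  }

  // Returns the data files that should be rewritten for a given threshold (MB).
  // Delete files are always merged away; large files without deletes are kept.
  static List<DataFileInfo> selectForRewrite(List<DataFileInfo> files,
                                             long fileSizeThresholdMb) {
    long thresholdBytes = fileSizeThresholdMb * 1024L * 1024L;
    List<DataFileInfo> selected = new ArrayList<>();
    for (DataFileInfo f : files) {
      if (f.hasDeletes || f.sizeInBytes < thresholdBytes) selected.add(f);
    }
    // If everything was selected the operation behaves like REWRITE_ALL,
    // if nothing was selected it is a NOOP, otherwise it is PARTIAL.
    return selected;
  }
}
```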
Testing:
- Parser test
- FE unit tests
- E2E tests
Change-Id: Icfbb589513aacdb68a86c1aec4a0d39b12091820
Reviewed-on: http://gerrit.cloudera.org:8080/21388
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds the query id to the error messages in both
- the result of the `get_log()` RPC, and
- the error message in an RPC response
before they are returned to the client, so that the users can easily
figure out the errored queries on the client side.
To achieve this, the query id of the thread debug info is set in the
RPC handler method, and is retrieved from the thread debug info each
time the error reporting function or `get_log()` gets called.
Due to the change of the error message format, some checks in the
impala-shell.py are adapted to keep them valid.
Testing:
- Added helper function `error_msg_expected()` to check whether an
error message is expected. It is stricter than only using the `in`
operator.
- Added helper function `error_msg_equal()` to check if two error
messages are equal regardless of the query ids.
- Various test cases are adapted to match the new error message format.
- `ImpalaBeeswaxException`, which is used in tests only, is simplified
so that it has the same error message format as the exceptions for
HS2.
- Added an assertion to the case of killing and restarting a worker
in the custom cluster test to ensure that the query id is in
the error message in the client log retrieved with `get_log()`.
Change-Id: I67e659681e36162cad1d9684189106f8eedbf092
Reviewed-on: http://gerrit.cloudera.org:8080/21587
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
If the Iceberg table has Avro delete files (e.g. by setting
'write.delete.format.default'='avro') then Impala won't be able to read
the contents of the delete files properly. This is because the Avro
schema is not set properly for the virtual delete table.
Testing:
* added e2e tests with position delete files of all kinds
Change-Id: Iff13198991caf32c51cd9e0ace4454fd00216cf6
Reviewed-on: http://gerrit.cloudera.org:8080/21301
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
When multiple coordinators attempt to create the same table concurrently
with "if not exists", we still see
AlreadyExistsException: Table was created concurrently: my_iceberg_tbl
Iceberg throws its own version of AlreadyExistsException, but we avoid
most code paths that would throw it because we first check HMS to see if
the table exists before trying to create it.
Updates createIcebergTable to handle Iceberg's AlreadyExistsException
identically to the HMS AlreadyExistsException.
Adds a test using DebugAction to simulate concurrent table creation.
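Conceptually the handling is along these lines (a simplified sketch; the
real code lives in the catalog's createIcebergTable path and uses Impala's
own types):
```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class CreateIfNotExistsSketch {
  // Returns true if the table was created, false if it already existed.
  static boolean createIcebergTable(Catalog catalog, TableIdentifier ident,
                                    Schema schema, boolean ifNotExists) {
    try {
      catalog.createTable(ident, schema, PartitionSpec.unpartitioned());
      return true;
    } catch (org.apache.iceberg.exceptions.AlreadyExistsException e) {
      // Another coordinator created the table concurrently. With IF NOT
      // EXISTS this is not an error, just like the HMS AlreadyExistsException.
      if (ifNotExists) return false;
      throw e;
    }
  }
}
```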
Change-Id: I847eea9297c9ee0d8e821fe1c87ea03d22f1d96e
Reviewed-on: http://gerrit.cloudera.org:8080/21312
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Now that we have the DIRECTED distribution mode, some parts of
IcebergDeleteNode and IcebergDeleteBuilder became dead code. It is
time to simplify the above classes.
IcebergDeleteBuilder and KrpcDataStreamSender now also tolerate
NULL file paths which are also not an error in the hash join mode.
Change-Id: I3ba02b33433990950b49628f11e732e01ed8a34d
Reviewed-on: http://gerrit.cloudera.org:8080/21258
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This CR is the first step to upgrade to Iceberg 1.4.3. The biggest
change in behavior in Iceberg 1.4.3 is that Iceberg V2 tables are
the default. Because of this we update some test files to
explicitly create V1/V2 tables. We also introduce test files that
create Iceberg tables without explicitly specifying the format
version, these tests have the name *-default.test. The latter tests
will need to be updated when we actually upgrade to Iceberg 1.4.3.
Change-Id: Ieb4f6c1b206d1d4fd878f07ea5f1436dcae560cd
Reviewed-on: http://gerrit.cloudera.org:8080/21167
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Andrew Sherman <asherman@cloudera.com>
This is a part 1 change that turns off the count(*) optimisations for
V2 tables as there is a correctness issue with it. The reason is that
Spark compaction may leave some dangling delete files that mess up
the logic in Impala.
Change-Id: Ida9fb04fd076c987b6b5257ad801bf30f5900237
Reviewed-on: http://gerrit.cloudera.org:8080/21139
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds support for reading Iceberg tables that have
different equality field ID lists associated to different equality
delete files. In practice this is a use case when one equality delete
file deletes by e.g. columnA and columnB while another one deletes by
columnB and columnC.
In order to achieve such functionality the plan tree creation needed
some adjustments so that it can create separate LEFT ANTI JOIN nodes
for the different equality field ID lists.
Testing:
- Flink and NiFi were used for creating some test tables with the
desired equality field IDs. Coverage for these tables is added to
the test suite.
Change-Id: I3e52d7a5800bf1b479f0c234679be92442d09f79
Reviewed-on: http://gerrit.cloudera.org:8080/20951
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The OPTIMIZE statement is used to execute table maintenance tasks
on Iceberg tables, such as:
1. compacting small files,
2. merging delete deltas,
3. rewriting the table according to the latest schema
and partition spec.
OptimizeStmt used to serve as an alias for INSERT OVERWRITE.
After this change it works as follows: It creates a source statement
that contains all columns of the table. All table content will be
rewritten to new data files. After the executors have finished writing,
the Catalog calls the RewriteFiles Iceberg API to commit the changes.
All previous data and delete files will be excluded from,
and all newly written data files will be added to, the next
snapshot. The old files remain accessible via time travel
to older snapshots of the table.
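The commit step can be sketched with Iceberg's RewriteFiles operation as
below (the file sets are placeholders for what the executors produced and
what the snapshot referenced before; the exact overload used by Impala may
differ):
```
import java.util.Collections;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

public class OptimizeCommitSketch {
  // oldDataFiles/oldDeleteFiles: everything referenced by the current
  // snapshot; newDataFiles: the freshly written, compacted files.
  static void commitRewrite(Table table, Set<DataFile> oldDataFiles,
                            Set<DeleteFile> oldDeleteFiles, Set<DataFile> newDataFiles) {
    table.newRewrite()
        .rewriteFiles(oldDataFiles, oldDeleteFiles,
                      newDataFiles, Collections.<DeleteFile>emptySet())
        .commit();  // old files stay reachable only through time travel
  }
}
```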
By default, Impala has as many file writers as query fragment instances
and therefore can write too many files for unpartitioned tables.
For smaller tables this can be limited by setting the
MAX_FS_WRITERS Query Option.
Authorization: OPTIMIZE TABLE requires ALL privileges.
Limitations:
All limitations about writing Iceberg tables apply.
Testing:
- E2E tests:
- schema evolution
- partition evolution
- UPDATE/DELETE
- time travel
- table history
- negative tests
- Ranger tests for authorization
- FE: Planner test:
- sorting order
- MAX_FS_WRITERS
- partitioned exchange
- Parser test
Change-Id: I65a0c8529d274afff38ccd582f1b8a857716b1b5
Reviewed-on: http://gerrit.cloudera.org:8080/20866
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Including the snapshot id of Iceberg tables in the Iceberg SCAN
operators can be useful to enable replayable queries. Replayable
queries are useful because they let us better investigate performance
problems and bugs.
Testing:
- Updated planner tests, added e2e test for time travel
Change-Id: Iee0b4967429ea733729ad8e44df32e3b24b88525
Reviewed-on: http://gerrit.cloudera.org:8080/20204
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-12742 fixed a bug related to DATE-partitioned Iceberg tables
and it also added several tests. However, it did not add interop tests
between Hive and Impala. This CR fills that gap.
Change-Id: I38e6626d388be3b400e2276de4bf929f673beffb
Reviewed-on: http://gerrit.cloudera.org:8080/20976
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
During remote read scheduling Impala does the following:
Non-Iceberg tables
* The scheduler processes the scan ranges in partition key order
* The scheduler selects N executors as candidates
* The scheduler chooses the executor from the candidates based on
minimum number of assigned bytes
* So consecutive partitions are more likely to be assigned to
different executors
Iceberg tables
* The scheduler processes the scan ranges in random order
* The scheduler selects N executors as candidates
* The scheduler chooses the executor from the candidates based on
minimum number of assigned bytes
* So consecutive partitions (by partition key order) are assigned
randomly, i.e. there's a higher chance of clustering
With this patch, IcebergScanNode orders its file descriptors based on
their paths, so we will have a more balanced scheduling for consecutive
partitions. It is especially important for queries that prune partitions
via runtime filters (e.g. due to a JOIN): even if we schedule the scan
ranges evenly overall, the scan ranges that survive the runtime filters
can still be clustered on certain executors.
E.g. TPC-DS Q22 has the following JOIN and WHERE predicates:
inv_date_sk=d_date_sk and
d_month_seq between 1199 and 1199 + 11
The Inventory table is partitioned by column inv_date_sk, and we filter
the rows in the joined table by 'd_month_seq between 1199 and
1199 + 11'. This means that we will only need a range of partitions from
the Inventory table, but that range will only be revealed during
runtime. Scheduling neighbouring partitions to different executors means
that the surviving partitions are spread across executors more evenly.
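A toy sketch of the two ingredients at play: assignment by minimum assigned
bytes (simplified here to consider all executors as candidates) and the new
deterministic ordering of file descriptors by path (names invented; the
real logic is split between IcebergScanNode and the scheduler):
```
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScanRangeAssignmentSketch {
  static class ScanRange {
    final String filePath;
    final long lengthBytes;
    ScanRange(String filePath, long lengthBytes) {
      this.filePath = filePath;
      this.lengthBytes = lengthBytes;
    }
  }

  // Assigns each range to the executor with the fewest assigned bytes.
  // Because the ranges are pre-sorted by path, neighbouring partitions tend
  // to alternate between executors instead of clustering on one of them.
  static Map<String, Long> assign(List<ScanRange> ranges, List<String> executors) {
    ranges.sort(Comparator.comparing((ScanRange r) -> r.filePath));  // new ordering step
    Map<String, Long> assignedBytes = new HashMap<>();
    for (String e : executors) assignedBytes.put(e, 0L);
    for (ScanRange r : ranges) {
      String target = executors.get(0);
      for (String e : executors) {
        if (assignedBytes.get(e) < assignedBytes.get(target)) target = e;
      }
      assignedBytes.put(target, assignedBytes.get(target) + r.lengthBytes);
    }
    return assignedBytes;
  }
}
```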
Testing:
* e2e test
Change-Id: I60773965ecbb4d8e659db158f1f0ac76086d5578
Reviewed-on: http://gerrit.cloudera.org:8080/20973
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change sets the default value to 'true' for
'iceberg_restrict_data_file_location' and changes the flag name to
'iceberg_allow_datafiles_in_table_location_only'. Tests related to
multiple storage locations in Iceberg tables are moved out to custom
cluster tests. During test data loading, the flag is set to 'false'
to make the creation of 'iceberg_multiple_storage_locations' table
possible.
Change-Id: Ifec84c86132a8a44d7e161006dcf51be2e7c7e57
Reviewed-on: http://gerrit.cloudera.org:8080/20874
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The current implementation of UPDATE creates the delete file(s) and the
new data file(s) for the updated row(s). These files are committed in
one Iceberg transaction, but the transaction adds two snapshots to the
table. The first contains the delete file(s), the second adds the new
data file(s) of the updated row(s). Only the final snapshot (which
holds the consistent table state) is observable by concurrent readers,
but still, the commit history can look strange with these "phantom
snapshots".
So instead of doing a RowDelta and AppendFiles operation in a single
transaction, with this change we are doing a single RowDelta operation
only.
Another issue was that we also committed empty operations (e.g. UPDATEs
with zero records). These created redundant snapshots in the table
history. This patch also fixes that.
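The commit shape after this change, sketched with the Iceberg API (the file
lists are placeholders for what the UPDATE wrote):
```
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

public class UpdateCommitSketch {
  // One RowDelta commits both the position-delete files and the new data
  // files, producing a single snapshot instead of two.
  static void commitUpdate(Table table, List<DataFile> newDataFiles,
                           List<DeleteFile> newDeleteFiles) {
    if (newDataFiles.isEmpty() && newDeleteFiles.isEmpty()) return;  // skip empty UPDATEs
    RowDelta rowDelta = table.newRowDelta();
    for (DeleteFile f : newDeleteFiles) rowDelta.addDeletes(f);
    for (DataFile f : newDataFiles) rowDelta.addRows(f);
    rowDelta.commit();
  }
}
```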
Testing:
* added e2e test that checks the table history
Change-Id: I2ceb80b939c644388707b21061bf55451234dcd3
Reviewed-on: http://gerrit.cloudera.org:8080/20903
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>