Mirror of https://github.com/apache/impala.git, synced 2025-12-19 09:58:28 -05:00
Branch: master (1513 commits)

9d112dae23
IMPALA-14536: Fix CONVERT TO ICEBERG to not throw exception on Iceberg tables
Previously, running ALTER TABLE <table> CONVERT TO ICEBERG on an Iceberg table produced an error. This patch fixes that: the statement now does nothing when called on an Iceberg table and returns a 'Table has already been migrated.' message. This is achieved by adding a new flag to StatementBase that signals when a statement ends up as a NO_OP; if it is set, the new TStmtType::NO_OP is used as the TExecRequest's type and noop_result can be used to set the result from the frontend side.
Tests:
* extended FE and E2E tests
Change-Id: I41ecbfd350d38e4e3fd7b813a4fc27211d828f73
Reviewed-on: http://gerrit.cloudera.org:8080/23699
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Peter Rozsa <prozsa@cloudera.com>
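
A minimal sketch of the behavior described above; the table name `ice_tbl` is hypothetical:

```sql
-- First run migrates a legacy table to Iceberg.
ALTER TABLE ice_tbl CONVERT TO ICEBERG;

-- Running it again on the now-Iceberg table is a no-op instead of an error
-- and reports that the table has already been migrated.
ALTER TABLE ice_tbl CONVERT TO ICEBERG;
```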

ddd82e02b9
IMPALA-14065: Support WHERE clause in SHOW PARTITIONS statement
This patch extends the SHOW PARTITIONS statement to allow an optional WHERE clause that filters partitions based on partition column values. The implementation adds support for various comparison operators, IN lists, BETWEEN clauses, IS NULL, and logical AND/OR expressions involving partition columns. Non-partition columns, subqueries, and analytic expressions in the WHERE clause are not allowed and will result in an analysis error. New analyzer tests have been added to AnalyzeDDLTest#TestShowPartitions to verify correct parsing, semantic validation, and error handling for supported and unsupported cases.
Testing:
- Added new unit tests in AnalyzeDDLTest for valid and invalid WHERE clause cases.
- Verified functional tests covering partition filtering behavior.
Change-Id: I2e2a14aabcea3fb17083d4ad6f87b7861113f89e
Reviewed-on: http://gerrit.cloudera.org:8080/23566
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
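
A quick illustration of the extended syntax; the partitioned table `sales` and its partition columns `year` and `month` are assumed for the example:

```sql
-- Comparison operators, IN lists and AND/OR logic on partition columns.
SHOW PARTITIONS sales WHERE year = 2024 AND month IN (1, 2, 3);

-- BETWEEN and IS NULL on partition columns are also supported.
SHOW PARTITIONS sales WHERE month BETWEEN 4 AND 6 OR month IS NULL;
```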

1684c2d9da
IMPALA-14131: Add flag to configure the default value of 'impala.disableHmsSync'
FEATURE: Implement a global 'disable_hms_sync_by_default' flag for event processing. This change introduces a new catalogd startup flag, `disable_hms_sync_by_default`, to simplify skipping/processing events.
Problem: Disabling event processing globally requires the tedious process of setting the 'impala.disableHmsSync' property on every database and table, especially if only a few specific tables require event sync.
Solution: The new flag provides a global default for the 'impala.disableHmsSync' property.
Behavior:
- If `disable_hms_sync_by_default` is true (the intended default-off state), event processing is skipped for all tables/databases unless the property "impala.disableHmsSync"="false" is explicitly set.
- This allows users to easily keep event processing off by default and opt in specific databases or tables to start syncing.
- The check order is: table property > db property > global default.
- HMS polling remains independent and unaffected by this flag.
Change-Id: I4ee617aed48575502d9cf5cf2cbea6ec897d6839
Reviewed-on: http://gerrit.cloudera.org:8080/23487
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
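
A sketch of the opt-in pattern described above, assuming catalogd was started with --disable_hms_sync_by_default=true; the table name `db1.critical_tbl` is hypothetical:

```sql
-- With the global default disabling HMS event sync, explicitly opt a single
-- table back in by overriding the property at the table level.
ALTER TABLE db1.critical_tbl SET TBLPROPERTIES ('impala.disableHmsSync'='false');
```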

7e29ac23da
IMPALA-14092 Part2: Support querying of paimon data table via JNI
This patch mainly implements querying of paimon data tables
through a JNI-based scanner.
Features implemented:
- support column pruning.
The partition pruning and predicate push down will be submitted
as the third part of the patch.
We implemented this by treating the paimon table as a normal
unpartitioned table. When querying a paimon table:
- PaimonScanNode decides which paimon splits need to be scanned,
and then transfers the splits to the BE to do the JNI-based
scan operation.
- We also collect the required columns that need to be scanned,
and pass the columns to the scanner for column pruning. This is
implemented by passing the field ids of the columns to the BE,
instead of column positions, to support schema evolution.
- In the original implementation, PaimonJniScanner directly passed
the paimon row object to the BE and called the corresponding paimon
row field accessor, a Java method that converts row fields to
Impala row batch tuples. We found this slow due to the overhead of
JVM method calls.
To minimize the overhead, we reworked the implementation:
PaimonJniScanner now converts the paimon row batches to an
Arrow record batch, which stores data in the off-heap region of
the Impala JVM, and passes the Arrow off-heap record batch
memory pointer to the BE.
The BE PaimonJniScanNode directly reads data from the JVM
off-heap region and converts the Arrow record batch to an
Impala row batch.
The benchmark shows the latter implementation is about 2x faster
than the original implementation.
The lifecycle of an Arrow record batch is as follows:
the batch is generated in the FE and passed to the BE.
After the record batch is imported into the BE successfully,
the BE is in charge of freeing it.
There are two free paths: the normal path and the
exception path. On the normal path, when the Arrow batch
is fully consumed by the BE, the BE calls into JNI to fetch the
next Arrow batch; in this case the current batch is freed
automatically.
The exception path happens when the query is cancelled or a memory
allocation fails. In these corner cases the Arrow batch is freed in
the close() method if it was not fully consumed by the BE.
Currently supported Impala data types for queries include:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE
TODO:
- Patches pending submission:
- Support tpcds/tpch data-loading
for paimon data table.
- Virtual Column query support for querying
paimon data table.
- Query support with time travel.
- Query support for paimon meta tables.
- WIP:
- Snapshot incremental read.
- Complex type query support.
- Native paimon table scanner, instead of
jni based.
Testing:
- Create test tables in functional_schema_template.sql
- Add TestPaimonScannerWithLimit in test_scanners.py
- Add test_paimon_query in test_paimon.py.
- Already passed the tpcds/tpch tests for paimon tables. Because the
test table data is currently generated by Spark, which is not
supported by Impala now (we had to do this since Hive doesn't
support generating paimon tables for dynamically partitioned
tables), we plan to submit a separate patch for tpcds/tpch data
loading and the associated tpcds/tpch query tests.
- JVM off-heap memory leak tests: ran looped tpch tests for
1 day; no obvious off-heap memory increase was observed, and
off-heap memory usage stayed within 10 MB.
Change-Id: Ie679a89a8cc21d52b583422336b9f747bdf37384
Reviewed-on: http://gerrit.cloudera.org:8080/23613
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
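
As a concrete illustration of the column pruning described above, a query that touches only a subset of columns lets the JNI scanner prune the rest; the two-column table `paimon_hive_cat(userid, movieId)` from the paimon DDL commit further down this log is reused here as an assumed fixture:

```sql
-- Only the field id of 'userid' is passed to the JNI-based scanner,
-- so 'movieId' is pruned from the Arrow record batches.
SELECT userid
FROM paimon_hive_cat
WHERE userid > 100;
```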

5d1f1e0180
IMPALA-14183: Rename the environment variable USE_APACHE_HIVE to USE_APACHE_HIVE_3
When the environment variable USE_APACHE_HIVE is set to true, Impala is built against Apache Hive 3.x. To better distinguish this from Apache Hive 2.x later on, rename USE_APACHE_HIVE to USE_APACHE_HIVE_3. Additionally, to facilitate referencing different versions of the Hive MetastoreShim, the major version of Hive has been added to the environment variable IMPALA_HIVE_DIST_TYPE.
Change-Id: I11b5fe1604b6fc34469fb357c98784b7ad88574d
Reviewed-on: http://gerrit.cloudera.org:8080/21724
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

fdad9d3204
IMPALA-13725: Add Iceberg table repair functionalities
In some cases users delete files directly from storage without going through the Iceberg API, e.g. they remove old partitions. This corrupts the table and makes queries that try to read the missing files fail. This change introduces a repair statement that deletes the dangling references to missing files from the metadata. Note that the table cannot be repaired if there are missing delete files, because Iceberg's DeleteFiles API, which is used to execute the operation, allows removing only data files.
Testing:
- E2E
  - HDFS
  - S3, Ozone
- analysis
Change-Id: I514403acaa3b8c0a7b2581d676b82474d846d38e
Reviewed-on: http://gerrit.cloudera.org:8080/23512
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

a2a11dec62
IMPALA-13263: Add single-argument overload for ST_ConvexHull()
Implemented a single-argument version of ST_ConvexHull() to align with PostGIS behavior and simplify usage across geometry types.
Testing: Added new tests in test_geospatial_functions.py for ST_ConvexHull(), which previously had no test coverage, to verify correctness across supported geometry types.
Change-Id: Idb17d98f5e75929ec0143aa16195a84dd6e50796
Reviewed-on: http://gerrit.cloudera.org:8080/23604
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
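
A small usage sketch of the single-argument overload, assuming the Esri-style ST_GeomFromText/ST_AsText helpers from Impala's geospatial function set; the geometry literal is arbitrary:

```sql
-- The convex hull of a single geometry, mirroring PostGIS behavior.
SELECT ST_AsText(ST_ConvexHull(ST_GeomFromText('LINESTRING (0 0, 2 0, 1 1, 1 3)')));
```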

068158e495
IMPALA-12401: Support more info types for HS2 GetInfo() API
This patch adds support for 40+ additional TGetInfoType values in the HiveServer2 GetInfo() API, improving ODBC/JDBC driver compatibility. Previously, only 3 info types were supported (CLI_SERVER_NAME, CLI_DBMS_NAME, CLI_DBMS_VER). The implementation follows the ODBC CLI specification and matches the behavior of Hive's GetInfo implementation where applicable.
Testing:
- Added unit tests in test_hs2.py for new info types
- Tests verify correct return values and data types for each info type
Change-Id: I1ce5f2b9dcc2e4633b4679b002f57b5b4ea3e8bf
Reviewed-on: http://gerrit.cloudera.org:8080/23528
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>

087b715a2b
IMPALA-14108: Add support for SHOW FILES IN table PARTITION for Iceberg tables
This patch implements partition filtering support for the SHOW FILES statement on Iceberg tables, based on the functionality added in IMPALA-12243. Prior to this change, the syntax resulted in a NullPointerException.
Key changes:
- Added ShowFilesStmt.analyzeIceberg() to validate and transform partition expressions using IcebergPartitionExpressionRewriter and IcebergPartitionPredicateConverter. After that, it collects matching file paths using IcebergUtil.planFiles().
- Added FeIcebergTable.Utils.getIcebergTableFilesFromPaths() to accept pre-filtered file lists from the analysis phase.
- Enhanced the TShowFilesParams thrift struct with an optional selected_files field to pass pre-filtered file paths from the frontend to the backend.
Testing:
- Analyzer tests for negative cases: non-existent partitions, invalid expressions, non-partition columns, unsupported transforms.
- Analyzer tests for positive cases: all transform types, complex expressions.
- Authorization tests for non-filtered and filtered syntaxes.
- E2E tests covering every partition transform type with various predicates.
- Schema evolution and rollback scenarios.
The implementation follows AlterTableDropPartition's pattern where the analysis phase performs validation/metadata retrieval and the execution phase handles result formatting and display.
Change-Id: Ibb9913e078e6842861bdbb004ed5d67286bd3152
Reviewed-on: http://gerrit.cloudera.org:8080/23455
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
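
A brief sketch of the filtered syntax; `ice_sales` is a hypothetical Iceberg table, and the transform-predicate form is assumed to mirror the Iceberg DROP PARTITION syntax from IMPALA-12243:

```sql
-- Identity-partitioned column predicate.
SHOW FILES IN ice_sales PARTITION (region = 'EU');

-- Predicates on transformed partition columns.
SHOW FILES IN ice_sales PARTITION (month(ts) = '2024-05');
```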

760eb4f2fa
IMPALA-13066: Extend SHOW CREATE TABLE to include stats and partitions
Adds a new WITH STATS option to the SHOW CREATE TABLE statement to emit additional SQL statements for recreating table statistics and partitions. When specified, Impala outputs:
- The base CREATE TABLE statement.
- ALTER TABLE ... SET TBLPROPERTIES for table-level stats.
- ALTER TABLE ... SET COLUMN STATS for all non-partition columns, restoring column stats.
- For partitioned tables:
  - ALTER TABLE ... ADD PARTITION statements to recreate partitions.
  - Per-partition ALTER TABLE ... PARTITION (...) SET TBLPROPERTIES to restore partition-level stats.
Partition output is limited by the PARTITION_LIMIT query option (default 1000); setting PARTITION_LIMIT=0 includes all partitions, and a warning is emitted if the limit is exceeded. Tests added to verify correctness of emitted statements. The default behavior of SHOW CREATE TABLE remains unchanged for compatibility.
Change-Id: I87950ae9d9bb73cb2a435cf5bcad076df1570dc2
Reviewed-on: http://gerrit.cloudera.org:8080/23536
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
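
A usage sketch of the new option; the placement of WITH STATS after the table name is an assumption, and `part_tbl` is a hypothetical partitioned table:

```sql
-- Bound the partition output (default limit is 1000; 0 includes all partitions).
SET PARTITION_LIMIT=500;

-- Emits the CREATE TABLE statement plus the ALTER TABLE statements needed to
-- restore table-level stats, column stats and partitions.
SHOW CREATE TABLE part_tbl WITH STATS;
```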

541fb3f405
IMPALA-14092 Part1: Prohibit Unsupported Operation for paimon table
This patch prohibits unsupported operations against
paimon tables. Checks for all unsupported operations are added
in the analysis stage in order to avoid
mis-operation. Currently only the CREATE/DROP statements
are supported; the prohibition will be removed later
after the corresponding operation is truly supported.
TODO:
- Patches pending submission:
- Support jni based query for paimon data table.
- Support tpcds/tpch data-loading
for paimon data table.
- Virtual Column query support for querying
paimon data table.
- Query support with time travel.
- Query support for paimon meta tables.
Testing:
- Add unit test for AnalyzeDDLTest.java.
- Add unit test for AnalyzerTest.java.
- Add test_paimon_negative and test_paimon_query in test_paimon.py.
Change-Id: Ie39fa4836cb1be1b1a53aa62d5c02d7ec8fdc9d7
Reviewed-on: http://gerrit.cloudera.org:8080/23530
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
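
To make the analysis-time prohibition concrete, a sketch assuming INSERT is among the not-yet-supported operations (only CREATE/DROP are supported at this stage); `paimon_hive_cat` is the example table from the related paimon DDL commit:

```sql
-- Allowed: creating and dropping paimon tables.
CREATE TABLE paimon_hive_cat (userid INT, movieId INT) STORED AS PAIMON;
DROP TABLE paimon_hive_cat;

-- Rejected during analysis rather than failing at runtime, e.g.:
-- INSERT INTO paimon_hive_cat VALUES (1, 2);
```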

1913ab46ed
IMPALA-14501: Migrate most scripts from impala-python to impala-python3
To remove the dependency on Python 2, existing scripts need to use python3 rather than python. These commands find those locations (for impala-python and regular python):
git grep impala-python | grep -v impala-python3 | grep -v impala-python-common | grep -v init-impala-python
git grep bin/python | grep -v python3
This removes or switches most of these locations by various means:
1. If a python file has a #!/bin/env impala-python (or python) but doesn't have a main function, it removes the hash-bang and makes sure that the file is not executable.
2. Most scripts can simply switch from impala-python to impala-python3 (or python to python3) with minimal changes.
3. The cm-api pypi package (which doesn't support Python 3) has been replaced by the cm-client pypi package and interfaces have changed. Rather than migrating the code (which hasn't been used in years), this deletes the old code and stops installing cm-api into the virtualenv. The code can be restored and revamped if there is any interest in interacting with CM clusters.
4. This switches tests/comparison over to impala-python3, but this code has bit-rotted. Some pieces can be run manually, but it can't be fully verified with Python 3. It shouldn't hold back the migration on its own.
5. This also replaces locations of impala-python in comments / documentation / READMEs.
6. kazoo (used for interacting with HBase) needed to be upgraded to a version that supports Python 3. The newest version of kazoo requires upgrades of other component versions, so this uses kazoo 2.8.0 to avoid needing other upgrades.
The two remaining uses of impala-python are:
- bin/cmake_aux/create_virtualenv.sh
- bin/impala-env-versioned-python
These will be removed separately when we drop Python 2 support completely. In particular, these are useful for testing impala-shell with Python 2 until we stop supporting Python 2 for impala-shell. The docker-based tests still use /usr/bin/python, but this can be switched over independently (and doesn't impact impala-python).
Testing:
- Ran core job
- Ran build + dataload on Centos 7, Redhat 8
- Manual testing of individual scripts (except some bitrotted areas like the random query generator)
Change-Id: If209b761290bc7e7c716c312ea757da3e3bca6dc
Reviewed-on: http://gerrit.cloudera.org:8080/23468
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>

f0a781806f
IMPALA-14494: Tag catalogd logs of GetPartialCatalogObject requests with correct query ids
Catalogd logs of GetPartialCatalogObject requests are not tagged with the correct query ids. Instead, the query id that previously used that thread is printed in the logs. This is fixed by using ScopedThreadContext, which resets the query id at the end of the RPC code. Add DCHECKs to make sure ThreadDebugInfo is initialized before being used in Catalog methods. An instance is added in CatalogdMain() for this. This patch also adds the query id in GetPartialCatalogObject requests so catalogd can tag the responding thread with it. Some code is copied from Michael Smith's patch: https://gerrit.cloudera.org/c/22738/
Tested by enabling TRACE logging in org.apache.impala.common.JniUtil to verify logs of GetPartialCatalogObject requests.
I20251014 09:39:39.685225 342587 JniUtil.java:165] 964e37e9303d6f8a:eab7096000000000] getPartialCatalogObject request: Getting partial catalog object of CATALOG_SERVICE_ID
I20251014 09:39:39.690346 342587 JniUtil.java:176] 964e37e9303d6f8a:eab7096000000000] Finished getPartialCatalogObject request: Getting partial catalog object of CATALOG_SERVICE_ID. Time spent: 5ms
I20251014 09:39:39.699471 342587 JniUtil.java:165] 964e37e9303d6f8a:eab7096000000000] getPartialCatalogObject request: Getting partial catalog object of DATABASE:functional
I20251014 09:39:39.701821 342587 JniUtil.java:176] 964e37e9303d6f8a:eab7096000000000] Finished getPartialCatalogObject request: Getting partial catalog object of DATABASE:functional. Time spent: 2ms
I20251014 09:39:39.711462 341074 TAcceptQueueServer.cpp:368] New connection to server CatalogService from client <Host: 127.0.0.1 Port: 42084>
I20251014 09:39:39.719146 342588 JniUtil.java:165] 964e37e9303d6f8a:eab7096000000000] getPartialCatalogObject request: Getting partial catalog object of TABLE:functional.alltypestiny
Change-Id: Ie63363ac60e153e3a69f2a4cf6a0f4ce10701674
Reviewed-on: http://gerrit.cloudera.org:8080/23535
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

1008decc07
IMPALA-14447: Parallelize table loading in getMissingTables()
StmtMetadataLoader.getMissingTables() loads missing tables in a serial manner. In local catalog mode, a large number of serial table loads can incur significant round-trip latency to CatalogD. This patch parallelizes the table loading by using an executor service to look up and gather all non-null FeTables from a given TableName set. Modify LocalCatalog.loadDbs() and LocalDb.loadTableNames() slightly to make them thread-safe. Change FrontendProfile.Scope to support nested scopes referencing the same FrontendProfile instance. Added a new flag max_stmt_metadata_loader_threads to control the maximum number of threads to use for loading table metadata during query compilation. It defaults to 8 threads per query compilation. If there is only one table to load, max_stmt_metadata_loader_threads is set to 1, or a RejectedExecutionException is raised, the loader falls back to loading tables serially.
Testing: Run and pass a few tests such as test_catalogd_ha.py, test_concurrent_ddls.py, and test_observability.py. Add FE test CatalogdMetaProviderTest.testProfileParallelLoad. Manually run the following query and observe parallel loading by setting TRACE level log in CatalogdMetaProvider.java:
use functional;
select count(*) from alltypesnopart union
select count(*) from alltypessmall union
select count(*) from alltypestiny union
select count(*) from alltypesagg;
Change-Id: I97a5165844ae846b28338d62e93a20121488d79f
Reviewed-on: http://gerrit.cloudera.org:8080/23436
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

a77fec6391
IMPALA-13661: Support parallelism above JDBC tables for joins/aggregates
Impala's planner generates a single-fragment, single-
threaded scan node for queries on JDBC tables because table
statistics are not properly available from the external
JDBC source. As a result, even large JDBC tables are
executed serially, causing suboptimal performance for joins,
aggregations, and scans over millions of rows.
This patch enables Impala to estimate the number of rows in a JDBC
table by issuing a COUNT(*) query at query preparation time. The
estimation is returned via TPrepareResult.setNum_rows_estimate()
and propagated into DataSourceScanNode. The scan node then uses
this cardinality to drive planner heuristics such as join order,
fragment parallelization, and scanner thread selection.
The design leverages the existing JDBC accessor layer:
- JdbcDataSource.prepare() constructs the configuration and invokes
GenericJdbcDatabaseAccessor.getTotalNumberOfRecords().
- The accessor wraps the underlying query in:
SELECT COUNT(*) FROM (<query>) tmptable
ensuring correctness for both direct table scans and parameterized
query strings.
- The result is captured as num_rows_estimate, which is then applied
during computeStats() in DataSourceScanNode.
With accurate (or approximate) row counts, the planner can now:
- Assign multiple scanner threads to JDBC scan nodes instead of
falling back to a single-thread plan.
- Introduce exchange nodes where beneficial, parallelizing data
fetches across multiple JDBC connections.
- Produce better join orders by comparing JDBC row cardinalities
against native Impala tables.
- Avoid severe underestimation that previously defaulted to wrong
table statistics, leading to degenerate plans.
For a sample join query mentioned in the test file,
these are the improvements:
Before Optimization:
- Cardinality fixed at 1 for all JDBC scans
- Single fragment, single thread per query
- Max per-host resource reservation: ~9.7 MB, 1 thread
- No EXCHANGE or MERGING EXCHANGE operators
- No broadcast distribution; joins executed serially
- Example query runtime: ~77s
SCAN JDBC A
\
HASH JOIN
\
SCAN JDBC B
\
HASH JOIN
\
SCAN JDBC C
\
TOP-N -> ROOT
After Optimization:
- Cardinality derived from COUNT(*) (e.g. 150K, 1.5M rows)
- Multiple fragments per scan, 7 threads per query
- Max per-host resource reservation: ~123 MB, 7 threads
- Plans include EXCHANGE and MERGING EXCHANGE operators
- Broadcast joins on small sides, improving parallelism
- Example query runtime: ~38s (~2x faster)
SCAN JDBC A --> EXCHANGE(SND) --+
\
EXCHANGE(RCV) -> HASH JOIN(BCAST) --+
SCAN JDBC B --> EXCHANGE(SND) ----/ \
HASH JOIN(BCAST) --+
SCAN JDBC C --> EXCHANGE(SND) ------------------------------------------/ \
TOP-N
\
MERGING EXCHANGE -> ROOT
Also added a new backend configuration flag
--min_jdbc_scan_cardinality (default: 10) to provide a
lower bound for scan node cardinality estimates
during planning. This flag is propagated from BE
to FE via TBackendGflags and surfaced through
BackendConfig, ensuring the planner never produces
unrealistically low cardinality values.
TODO: Add a query option for this optimization
to avoid extra JDBC round trip for smaller
queries (IMPALA-14417).
Testing: All cases of Planner tests are written in
jdbc-parallel.test. Some basic metrics
are also mentioned in the commit message.
Change-Id: If47d29bdda5b17a1b369440f04d4e209d12133d9
Reviewed-on: http://gerrit.cloudera.org:8080/23112
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>

ebbc67cf40
IMPALA-13801: Support greatest synced event with hierarchical metastore event processing
It is a follow-up jira/commit to IMPALA-12709. IMPALA-12152 and
IMPALA-12785 are affected when hierarchical metastore event
processing feature is enabled.
The following changes are incorporated in this patch:
1. Added creationTime_ and dispatchTime_ fields in MetastoreEvent
class to store the current time in millisec. They are used to
calculate:
a) Event dispatch time (time between a MetastoreEvent object's
creation and when the event is moved to the inProgressLog_ of
EventExecutorService after dispatching it to a
DbEventExecutor).
b) Event schedule delays incurred at DbEventExecutors and
TableEventExecutors (time between an event being moved to
EventExecutorService's inProgressLog_ and the start of
processing the event at the appropriate DbEventExecutor and
TableEventExecutor).
c) Event process time from EventExecutorService's point of
view (time spent in inProgressLog_ before it is moved to
processedLog_).
Logs are added to show the event dispatch time, schedule
delays, process time from EventExecutorService point of
view for each event. Also a log is added to show the time
taken for event's processIfEnabled().
2. Added isDelimiter_ field in MetastoreEvent class to indicate
whether it is a delimiter event. It is set only when
hierarchical event processing is enabled. A delimiter is a kind
of metastore event that does not require event processing.
A delimiter event can be:
a) A CommitTxnEvent that does not have any write event info for
a given transaction.
b) An AbortTxnEvent that does not have write ids for a given
transaction.
c) An IgnoredEvent.
An event is determined and marked as delimiter in
EventExecutorService#dispatch(). They are not queued to a
DbEventExecutor for processing. They are just maintained in
the inProgressLog_ to preserve continuity and correctness in
synchronization tracking. The delimiter events are removed from
inProgressLog_ when their preceding non-delimiter metastore
event is removed from inProgressLog_.
3. Greatest synced event id is computed based on the dispatched
events (inProgressLog_) and processed events (processedLog_) tree
maps. Greatest synced event is the latest event such that all
events with id less than or equal to the latest event are
definitely synced.
4. Lag is calculated as the difference between the latest event time on HMS
and the greatest synced event time. It is shown in the log.
5. Greatest synced event id is used in IMPALA-12152 changes. When
greatest synced event id becomes greater than or equal to
waitForEventId, all the required events are definitely synced.
6. Event processor is paused gracefully when paused with command in
IMPALA-12785. This ensures that all the fetched events from HMS in
current batch are processed before the event processor is fully
paused. It is necessary to process the current batch of events
because certain events like AllocWriteIdEvent, AbortTxnEvent and
CommitTxnEvent update table write ids in the catalog upon metastore
event object creation, and the table write ids are later updated
on the appropriate table object when the event is processed. Pausing
abruptly in the middle of the current batch of event processing can
lead to an inconsistent state of write ids on table objects.
7. Added greatest synced event id and event time in events processor
metrics. And updated description of lag, pending events, last
synced event id and event time metrics.
8. Atomically update the event queue and increment outstanding event
count in enqueue methods of both DbProcessor and TableProcessor
so that respective process methods do not process the event until
event is added to queue and outstanding event count is incremented.
Otherwise, the event can get processed and the outstanding event count
decremented before it is incremented in the enqueue method.
9. Refactored the DbEventExecutor, DbProcessor, TableEventExecutor and
TableProcessor classes to propagate the exception that occurred along
with the event during event processing. EventProcessException is a
wrapper added to hold a reference to the event being processed and the
exception that occurred.
10. Added AcidTableWriteInfo helper class to store table, write ids
and partitions for the transaction id received in CommitTxnEvent.
Testing:
- Added new tests and executed existing end to end tests.
- Have executed the existing tests with hierarchical event processing
enabled.
Change-Id: I26240f36aaf85125428dc39a66a2a1e4d3197e85
Reviewed-on: http://gerrit.cloudera.org:8080/22997
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>

e05d92cb3d
IMPALA-13548: Schedule scan ranges oldest to newest for tuple caching
Scheduling does not sort scan ranges by modification time. When a new file is added to a table, its order in the list of scan ranges is not based on modification time. Instead, it is based on which partition it belongs to and what its filename is. A new file that is added early in the list of scan ranges can cause cascading differences in scheduling. For tuple caching, this means that multiple runtime cache keys could change due to adding a single file. To minimize that disruption, this adds the ability to sort the scan ranges by modification time and schedule scan ranges oldest to newest. This enables it for scan nodes that feed into tuple cache nodes (similar to deterministic scan range assignment).
Testing:
- Modified TestTupleCacheFullCluster::test_scan_range_distributed to have stricter checks about how many cache keys change after an insert (only one should change)
- Modified TupleCacheTest#testDeterministicScheduling to verify that oldest to newest scheduling is also enabled.
Change-Id: Ia4108c7a00c6acf8bbfc036b2b76e7c02ae44d47
Reviewed-on: http://gerrit.cloudera.org:8080/23228
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

ca356a8df5
IMPALA-13437 (part 2): Implement cost-based tuple cache placement
This changes the default behavior of the tuple cache to consider cost when placing the TupleCacheNodes. It tries to pick the best locations within a budget. First, it eliminates unprofitable locations via a threshold. Next, it ranks the remaining locations by their profitability. Finally, it picks the best locations in rank order until it reaches the budget.
The threshold is based on the ratio of processing cost for regular execution versus the processing cost for reading from the cache. If the ratio is below the threshold, the location is eliminated. The threshold is specified by the tuple_cache_required_cost_reduction_factor query option. This defaults to 3.0, which means that the cost of reading from the cache must be less than 1/3 the cost of computing the value normally. A higher value makes this more restrictive about caching locations, which pushes in the direction of lower overhead.
The ranking is based on the cost reduction per byte. This is given by the formula:
(regular processing cost - cost to read from cache) / estimated serialized size
This prefers locations with small results or high reduction in cost.
The budget is based on the estimated serialized size per node. This limits the total caching that a query will do. A higher value allows more caching, which can increase the overhead on the first run of a query. A lower value is less aggressive and can limit the overhead at the expense of less caching. This uses a per-node limit as the limit should scale based on the size of the executor group as each executor brings extra capacity. The budget is specified by the tuple_cache_budget_bytes_per_executor.
The old behavior to place the tuple cache at all eligible locations is still available via the tuple_cache_placement_policy query option. The default is the cost_based policy described above, but the old behavior is available via the all_eligible policy. This is useful for correctness testing (and the existing tuple cache test cases).
This changes the explain plan output:
- The hash trace is only enabled at VERBOSE level. This means that the regular profile will not contain the hash trace, as the regular profile uses EXTENDED.
- This adds additional information at VERBOSE to display the cost information for each plan node. This can help trace why a particular location was not picked.
Testing:
- This adds a TPC-DS planner test with tuple caching enabled (based on the existing TpcdsCpuCostPlannerTest)
- This modifies existing tests to adapt to changes in the explain plan output
Change-Id: Ifc6e7b95621a7937d892511dc879bf7c8da07cdc
Reviewed-on: http://gerrit.cloudera.org:8080/23219
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
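
The placement controls above are query options, so a session can switch policies or tighten the budget directly; the numeric values below are purely illustrative:

```sql
-- Default cost-based placement: cache reads must cost less than 1/3 of
-- regular execution, with a per-executor budget on cached bytes.
SET tuple_cache_placement_policy=cost_based;
SET tuple_cache_required_cost_reduction_factor=3.0;
SET tuple_cache_budget_bytes_per_executor=268435456;

-- Revert to caching at every eligible location, e.g. for correctness testing.
SET tuple_cache_placement_policy=all_eligible;
```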

821c7347d1
IMPALA-13267: Display number of partitions for Iceberg tables
Before this change, query plans and profiles reported only a single partition even for partitioned Iceberg tables, which was misleading for users. Now we can display the number of scanned partitions correctly for both partitioned and unpartitioned Iceberg tables. This is achieved by extracting the partition values from the file descriptors and storing them in the IcebergContentFileStore. Instead of storing this information redundantly in all file descriptors, we store it in one place and reference the partition metadata in the FDs with an id. This also gives the opportunity to optimize memory consumption in the Catalog and Coordinator as well as reduce network traffic between them in the future. Time travel is handled similarly to oldFileDescMap. In that case we don't know the total number of partitions in the old snapshot, so the output is [Num scanned partitions]/unknown.
Testing:
- Planner tests
- E2E tests:
  - partition transforms
  - partition evolution
  - DROP PARTITION
  - time travel
Change-Id: Ifb2f654bc6c9bdf9cfafc27b38b5ca2f7b6b4872
Reviewed-on: http://gerrit.cloudera.org:8080/23113
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

826c8cf9b0
IMPALA-14081: Support create/drop paimon table for impala
This patch mainly implements the creation/drop of paimon tables
through Impala.
Supported impala data types:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE
Syntax for creating paimon table:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
(
[col_name data_type ,...]
[PRIMARY KEY (col1,col2)]
)
[PARTITIONED BY (col_name data_type [COMMENT 'col_comment'], ...)]
STORED AS PAIMON
[LOCATION 'hdfs_path']
[TBLPROPERTIES (
'primary-key'='col1,col2',
'file.format' = 'orc/parquet',
'bucket' = '2',
'bucket-key' = 'col3'
)];
Two types of paimon catalogs are supported.
(1) Create table with hive catalog:
CREATE TABLE paimon_hive_cat(userid INT,movieId INT)
STORED AS PAIMON;
(2) Create table with hadoop catalog:
CREATE [EXTERNAL] TABLE paimon_hadoop_cat
STORED AS PAIMON
TBLPROPERTIES('paimon.catalog'='hadoop',
'paimon.catalog_location'='/path/to/paimon_hadoop_catalog',
'paimon.table_identifier'='paimondb.paimontable');
SHOW TABLE STATS/SHOW COLUMN STATS/SHOW PARTITIONS/SHOW FILES
statements are also supported.
TODO:
- Patches pending submission:
- Query support for paimon data files.
- Partition pruning and predicate push down.
- Query support with time travel.
- Query support for paimon meta tables.
- WIP:
- Complex type query support.
- Virtual Column query support for querying
paimon data table.
- Native paimon table scanner, instead of
jni based.
Testing:
- Add unit test for paimon impala type conversion.
- Add unit test for ToSqlTest.java.
- Add unit test for AnalyzeDDLTest.java.
- Update default_file_format TestEnumCase in
be/src/service/query-options-test.cc.
- Update test case in
testdata/workloads/functional-query/queries/QueryTest/set.test.
- Add test cases in metadata/test_show_create_table.py.
- Add custom test test_paimon.py.
Change-Id: I57e77f28151e4a91353ef77050f9f0cd7d9d05ef
Reviewed-on: http://gerrit.cloudera.org:8080/22914
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>

711797e7fb
IMPALA-14349: Encode FileDescriptors in time in loading Iceberg Tables
With this patch we create Iceberg file descriptors from LocatedFileStatus objects during IcebergFileMetadataLoader's parallelListing(). This has the following benefits:
* We parallelize the creation of Iceberg file descriptor objects.
* We don't need to maintain a large hash map with all the LocatedFileStatus objects at once. Now we only need to keep a few LocatedFileStatus objects per partition in memory while we are converting them to Iceberg file descriptors. I.e., the GC is free to destroy the LocatedFileStatus objects we don't use anymore.
This patch retires the startup flag 'iceberg_reload_new_files_threshold'. Since IMPALA-13254 we only list partitions that have new data files, and we load them in parallel, i.e. efficient incremental table loading is already covered. From that point the startup flag only added unnecessary code complexity.
Measurements: I created two tables (from tpcds.store_sales) to measure table loading times for large tables:
Table #1: PARTITIONED BY SPEC(ss_item_sk, BUCKET(5, ss_sold_time_sk)), partitions: 107818, files: 754726
Table #2: PARTITIONED BY SPEC(ss_item_sk), partitions: 18000, files: 504224
Time taken in IcebergFileMetadataLoader.load() during full table reload:
+----------+-------+------+---------+
|          | Base  | New  | Speedup |
+----------+-------+------+---------+
| Table #1 | 17.3s | 8.1s | 2.14    |
| Table #2 | 7.8s  | 4.3s | 1.8     |
+----------+-------+------+---------+
I measured incremental table loading only for Table #2 (since there are more files per partition this is the worst scenario for the new code, as it only uses file listings, and each new file was created in a separate partition).
Time taken in IcebergFileMetadataLoader.load() during incremental table reload:
+------------+------+------+---------+
| #new files | Base | New  | Speedup |
+------------+------+------+---------+
| 1          | 1.4s | 1.6s | 0.9     |
| 100        | 1.5s | 1.9s | 0.8     |
| 200        | 1.5s | 1.5s | 1       |
+------------+------+------+---------+
We lose a few tenths of a second, but I think the simplified code justifies it.
Testing:
* some tests were updated because we don't have the startup flag 'iceberg_reload_new_files_threshold' anymore
Change-Id: Ia1c2a7119d76db7ce7c43caec2ccb122a014851b
Reviewed-on: http://gerrit.cloudera.org:8080/23363
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

cc1cbb559a
IMPALA-14263: Add broadcast_cost_scale_factor option
This commit enhances the distributed planner's costing model for broadcast joins by introducing the `broadcast_cost_scale_factor` query option. This option enables users to fine-tune the planner's decision between broadcast and partitioned joins.
Key changes:
- The total broadcast cost is scaled by the new `broadcast_cost_scale_factor` query option, allowing users to favor or penalize broadcast joins as needed when setting a query hint is not feasible.
- Updated the planner logic and test cases to reflect the new costing model and options.
This addresses scenarios where the default costing could lead to suboptimal join distribution choices, particularly in a large-scale cluster where the number of executors can increase broadcast cost, while choosing a partitioned strategy can lead to data skew. Admins can set `broadcast_cost_scale_factor` to less than 1.0 to make DistributedPlanner favor broadcast over partitioned joins (with the possible downside of higher memory usage per query and higher network transmission). Existing query hints still take precedence over this option. Note that this option is applied independently of the `broadcast_to_partition_factor` option (see IMPALA-10287). In an MT_DOP>1 setup, it should be sufficient to set `use_dop_for_costing=True` and tune `broadcast_to_partition_factor` only.
Testing: Added FE tests.
Change-Id: I475f8a26b2171e87952b69f66a5c18f77c2b3133
Reviewed-on: http://gerrit.cloudera.org:8080/23258
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Aman Sinha <amsinha@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
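
A short illustration of using the option in a session; the values are arbitrary:

```sql
-- Values below 1.0 make broadcast joins look cheaper to the planner,
-- nudging it toward broadcast instead of partitioned exchanges.
SET broadcast_cost_scale_factor=0.5;

-- Values above 1.0 penalize broadcast joins instead.
SET broadcast_cost_scale_factor=2.0;
```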

aec7380b75
IMPALA-14283: Invalidate the cache when served by a new catalogd
Before this patch, the coordinator only invalidates the catalog cache when it witnesses a catalog service id change in DDL/DML responses or statestore catalog updates. This is enough in the legacy catalog mode since these are the only ways the coordinator gets metadata from catalogd. However, in local catalog mode, the coordinator sends getPartialCatalogObject requests to fetch metadata from catalogd. If the request is now served by a new catalogd (e.g. due to HA failover), the coordinator should invalidate its catalog cache in case catalog versions overlap on the same table and stale metadata is unintentionally reused. To ensure performance, catalogServiceIdLock_ in CatalogdMetaProvider is refactored to be a ReentrantReadWriteLock. Most of the usages of it just need the read lock. This patch also adds the catalog service id to the profile.
Tests:
- Ran test_warmed_up_metadata_failover_catchup 50 times.
- Ran FE tests: CatalogdMetaProviderTest and LocalCatalogTest.
- Ran CORE tests
Change-Id: I751e43f5d594497a521313579defc5b179dc06ce
Reviewed-on: http://gerrit.cloudera.org:8080/23236
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Quanlong Huang <huangquanlong@gmail.com>

07a5716773
IMPALA-13892: Add support for printing STRUCTs
Tuple cache correctness verification is failing as the code in debug-util.cc used for printing the text version of tuples does not support printing structs. It hits a DCHECK and kills Impala. This adds support for printing structs to debug-util.cc, fixing tuple cache correctness verification for complex types. To print structs correctly, each slot needs to know its field name. The ColumnType has this information, but it requires a field idx to look up the name. This is the last index in the absolute path for this slot. However, the materialized path can be truncated to remove some indices at the end. Since we need that information to resolve the field name, this adds the struct field idx to the TSlotDescriptor to pass it to the backend. This also adds a counter to the profile to track when correctness verification is on. This is useful for testing.
Testing:
- Added a custom cluster test using nested types with correctness verification
- Examined some of the text files
Change-Id: Ib9479754c2766a9dd6483ba065e26a4d3a22e7e9
Reviewed-on: http://gerrit.cloudera.org:8080/23075
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Joe McDonnell <joemcdonnell@cloudera.com>

64abca481f
IMPALA-14227: In HA failover, passive catalogd should apply pending HMS events before being active
After IMPALA-14074, the passive catalogd can have a warmed-up metadata cache during failover (with catalogd_ha_reset_metadata_on_failover=false and a non-empty warmup_tables_config_file). However, it could still use a stale metadata cache when some pending HMS events generated by the previous active catalogd are not applied yet. This patch adds a wait during HA failover to ensure HMS events from before the failover are all applied on the new active catalogd. The timeout is configured by a new flag which defaults to 300 seconds (5 minutes): catalogd_ha_failover_catchup_timeout_s. When a timeout happens, by default catalogd will fall back to resetting all metadata. Users can decide whether to reset or continue using the current cache. This is configured by another flag, catalogd_ha_reset_metadata_on_failover_catchup_timeout. Since the passive catalogd depends on HMS event processing to keep its metadata up-to-date with the active catalogd, this patch adds validation to avoid starting catalogd with catalogd_ha_reset_metadata_on_failover set to false and hms_event_polling_interval_s <= 0. This patch also makes catalogd_ha_reset_metadata_on_failover a non-hidden flag so it's shown in the /varz web page.
Tests:
- Ran test_warmed_up_metadata_after_failover 200 times. Without the fix, it usually fails in several runs.
- Added new tests for the new flags.
Change-Id: Icf4fcb0e27c14197f79625749949b47c033a5f31
Reviewed-on: http://gerrit.cloudera.org:8080/23174
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

c2705fa480
IMPALA-14076: Improve readability of workload management query
This patch improves the readability of workload management's insert DML query profiles by:
1. Adding a newline between each entry in the VALUES clause.
2. Removing the analyzed query from the PLAN column in both tables.
For the second one, a new query option HIDE_ANALYZED_QUERY is added. If this option is set to true, 'Analyzed query' will not be printed in the Plan section of the runtime profile. This is helpful for long SQL such as workload management's insert DML query.
Testing:
- Add explain test case for the HIDE_ANALYZED_QUERY option.
- Manually run some queries in the minicluster with workload management enabled. Confirmed that both improvements happen in the DML runtime profile.
Change-Id: I30576795dbc2af27a6879684f3757becfd8fc8d0
Reviewed-on: http://gerrit.cloudera.org:8080/23085
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
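
A minimal sketch of the new option in a session; the query itself is arbitrary:

```sql
-- Subsequent statements omit the 'Analyzed query' text from the Plan
-- section of their runtime profiles, keeping profiles of long SQL readable.
SET HIDE_ANALYZED_QUERY=true;
SELECT count(*) FROM functional.alltypestiny;
```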

9f12714d1c
IMPALA-14224: Cleanup subdirectories in TRUNCATE
If an external table contains data files in subdirectories, and recursive listing is enabled, Impala considers the files in the subdirectories as part of the table. However, currently INSERT OVERWRITE and TRUNCATE do not always delete these files, leading to data corruption. This change takes care of TRUNCATE. Currently TRUNCATE can be run in two different ways:
- if the table is being replicated, the HMS API is used
- otherwise catalogd deletes the files itself.
Two differences between these methods are:
- calling HMS leads to an ALTER_TABLE event
- calling HMS leads to a recursive delete, while catalogd only deletes files directly in the partition/table directory.
This commit introduces the '--truncate_external_tables_with_hms' startup flag, with default value 'true'. If this flag is set to true, Impala always uses the HMS API for TRUNCATE operations. Note that HMS always deletes stats on TRUNCATE, so setting the DELETE_STATS_IN_TRUNCATE query option to false is not supported if '--truncate_external_tables_with_hms' is set to true: an exception is thrown.
Testing:
- extended the tests in test_recursive_listing.py::TestRecursiveListing to include TRUNCATE
- Moved tests with DELETE_STATS_IN_TRUNCATE=0 from truncate-table.test to truncate-table-no-delete-stats.test, which is run in a new custom cluster test (custom_cluster/test_no_delete_stats_in_truncate.py).
Change-Id: Ic0fcc6cf1eca8a0bcf2f93dbb61240da05e35519
Reviewed-on: http://gerrit.cloudera.org:8080/23166
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
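
A sketch of the interaction above, assuming the cluster runs with the default --truncate_external_tables_with_hms=true; `ext_tbl` is a hypothetical external table with data files in subdirectories:

```sql
-- Goes through the HMS API, so files in subdirectories are removed as well.
TRUNCATE TABLE ext_tbl;

-- Not supported with HMS-based truncation: HMS always deletes stats,
-- so the following combination throws an exception.
SET DELETE_STATS_IN_TRUNCATE=0;
-- TRUNCATE TABLE ext_tbl;
```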

c446291ccf
IMPALA-13074: Add sink node to Web UI's graphical plan for DDL/DML queries
From now on if a plan fragment's root data sink is a table sink, a multi data sink or a merge sink, it will be included in the json response and shown on the Web UI as parent of the plan fragment.
Testing:
* adopted and refined impala-http-handler-test
* added new tests for related sink types
* tested manually on the Web UI with
  - CTAS statements
  - UPDATE statements on Iceberg tables
  - DELETE statements on Iceberg tables
  - MERGE statements on Iceberg tables
Change-Id: Ib2bd442f6499efde7406d87c2b1fd1b46a45381b
Reviewed-on: http://gerrit.cloudera.org:8080/22496
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Noemi Pap-Takacs <npaptakacs@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>

da190f1d86
IMPALA-14074: Warmup metadata cache in catalogd for critical tables
*Background*
Catalogd starts with a cold metadata cache - only the db/table names and functions are loaded. Metadata of a table remains unloaded until queries are submitted on the table, so the first query suffers from the delay of loading metadata. There is a flag, --load_catalog_in_background, to let catalogd eagerly load metadata of all tables even if no queries come. Catalogd may then load metadata for tables that are possibly never used, potentially increasing catalog size and consequently memory usage, so this flag is turned off by default and not recommended for production. Users do need the metadata of some critical tables to be loaded; before that, the service is considered not ready since important queries might fail with timeouts. When Catalogd HA is enabled, it's also required that the standby catalogd has an up-to-date metadata cache to smoothly take over the active one when failover happens.
*New Flags*
This patch adds a startup flag for catalogd to specify a config file containing tables whose metadata users want to be loaded. Catalogd adds them to the table loading queue in the background when a catalog reset happens, i.e. at catalogd startup or when a global INVALIDATE METADATA runs. The flag is --warmup_tables_config_file. The value can be a path in the local FS or in remote storage (e.g. HDFS). E.g.
--warmup_tables_config_file=file:///opt/impala/warmup_table_list.txt
--warmup_tables_config_file=hdfs:///tmp/warmup_table_list.txt
Each line in the config file can be a fully qualified table name or a wildcard under a db, e.g. "tpch.*". Catalogd loads the table names at startup and schedules loading on them after a reset of the catalog. The scheduling order is based on the order in the config file, so important tables can be put first. Comments starting with "#" or "//" are ignored in the config file.
Another flag, --keeps_warmup_tables_loaded (defaults to false), is added to control whether to reload a table after it's been invalidated, either by an explicit INVALIDATE METADATA <table> command or implicitly by CatalogdTableInvalidator or HMS RELOAD events. When CatalogdTableInvalidator is enabled with --invalidate_tables_on_memory_pressure=true, users shouldn't set keeps_warmup_tables_loaded to true if the catalogd heap size is not enough to cache metadata of all these tables. Otherwise, these tables will keep being loaded and invalidated.
*Catalogd HA Changes*
When Catalogd HA is enabled, the standby catalogd will also reset its catalog and start loading metadata of these tables, after the HA state (active/standby) is determined. The standby catalogd keeps its metadata cache up-to-date by applying HMS notification events. To support a warmed-up switch, --catalogd_ha_reset_metadata_on_failover should be set to false.
*Limitation*
The standby catalogd could still have a stale cache if there are operations in the active catalogd that don't trigger HMS notification events, or if an HMS notification event is not applied correctly. E.g. adding a new native function generates an ALTER_DATABASE event, but when applying the event, the native function list of the db is not refreshed (IMPALA-14210). These will be resolved in separate JIRAs.
*Test*
- Added FE unit tests.
- Added e2e test for local/hdfs config files.
- Added e2e test to verify the standby catalogd has a warmed up cache when failover happens.
Change-Id: I2d09eae1f12a8acd2de945984d956d11eeee1ab6
Reviewed-on: http://gerrit.cloudera.org:8080/23155
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
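
A sketch of what the warmup config file could look like, based only on the format rules stated above (fully qualified names, per-db wildcards, comments with # or //); the table names are placeholders:

```text
# Most important tables first: loading is scheduled in this order.
sales.fact_orders
sales.dim_customer
// Load every table in the tpch database.
tpch.*
```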

0b1a32fad8
IMPALA-13850 (part 4): Implement in-place reset for CatalogD
This patch improves the availability of CatalogD under huge INVALIDATE METADATA operations. Previously, CatalogServiceCatalog.reset() held versionLock_.writeLock() for the whole reset duration. When the number of databases, tables, or functions is big, this write lock can be held for a long time, preventing any other catalog operation from proceeding. This patch improves the situation by:
1. Making CatalogServiceCatalog.reset() rebuild dbCache_ in place and occasionally release the write lock between rebuild stages.
2. Fetching databases, tables, and functions metadata from the MetaStore in the background using an ExecutorService. Added the catalog_reset_max_threads flag to control the number of threads used for the parallel fetch.
In order to do so, lexicographic order must be enforced during reset(), and all Db invalidation within a single stage must complete before releasing the write lock. Stages should run in approximately the same amount of time. A catalog operation over a database must ensure that no reset operation is currently running, or that the database name is lexicographically less than the current database-under-invalidation.
This patch adds CatalogResetManager to do background metadata fetching and provide helper methods to facilitate waiting for reset progress. CatalogServiceCatalog must hold the versionLock_.writeLock() before calling most of the CatalogResetManager methods. These are the methods in the CatalogServiceCatalog class that must wait for CatalogResetManager.waitOngoingMetadataFetch(): addDb(), addFunction(), addIncompleteTable(), addTable(), invalidateTableIfExists(), removeDb(), removeFunction(), removeTable(), renameTable(), replaceTableIfUnchanged(), tryLock(), updateDb(), InvalidateAwareDbSnapshotIterator.hasNext().
Concurrent global IM must wait until the currently running global IM completes. The waiting happens by calling waitFullMetadataFetch().
CatalogServiceCatalog.getAllDbs() gets a snapshot of the dbCache_ values at a point in time. With this patch, it is now possible that some Db in this snapshot may be removed from dbCache_ by a concurrent reset(). Callers that care about snapshot integrity, like CatalogServiceCatalog.getCatalogDelta(), should be careful when iterating the snapshot. They must iterate in lexicographic order, similar to reset(), and make sure they do not go beyond the current database-under-invalidation. They also must skip the Db currently being inspected if Db.isRemoved() is true. Added the helper class InvalidateAwareDbSnapshot for this kind of iteration.
Override CatalogServiceCatalog.getDb() and CatalogServiceCatalog.getDbs() to wait until the first metadata reset completes or the looked-up Db is found in the cache.
Expand test_restart_catalogd_twice to test_restart_legacy_catalogd_twice and test_restart_local_catalogd_twice. Update CustomClusterTestSuite.wait_for_wm_init_complete() to correctly pass timeout values to the helper methods that it calls. Reduce cluster_size from 10 to 3 in a few tests of test_workload_mgmt_init.py to avoid flakiness. Fixed an HMS connection leak between tests in AuthorizationStmtTest (see IMPALA-8073).
Testing:
- Pass exhaustive tests.
Change-Id: Ib4ae2154612746b34484391c5950e74b61f85c9d
Reviewed-on: http://gerrit.cloudera.org:8080/22640
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>

c5072807df
IMPALA-12337: Implement delete orphan files for Iceberg table
This patch implements the delete orphan files query for Iceberg tables. The following statement becomes available for Iceberg tables:
- ALTER TABLE <tbl> EXECUTE remove_orphan_files(<timestamp>)
The bulk of the implementation copies Hive's implementation of the org.apache.iceberg.actions.DeleteOrphanFiles interface (HIVE-27906, 6b2e21a93ef3c1776b689a7953fc59dbf52e4be4), which this patch renames to ImpalaIcebergDeleteOrphanFiles.java. Upon execute(), the ImpalaIcebergDeleteOrphanFiles class instance will gather all URIs of valid data files and Iceberg metadata files using the Iceberg API. These valid URIs will then be compared to the recursive file listing obtained through the Hadoop FileSystem API under the table's 'data' and 'metadata' directories accordingly. Any unmatched URI from the FileSystem API listing that has a modification time less than the 'olderThanTimestamp' parameter will then be removed via the Iceberg FileIO API of the given Iceberg table.
Note that this is a destructive query that will wipe out any files within the Iceberg table's 'data' and 'metadata' directories that are not addressable by any valid snapshot. The execution happens in CatalogD via IcebergCatalogOpExecutor.alterTableExecuteRemoveOrphanFiles(). CatalogD supplies CatalogOpExecutor.icebergExecutorService_ as the executor service to execute the Iceberg planFiles API and the FileIO API for deletion. Also fixed the toSql() implementation for all ALTER TABLE EXECUTE queries.
Testing:
- Add FE and EE tests.
Change-Id: I5979cdf15048d5a2c4784918533f65f32e888de0
Reviewed-on: http://gerrit.cloudera.org:8080/23042
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
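
A usage sketch of the statement named above; `ice_tbl` is a hypothetical Iceberg table, and passing an expression for the <timestamp> argument is an assumption:

```sql
-- Destructive: removes files under the table's 'data' and 'metadata'
-- directories that are not referenced by any valid snapshot and are
-- older than the given timestamp.
ALTER TABLE ice_tbl EXECUTE remove_orphan_files(now() - interval 3 days);
```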
||
|
|
7b25a7b070 |
IMPALA-13887: Incorporate column/field information into cache key
The correctness verification for the tuple cache found an issue with TestParquet::test_resolution_by_name(). The test creates a table, selects, alters the table to change a column name, and selects again. With parquet_fallback_schema_resolution=NAME, the column names determine behavior. The tuple cache key did not include the column names, so it was producing an incorrect result after changing the column name. This change adds information about the column / field name to the TSlotDescriptor so that it is incorporated into the tuple cache key. This is only needed when producing the tuple cache key, so it is omitted for other cases. Testing: - Ran TestParquet::test_resolution_by_name() with correctness verification - Added custom cluster test that runs the test_resolution_by_name() test case with tuple caching. This fails without this change. Change-Id: Iebfa777452daf66851b86383651d35e1b0a5f262 Reviewed-on: http://gerrit.cloudera.org:8080/23073 Reviewed-by: Yida Wu <wydbaggio000@gmail.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
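To illustrate why the field name must reach the key, a small hypothetical sketch (not Impala's actual key-building code) that hashes a slot's type together with its resolved column/field path:

  import java.nio.charset.StandardCharsets;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;
  import java.util.List;

  // Sketch only: if the column/field names are left out of the hash, a table
  // whose column was renamed can map to the same key and return stale rows
  // under parquet_fallback_schema_resolution=NAME.
  class TupleCacheKeySketch {
    static String slotKey(String slotType, List<String> columnPath)
        throws NoSuchAlgorithmException {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      md.update(slotType.getBytes(StandardCharsets.UTF_8));
      for (String part : columnPath) {
        md.update((byte) 0);  // separator so ["a","bc"] != ["ab","c"]
        md.update(part.getBytes(StandardCharsets.UTF_8));
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : md.digest()) hex.append(String.format("%02x", b));
      return hex.toString();
    }
  }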
||
|
|
85a2211bfb |
IMPALA-10349: Support constant folding for non ascii strings
Before this patch, constant folding only converted the result of an
expression to a StringLiteral if all characters were ASCII. The
change allows both UTF8 strings with non-ASCII characters and
byte arrays that are not valid UTF8 strings - the latter can
occur when constant folding is applied to BINARY columns,
for example in geospatial functions like st_polygon().
The main goal is to be able to push down more predicates, e.g.
before this patch a filter like col="á" couldn't be pushed down
to Iceberg/Kudu/Parquet stat filtering, as all of these expect literals.
Main changes:
- TStringLiteral uses a binary instead of a string member.
This doesn't affect the BE, as in C++ both types are compiled
to std::string. In Java a java.nio.ByteBuffer is used instead of
String.
- StringLiteral uses a byte[] member to store the value of
the literal in case it is not valid UTF8 and cannot be
represented as a Java String. In other cases a String is still
used to keep the change minimal, though it may be more
efficient to use a UTF8 byte[] due to the smaller size. Always
converting from byte[] to String may be costly in the catalog,
as partition values are stored as *Literals and the rest of the
catalog operates on String.
- StringLiteral#compareTo() is switched to a byte-wise compare on
byte[] to be consistent with the BE. This was not needed for ASCII
strings, as Java String behaves the same way in that case, but
non-ASCII strings can have a different order (note that Impala
does not support collations).
- When an invalid UTF8 StringLiteral is printed, for example in
EXPLAIN output, it is printed as
unhex("<byte array in hexadecimal>") - see the sketch after this
message. This is a lossless way to represent it, but it may be too
verbose in some cases, e.g. for large polygons. A follow-up commit
may refine this, e.g. by limiting the max size printed.
An issue found while implementing this is that INSERT does not
handle invalid UTF8 partition values correctly, see IMPALA-14096.
This behavior is not changed in the patch.
Testing:
- Added a few tests that push down non-ASCII const expressions in
predicates (both with utf8_mode=true and false).
Change-Id: I70663457a0b0a3443e586350f0a5996bb75ba64a
Reviewed-on: http://gerrit.cloudera.org:8080/22603
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
|
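A standalone sketch of the printing rule mentioned in the message (hypothetical helper, not the actual StringLiteral.toSql()): bytes that are valid UTF-8 are printed as a normal quoted literal, anything else falls back to the lossless unhex(...) form.

  import java.nio.ByteBuffer;
  import java.nio.charset.CharacterCodingException;
  import java.nio.charset.CodingErrorAction;
  import java.nio.charset.StandardCharsets;

  // Sketch only: print a string literal normally when its bytes are valid UTF-8,
  // otherwise fall back to a lossless unhex("...") rendering.
  class LiteralPrintSketch {
    static String toSql(byte[] value) {
      try {
        String s = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT)
            .decode(ByteBuffer.wrap(value)).toString();
        return "'" + s + "'";  // quote escaping omitted in this sketch
      } catch (CharacterCodingException e) {
        StringBuilder sb = new StringBuilder("unhex(\"");
        for (byte b : value) sb.append(String.format("%02X", b));
        return sb.append("\")").toString();
      }
    }
  }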
||
|
|
14597c7e2f |
IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness
This makes two changes to deflake test_tuple_cache_tpc_queries.py. First, it increases the runtime filter wait time from 60 seconds to 600 seconds. The correctness verification slows down the path that produces the runtime filter. The slowdown depends on the speed of storage, so this can get very slow on test machines. Second, this skips correctness checking for locations that are just after streaming aggregations. Streaming aggregations can produce variable output that the correctness checking can't handle. For example, for a grouping aggregation computing a sum, the preaggregation might produce either (A: 3), or (A: 2), (A: 1), or (A: 1), (A: 1), (A: 1). The finalization sees these as equivalent. This marks the nodes as variable starting with the preaggregation and clears the mark at the finalize stage. When correctness checking is skipped, the tuple cache node deliberately does not serve cache hits as it normally would. This guarantees that its children will run and go through correctness checking. Testing: - Ran test_tuple_cache_tpc_queries.py locally - Added a frontend test for this specific case Change-Id: If5e1be287bdb489a89aea3b2d7bec416220feb9a Reviewed-on: http://gerrit.cloudera.org:8080/23010 Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Michael Smith <michael.smith@cloudera.com> |
||
|
|
ccb8eac10a |
IMPALA-14075: Add CatalogOpExecutor.icebergExecutorService_
Before this patch, Impala executed the EXPIRE_SNAPSHOTS operation on a single thread. That can be really slow on cloud storage systems, especially if the operation needs to remove lots of files. This patch adds CatalogOpExecutor.icebergExecutorService_ to parallelize Iceberg API calls that support passing an ExecutorService, such as ExpireSnapshots.executeDeleteWith(). The number of threads for this executor service is controlled by the CatalogD flag --iceberg_catalog_num_threads. It defaults to 16, the same as the --num_metadata_loading_threads default value. Renamed ValidateMinProcessingPerThread to ValidatePositiveInt64 to match the other validators in backend-gflag-util.cc. Testing: - Lowered the sleep time between insert queries from 5s to 1s in test_expire_snapshots and test_describe_history_params to speed up the tests. - Manually verified that 'IcebergCatalogThread' threads are visible in the /jvm-threadz page of CatalogD. - Pass test_iceberg.py. Change-Id: I6dcbf1e406e1732ef8829eb0cd627d932291d485 Reviewed-on: http://gerrit.cloudera.org:8080/22980 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
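For reference, a minimal sketch of handing an Iceberg expire-snapshots call a caller-provided thread pool, which is the mechanism the message describes for icebergExecutorService_ (the wrapper class here is illustrative):

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  import org.apache.iceberg.Table;

  // Sketch only: ExpireSnapshots.executeDeleteWith() lets the file deletions run
  // on a caller-provided thread pool instead of a single thread.
  class ExpireSnapshotsSketch {
    static void expire(Table table, long olderThanMillis, int numThreads) {
      ExecutorService pool = Executors.newFixedThreadPool(numThreads);
      try {
        table.expireSnapshots()
            .expireOlderThan(olderThanMillis)
            .executeDeleteWith(pool)  // parallel file deletion
            .commit();
      } finally {
        pool.shutdown();
      }
    }
  }

In catalogd the pool would be the shared icebergExecutorService_ sized by --iceberg_catalog_num_threads rather than a pool created per call.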
||
|
|
4837cedc79 |
IMPALA-10319: Support arbitrary encodings on Text files
As proposed in the Jira, this implements decoding and encoding of text buffers for Impala/Hive text tables. Given a table with the 'serialization.encoding' property set, Impala, similarly to Hive, can encode the inserted data into the specified charset and consequently save it into a text file. The opposite decoding operation is performed when reading data buffers from text files. Both operations employ the boost::locale::conv library. Since Hive doesn't encode line delimiters, charsets that would store delimiters differently from ASCII are not allowed. One difference from Hive is that Impala implements 'serialization.encoding' only as a per-partition serde property, to avoid the confusion of allowing both serde and table properties. (See the related IMPALA-13748.) Note: Due to the precreated non-UTF-8 files present in the patch, 'gerrit-code-review-checks' was performed locally. (See IMPALA-14100.) Change-Id: I787cd01caa52a19d6645519a6cedabe0a5253a65 Reviewed-on: http://gerrit.cloudera.org:8080/22049 Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
||
|
|
2a680b302e |
IMPALA-13478: Sync tuple cache files to disk asynchronously
When a tuple cache entry is first being written, we want to sync the contents to disk. Currently, that happens on the fast path and delays the query results, sometimes significantly. This moves the Sync() call off of the fast path by passing the work to a thread pool. The threads in the pool open the file, sync it to disk, then close the file. If anything goes wrong, the cache entry is evicted. The tuple cache can generate writes very quickly, so this needs a backpressure mechanism to avoid overwhelming the disk. In particular, it needs to avoid accumulating dirty buffers to the point that the OS throttles new writes, delaying the query fast path. This implements a limit on outstanding writes (i.e. writes that have not been flushed to disk). To enforce it, writers now call UpdateWriteSize() to reserve space before writing. UpdateWriteSize() can fail if it hits the limit on outstanding writes or if this particular cache entry has hit the maximum size. When it fails, the writer should abort writing the cache entry. Since UpdateWriteSize() is updating the charge in the cache, the outstanding writes are being counted against the capacity, triggering evictions. This improves the tuple cache's adherence to the capacity limit. The outstanding writes limit is configured via the tuple_cache_outstanding_write_limit startup flag, which is either a specific size string (e.g. 1GB) or a percentage of the process memory limit. To avoid updating the cache charge very frequently, this has an update chunk size specified by tuple_cache_outstanding_write_chunk_bytes. This adds counters at the daemon level: - outstanding write bytes - number of writes halted due to backpressure - number of sync calls that fail (due to IO errors) - number of sync calls dropped due to queue backpressure The runtime profile adds a NumTupleCacheBackpressureHalted counter that is set when a write hits the outstanding write limit. This has a startup option to add randomness to the tuple cache keys to make it easy to test a scenario with no cache hits. Testing: - Added unit tests to tuple-cache-mgr-test - Testing with TPC-DS on a cluster with fast NVME SSDs showed a significant improvement in the first-run times due to the asynchronous syncs. - Testing with TPC-H on a system with a slow disk and zero cache hits showed improved behavior with the backpressure Change-Id: I646bb56300656d8b8ac613cb8fe2f85180b386d3 Reviewed-on: http://gerrit.cloudera.org:8080/22215 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
||
|
|
b37f4509fa |
IMPALA-14089: Support REFRESH on multiple partitions
Currently we only support REFRESH on the whole table or a specific partition: REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])] If users want to refresh multiple partitions, they have to submit multiple statements, each for a single partition. This has some drawbacks: - It requires holding the table write lock inside catalogd multiple times, which increases lock contention with other read/write operations on the same table, e.g. getPartialCatalogObject requests from coordinators. - The catalog version of the table will be increased multiple times. Coordinators in local catalog mode are more likely to see different versions between their getPartialCatalogObject requests and so have to retry planning to resolve InconsistentMetadataFetchException. - Partitions are reloaded in sequence. They should be reloaded in parallel like we do when refreshing the whole table. This patch extends the syntax to refresh multiple partitions in one statement: REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...]) [PARTITION (key_col1=val3 [, key_col2=val4...])...]] Example: REFRESH foo PARTITION(p=0) PARTITION(p=1) PARTITION(p=2); TResetMetadataRequest is extended to have a list of partition specs for this. If the list has only one item, we still use the existing logic of reloading a specific partition. If the list has more than one item, the partitions will be reloaded in parallel. This is implemented in CatalogServiceCatalog#reloadTable(). Previously it always invoked HdfsTable#load() with partitionsToUpdate=null. Now the parameter is set when TResetMetadataRequest has the partition list. HMS notification events of RELOAD type will be fired for each partition if enable_reload_events is turned on. Once HIVE-28967 is resolved, we can fire a single event for multiple partitions. Updated docs in impala_refresh.xml. Tests: - Added FE and e2e tests Change-Id: Ie5b0deeaf23129ed6e1ba2817f54291d7f63d04e Reviewed-on: http://gerrit.cloudera.org:8080/22938 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
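A minimal sketch of the parallel reload idea (hypothetical helper; the real work happens in CatalogServiceCatalog#reloadTable() and HdfsTable#load() with partitionsToUpdate set):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  // Sketch only: one REFRESH statement carries several partition specs and the
  // named partitions are reloaded concurrently under a single table version bump.
  class PartitionReloadSketch {
    static void reloadPartitions(List<String> partitionSpecs, int numThreads)
        throws Exception {
      ExecutorService pool = Executors.newFixedThreadPool(numThreads);
      try {
        List<Future<?>> futures = new ArrayList<>();
        for (String spec : partitionSpecs) {
          futures.add(pool.submit(() -> reloadOnePartition(spec)));
        }
        for (Future<?> f : futures) f.get();  // surface any reload failure
      } finally {
        pool.shutdown();
      }
    }

    // Placeholder: refresh the file metadata of a single partition.
    private static void reloadOnePartition(String spec) { /* ... */ }
  }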
||
|
|
607bad042a |
IMPALA-3841: Enable late materialization for collections
This patch enables late materialization for collections to avoid the cost of materializing collections that will never be accessed by the query. For a collection column, late materialization takes effect only when the collection column is not used in any predicate, including the `!empty()` predicate added by the planner. Otherwise we need to read every row to evaluate the predicate and cannot skip any. Therefore, this patch skips registering the `!empty()` predicates if the query contains zipping unnests. This can affect performance if the table contains many empty collections, but should be noticeable only in very extreme cases. The late materialization threshold is set to 1 in HdfsParquetScanner when there is any collection that can be skipped. This patch also adds the detail of `HdfsScanner::parse_status_` to the error message returned by the HdfsParquetScanner to help figure out the root cause. Performance: - Tests with the queries involving collection columns in table `tpch_nested_parquet.customer` show that when the selectivity is low, the single-threaded (1 impalad and MT_DOP=1) scanning time can be reduced by about 50%, while when the selectivity is high, the scanning time almost does not change. - For queries not involving collections, performance A/B testing shows no regression on TPC-H. Testing: - Added a runtime profile counter NumTopLevelValuesSkipped to record the total number of top-level values skipped for all columns. The counter only counts the values that are not skipped as a page. - Added e2e test cases in test_parquet_late_materialization.py to ensure that late materialization works using the new counter. Change-Id: Ia21bdfa6811408d66d74367e0a9520e20951105f Reviewed-on: http://gerrit.cloudera.org:8080/22662 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
||
|
|
ea0969a772 |
IMPALA-11980 (part 2): Fix absolute import issues for impala_shell
Python 3 changed the behavior of imports with PEP328. Existing imports become absolute unless they use the new relative import syntax. This adapts the impala-shell code to use absolute imports, fixing issues where it is imported from our test code. There are several parts to this: 1. It moves impala shell code into shell/impala_shell. This matches the directory structure of the PyPi package. 2. It changes the imports in the shell code to be absolute paths (i.e. impala_shell.foo rather than foo). This fixes issues with Python 3 absolute imports. It also eliminates the need for ugly hacks in the PyPi package's __init__.py. 3. This changes Thrift generation to put it directly in $IMPALA_HOME/shell rather than $IMPALA_HOME/shell/gen-py. This means that the generated Thrift code is rooted in the same directory as the shell code. 4. This changes the PYTHONPATH to include $IMPALA_HOME/shell and not $IMPALA_HOME/shell/gen-py. This means that the test code is using the same import paths as the pypi package. With all of these changes, the source code is very close to the directory structure of the PyPi package. As long as CMake has generated the thrift files and the Python version file, only a few differences remain. This removes those differences by moving the setup.py / MANIFEST.in and other files from the packaging directory to the top-level shell/ directory. This means that one can pip install directly from the source code. i.e. pip install $IMPALA_HOME/shell This also moves the shell tarball generation script to the packaging directory and changes bin/impala-shell.sh to use Python 3. This sorts the imports using isort for the affected Python files. Testing: - Ran a regular core job with Python 2 - Ran a core job with Python 3 and verified that the absolute import issues are gone. Change-Id: Ica75a24fa6bcb78999b9b6f4f4356951b81c3124 Reviewed-on: http://gerrit.cloudera.org:8080/22330 Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Riza Suminto <riza.suminto@cloudera.com> |
||
|
|
3210ec58c5 |
IMPALA-14006: Bound max_instances in CreateInputCollocatedInstances
IMPALA-11604 (part 2) changed how many instances to create in Scheduler::CreateInputCollocatedInstances. This works when the left child fragment of a parent fragment is distributed across nodes. However, if the left child fragment instance is limited to only 1 node (the case of an UNPARTITIONED fragment), the scheduler might over-parallelize the parent fragment by scheduling too many instances on a single node. This patch attempts to mitigate the issue in two ways. First, it adds bounding logic in PlanFragment.traverseEffectiveParallelism() to lower parallelism further if the left (probe) side of the child fragment is not well distributed across nodes. Second, it adds TQueryExecRequest.max_parallelism_per_node to relay information from Analyzer.getMaxParallelismPerNode() to the scheduler. With this information, the scheduler can do additional sanity checks to prevent Scheduler::CreateInputCollocatedInstances from over-parallelizing a fragment. Note that this sanity check can also cap the MAX_FS_WRITERS option under a similar scenario. Added a ScalingVerdict enum and log it at TRACE level to show the scaling decision steps. Testing: - Added a planner test and an e2e test that exercise the corner case under the COMPUTE_PROCESSING_COST=1 option. - Manually commented out the bounding logic in traverseEffectiveParallelism() and confirmed that the scheduler's sanity check still enforces the bounding. Change-Id: I65223b820c9fd6e4267d57297b1466d4e56829b3 Reviewed-on: http://gerrit.cloudera.org:8080/22840 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
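The sanity check boils down to a min() against a per-node cap; a hypothetical sketch under the assumption that the cap scales with the number of nodes the probe side actually occupies:

  // Sketch only: cap the parent fragment's instance count both by the per-node
  // maximum relayed from the planner and by how widely the probe-side child is
  // actually distributed.
  class ParallelismBoundSketch {
    static int boundInstances(int requestedInstances, int maxParallelismPerNode,
        int numNodesOfLeftChild) {
      int cap = maxParallelismPerNode * Math.max(1, numNodesOfLeftChild);
      return Math.min(requestedInstances, cap);
    }
  }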
||
|
|
8f7d2246ec |
IMPALA-12554: (Addendum) Add a flag to not consolidate requests by default
This patch adds a startup flag so that by default the catalog server will not consolidate the grant/revoke requests sent to the Ranger server when there are multiple columns involved in the GRANT/REVOKE statement. Testing: - Added 2 end-to-end tests to make sure the grant/revoke requests sent to the Ranger server would be consolidated only when the flag is explicitly added when we start the catalog server. Change-Id: I4defc59c048be1112380c3a7254ffa8655eee0af Reviewed-on: http://gerrit.cloudera.org:8080/22833 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
||
|
|
5db760662f |
IMPALA-12709: Add support for hierarchical metastore event processing
At present, the metastore event processor is single threaded.
Notification events are processed sequentially, with a maximum of 1000
events fetched and processed in a single batch. Multiple locks are used
to address the concurrency issues that may arise when catalog DDL
operation processing and metastore event processing try to
access/update the catalog objects concurrently. Waiting for a lock or
for the file metadata loading of a table can slow event processing and
can delay the processing of the events that follow it, even though
those events may not depend on the previous event. Altogether it can
take a very long time to synchronize all the HMS events.
The existing metastore event processing is turned into multi-level
event processing with the enable_hierarchical_event_processing flag,
which is not enabled by default. The idea is to segregate the events
based on their dependencies, maintain the order of events as they occur
within a dependency, and process them independently as much as possible
(a simplified dispatch sketch follows this message). The following 3
main classes represent the three-level threaded event processing.
1. EventExecutorService
It provides the necessary methods to initialize, start, clear,
stop and process metastore events in hierarchical mode. It is
instantiated from MetastoreEventsProcessor and its methods are
invoked from MetastoreEventsProcessor. Upon receiving an event to
process, EventExecutorService queues the event to the appropriate
DbEventExecutor for processing.
2. DbEventExecutor
An instance of this class has an execution thread and manages the
events of multiple databases with DbProcessors. A DbProcessor
instance is maintained to store the context of each database within
the DbEventExecutor. On each scheduled execution, the input events
of a DbProcessor are segregated to the appropriate TableProcessors
for processing, and the database events that are eligible for
processing are processed.
Once a DbEventExecutor is assigned to a database, a DbProcessor
is created, and the subsequent events belonging to the database
are queued to the same DbEventExecutor thread for further
processing. Hence, linearizability is ensured when dealing with
events within the database. Each instance of DbEventExecutor has
a fixed list of TableEventExecutors.
3. TableEventExecutor
An instance of this class has an execution thread and processes the
events of multiple tables with TableProcessors. A TableProcessor
instance is maintained to store the context of each table within
a TableEventExecutor. On each scheduled execution, events from the
TableProcessors are processed.
Once a TableEventExecutor is assigned to a table, a TableProcessor
is created, and the subsequent table events are processed by the
same TableEventExecutor thread. Hence, linearizability is guaranteed
when processing the events of a particular table.
- All the events of a table are processed in the same order in
which they occurred.
- Events of different tables are processed in parallel when those
tables are assigned to different TableEventExecutors.
The following new events are added:
1. DbBarrierEvent
This event wraps a database event. It is used to synchronize all
the TableProcessors belonging to the database before processing the
database event. It acts as a barrier that holds back the processing
of table events that occurred after the database event until the
database event has been processed on the DbProcessor.
2. RenameTableBarrierEvent
This event wraps an alter table event for a rename. It is used to
synchronize the source and target TableProcessors when processing
the rename table event. It ensures that the source TableProcessor
removes the table first and only then allows the target
TableProcessor to create the renamed table.
3. PseudoCommitTxnEvent and PseudoAbortTxnEvent
CommitTxnEvent and AbortTxnEvent can involve multiple tables in
a transaction and processing these events modifies multiple table
objects. Pseudo events are introduced such that a pseudo event is
created for each table involved in the transaction and these
pseudo events are processed independently at the respective
TableProcessors.
The following new flags are introduced:
1. enable_hierarchical_event_processing
To enable the hierarchical event processing on catalogd.
2. num_db_event_executors
To set the number of database level event executors.
3. num_table_event_executors_per_db_event_executor
To set the number of table level event executors within a
database event executor.
4. min_event_processor_idle_ms
To set the minimum time to retain idle db processors and table
processors on the database event executors and table event
executors respectively, when they do not have events to process.
5. max_outstanding_events_on_executors
To set the limit of maximum outstanding events to process on
event executors.
Changed the hms_event_polling_interval_s type from int to double to
support millisecond-precision intervals.
TODOs:
1. We need to redefine the lag in the hierarchical processing mode.
2. We need a mechanism to capture the actual event processing time
in hierarchical processing mode. Currently, with
enable_hierarchical_event_processing set to true, lastSyncedEventId_
and lastSyncedEventTimeSecs_ are updated upon event dispatch to
EventExecutorService for processing on the respective DbEventExecutor
and/or TableEventExecutor. So lastSyncedEventId_ and
lastSyncedEventTimeSecs_ do not actually mean the events have been
processed.
3. Hierarchical processing mode currently has a mechanism to show the
total number of outstanding events on all the db and table executors
at the moment. Observability needs to be enhanced further for this
mode.
Filed a JIRA [IMPALA-13801] to fix these.
Testing:
- Executed existing end-to-end tests.
- Added FE and end-to-end tests with enable_hierarchical_event_processing.
- Added event processing performance tests.
- Executed the existing tests with hierarchical processing
mode enabled. lastSyncedEventId_ is now also used by the new
sync_hms_events_wait_time_s feature (IMPALA-12152). Some tests fail
when hierarchical processing mode is enabled because lastSyncedEventId_
does not actually mean the event has been processed in this mode. This
needs to be fixed/verified with the above JIRA [IMPALA-13801].
Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Reviewed-on: http://gerrit.cloudera.org:8080/21031
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
|
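A simplified dispatch sketch of the hierarchy described above (names and the hash-based routing are illustrative; the real classes are EventExecutorService, DbEventExecutor and TableEventExecutor): events are routed by database name to one db-level executor and, within it, by table name to one table-level executor, so per-table ordering is preserved while unrelated tables proceed in parallel.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Sketch only: deterministic routing keeps all events of one table on one
  // thread (preserving their order) while different tables can run in parallel.
  class HierarchicalDispatchSketch {
    private final ExecutorService[] dbExecutors;
    private final ExecutorService[][] tableExecutors;

    HierarchicalDispatchSketch(int numDbExecutors, int numTableExecutorsPerDb) {
      dbExecutors = new ExecutorService[numDbExecutors];
      tableExecutors = new ExecutorService[numDbExecutors][numTableExecutorsPerDb];
      for (int i = 0; i < numDbExecutors; i++) {
        dbExecutors[i] = Executors.newSingleThreadExecutor();
        for (int j = 0; j < numTableExecutorsPerDb; j++) {
          tableExecutors[i][j] = Executors.newSingleThreadExecutor();
        }
      }
    }

    void dispatch(String dbName, String tableName, Runnable processEvent) {
      int dbIdx = Math.floorMod(dbName.hashCode(), dbExecutors.length);
      if (tableName == null) {
        dbExecutors[dbIdx].submit(processEvent);  // database-level event
      } else {
        int tblIdx = Math.floorMod(tableName.hashCode(), tableExecutors[dbIdx].length);
        tableExecutors[dbIdx][tblIdx].submit(processEvent);  // table-level event
      }
    }
  }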
||
|
|
faf322dd41 |
IMPALA-12927: Support specifying format for reading JSON BINARY columns
Currently, Impala always assumes that the data in the binary columns of JSON tables is base64 encoded. However, before HIVE-21240, Hive wrote binary data to JSON tables without base64 encoding it, instead writing it as escaped strings. After HIVE-21240, Hive defaults to base64 encoding binary data when writing to JSON tables and introduces the serde property 'json.binary.format' to indicate the encoding method of binary data in JSON tables. To maintain consistency with Hive and avoid correctness issues caused by reading data in an incorrect manner, this patch also introduces the serde property 'json.binary.format' to specify the reading method for binary data in JSON tables. Currently, this property supports reading in either base64 or rawstring format, the same as Hive. Additionally, this patch introduces a query option 'json_binary_format' to achieve the same effect. This query option only takes effect for JSON tables where the serde property 'json.binary.format' is not set. The reading format of binary columns in JSON tables can therefore be configured globally by setting 'default_query_options'. Note that the default value of 'json_binary_format' is 'NONE'. Impala will refuse to read binary columns of JSON tables that either have no 'json.binary.format' set while 'json_binary_format' is 'NONE', or have an invalid 'json.binary.format' value set, and will return an error message so that an incorrect format is never used without the user noticing. Testing: - Enabled existing binary type E2E tests for JSON tables - Added new E2E test for 'json.binary.format' Change-Id: Idf61fa3afc0f33caa63fbc05393e975733165e82 Reviewed-on: http://gerrit.cloudera.org:8080/22289 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
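A hedged sketch of the precedence rule described above - the table-level serde property wins over the query option, and an unset/invalid combination is rejected; the enum and method names are illustrative:

  // Sketch only: table-level 'json.binary.format' wins over the query option
  // 'json_binary_format'; an unset/invalid combination is rejected so binary
  // columns are never silently read with the wrong encoding.
  class JsonBinaryFormatSketch {
    enum Format { BASE64, RAWSTRING }

    static Format resolve(String serdeProperty, String queryOption) {
      String effective = (serdeProperty != null) ? serdeProperty : queryOption;
      if ("base64".equalsIgnoreCase(effective)) return Format.BASE64;
      if ("rawstring".equalsIgnoreCase(effective)) return Format.RAWSTRING;
      throw new IllegalArgumentException(
          "Invalid or unset binary format for JSON table: " + effective);
    }
  }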
||
|
|
56b465d91f |
IMPALA-13829: Postpone catalog deleteLog GC for waitForHmsEvent requests
When a db/table is removed from the catalog cache, catalogd assigns it a new catalog version and puts it into the deleteLog. This is used by the catalog update thread to collect deletion updates. Once the catalog update thread collects a range of updates, it triggers GC in the deleteLog to clear items older than the last sent catalog version. The deletions will eventually be broadcast by the statestore to all the coordinators. However, waitForHmsEvent requests are also consumers of the deleteLog and could be impacted by these GCs. waitForHmsEvent is a catalogd RPC used by coordinators when a query wants to wait until the related metadata is in sync with HMS. The response of waitForHmsEvent returns the latest metadata, including the deletions on related dbs/tables. If the related deletions in the deleteLog are GCed just before the waitForHmsEvent request collects the results, they will be missing from the response. The coordinator might keep using stale metadata of non-existing dbs/tables. This is a quick fix for the issue: it postpones deleteLog GC for a configurable number of topic updates, similar to what we have done for the TopicUpdateLog. A thorough fix might need to carefully choose the version to GC, or let impalad wait for the deletions from the statestore to arrive. A new flag, catalog_delete_log_ttl, is added for this. DeleteLog items can survive for catalog_delete_log_ttl catalog updates. The default is 60, so a deletion can survive for at least 120s. That should be safe enough, i.e. the GCed deletions must have arrived on the impalad side after 60 rounds of catalog updates; otherwise that impalad is abnormal and already has other more severe issues, e.g. lots of stale tables due to metadata being out of sync with catalogd. Note that postponing deleteLog GCs might increase memory consumption. But since most of its memory is used by db/table/partition names, the memory usage should still be trivial compared to other metadata like file descriptors and incremental stats in live catalog objects. This patch also removed some unused imports. Tests: - Added an e2e test with a debug action to reproduce the issue. Ran the test 100 times. Without the fix, it consistently fails within 2-3 runs. Change-Id: I2441440bca2b928205dd514047ba742a5e8bf05e Reviewed-on: http://gerrit.cloudera.org:8080/22816 Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
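A minimal sketch of the postponed GC (hypothetical names; the real logic lives in the catalog update thread and the delete log): an entry is only pruned once the version sent catalog_delete_log_ttl topic updates ago has moved past it.

  import java.util.Iterator;
  import java.util.Map;
  import java.util.TreeMap;

  // Sketch only: instead of pruning everything below the last sent catalog
  // version right away, remember the version sent at each topic-update round
  // and prune only entries older than the version from 'ttl' rounds ago.
  class DeleteLogGcSketch {
    private final TreeMap<Long, String> deleteLog = new TreeMap<>();  // version -> object
    private final long[] sentVersionHistory;  // ring buffer of the last 'ttl' sent versions
    private int round = 0;

    DeleteLogGcSketch(int catalogDeleteLogTtl) {
      sentVersionHistory = new long[catalogDeleteLogTtl];
    }

    void addDeletion(long version, String objectName) {
      deleteLog.put(version, objectName);
    }

    void onTopicUpdateSent(long lastSentVersion) {
      int slot = round % sentVersionHistory.length;
      long gcBound = sentVersionHistory[slot];  // version sent 'ttl' rounds ago (0 at start)
      sentVersionHistory[slot] = lastSentVersion;
      round++;
      Iterator<Map.Entry<Long, String>> it =
          deleteLog.headMap(gcBound, /*inclusive=*/false).entrySet().iterator();
      while (it.hasNext()) { it.next(); it.remove(); }
    }
  }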
||
|
|
e7aa31296c |
IMPALA-13738 (Part2): Clean up code in Catalog's table and partition interfaces
Prepare for FE/Catalog refactor by resolving some TODOs and cleaning up unused code in most table and partition interfaces. - removed dead code - removed unused imports - moved static functions from Util classes to interfaces as default methods Testing: Run existing tests to validate changes. Change-Id: I8d9c7ba876e39fa4f4d067761f25dc4ecfd55702 Reviewed-on: http://gerrit.cloudera.org:8080/22728 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
||
|
|
4ddacac14f |
IMPALA-11402: Add limit on files fetched by a single getPartialCatalogObject request
getPartialCatalogObject is a catalogd RPC used by local-catalog-mode coordinators to fetch metadata on demand from catalogd. For a table with a huge number (e.g. 6M) of files, catalogd might hit OOM by exceeding the JVM array limit when serializing the response of a getPartialCatalogObject request for all partitions (and thus all files). This patch adds a new flag, catalog_partial_fetch_max_files, to define the max number of file descriptors allowed in a getPartialCatalogObject response. Catalogd will truncate the response at the partition level when it's too big and only return a subset of the requested partitions. The coordinator should send new requests to fetch the remaining partitions. Note that it's possible that the table metadata changes between the requests. The coordinator will detect the catalog version changes and throw an InconsistentMetadataFetchException so the planner can replan the query. This is an existing mechanism for other kinds of table metadata. Here are some metrics of the number of files in a single response with the corresponding byte array size and duration: * 1000000: 371.71MB, 1s487ms * 2000000: 744.51MB, 4s035ms * 3000000: 1.09GB, 6s643ms * 4000000: 1.46GB, duration not measured due to GC pauses * 5000000: 1.82GB, duration not measured due to GC pauses * 6000000: >2GB (hit OOM) 1000000 is chosen as the default value for now. We can tune it in the future. Tests: - Added a custom-cluster test - Ran e2e tests in local catalog mode with catalog_partial_fetch_max_files=1000 so the new code paths are used. Change-Id: Ibb13fec20de5a17e7fc33613ca5cdebb9ac1a1e5 Reviewed-on: http://gerrit.cloudera.org:8080/22559 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> |
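A sketch of the partition-level truncation (hypothetical types; the real code builds the getPartialCatalogObject response): partitions are added whole until the running file-descriptor count would exceed catalog_partial_fetch_max_files, and the rest are left for a follow-up request.

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;

  // Sketch only: the response is cut at partition granularity (the map should
  // iterate in a deterministic order, e.g. a LinkedHashMap keyed by partition);
  // the coordinator issues further requests for the partitions left out.
  class PartialFetchSketch {
    static List<String> selectPartitions(Map<String, Integer> partitionToNumFiles,
        long maxFiles, List<String> remainingOut) {
      List<String> selected = new ArrayList<>();
      long files = 0;
      boolean truncated = false;
      for (Map.Entry<String, Integer> e : partitionToNumFiles.entrySet()) {
        if (truncated || (!selected.isEmpty() && files + e.getValue() > maxFiles)) {
          truncated = true;
          remainingOut.add(e.getKey());  // fetched by the next request
        } else {
          selected.add(e.getKey());
          files += e.getValue();
        }
      }
      return selected;
    }
  }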
||
|
|
fb789df3be |
IMPALA-13684: Improve waitForHmsEvent() to only wait for related events
waitForHmsEvent is a catalogd RPC for coordinators to send the
requested db/table names to catalogd and wait until it's safe (i.e. no
stale metadata) to start analyzing the statement. The wait time is
configured by the query option sync_hms_events_wait_time_s. Currently,
when this option is enabled, catalogd waits until it syncs to the
latest HMS event regardless of what the query is.
This patch reduces waiting by only checking related events and waiting
until the last related event has been processed (a condensed sketch of
the check follows this message). In the ideal case, if there are no
pending related events, the query doesn't need to wait.
Related pending events are determined as follows:
- For queries that need the db list, i.e. SHOW DATABASES, check pending
CREATE/ALTER/DROP_DATABASE events on all dbs. ALTER_DATABASE events
are checked in case the ownership changes and impacts visibility.
- For db statements like SHOW FUNCTIONS, CREATE/ALTER/DROP DATABASE,
check pending CREATE/ALTER/DROP events on that db.
- For db statements that require the table list, i.e. SHOW TABLES,
also check CREATE_TABLE, DROP_TABLE events under that db.
- For table statements,
- check all database events on related db names.
- If there are loaded transactional tables, check all the pending
COMMIT_TXN, ABORT_TXN events. Note that these events might modify
multiple transactional tables and we don't know their table names
until they are processed. To be safe, wait for all transactional
events.
- For all the other table names,
- if they are all missing/unloaded in the catalog, check all the
pending CREATE_TABLE, DROP_TABLE events on them for their
existence.
- Otherwise, if some of them are loaded, check all the table events on
them. Note that we can fetch events on multiple tables under the
same db in a single fetch.
If the statement has a SELECT part, views will be expanded so underlying
tables will be checked as well. For performance, this feature assumes
that views won't be changed to tables, and vice versa. This is a rare
use case in regular jobs. Users should use INVALIDATE in such cases.
This patch leverages the HMS API to fetch events of several tables under
the same db in batch. MetastoreEventsProcessor.MetaDataFilter is
improved for this.
Tests:
- Added test for multiple tables in a single query.
- Added test with views.
- Added test for transactional tables.
- Ran CORE tests.
Change-Id: Ic033b7e197cd19505653c3ff80c4857cc474bcfc
Reviewed-on: http://gerrit.cloudera.org:8080/22571
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
|
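A condensed sketch of the 'is this pending event related?' check outlined above (hypothetical types and simplified rules; the real logic spans the waitForHmsEvent handling in catalogd):

  import java.util.Set;

  // Sketch only: a pending HMS event is "related" if it touches a requested db,
  // a requested table, or -- when loaded transactional tables are involved --
  // is a transaction event whose member tables are unknown until processed.
  class RelatedEventSketch {
    static boolean isRelated(String eventType, String eventDb, String eventTable,
        Set<String> requestedDbs, Set<String> requestedTables,
        boolean touchesLoadedTransactionalTables) {
      if (touchesLoadedTransactionalTables
          && ("COMMIT_TXN".equals(eventType) || "ABORT_TXN".equals(eventType))) {
        return true;  // txn events may modify any transactional table
      }
      if (eventTable == null) {
        return eventDb != null && requestedDbs.contains(eventDb);  // database-level event
      }
      return requestedTables.contains(eventDb + "." + eventTable);
    }
  }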
||
|
|
c5a0ec8bdf |
IMPALA-11980 (part 1): Put all thrift-generated python code into the impala_thrift_gen package
This puts all of the thrift-generated python code into the impala_thrift_gen package. This is similar to what Impyla does for its thrift-generated python code, except that it uses the impala_thrift_gen package rather than impala._thrift_gen. This is a preparatory patch for fixing the absolute import issues. This patches all of the thrift files to add the python namespace. This has code to apply the patching to the thirdparty thrift files (hive_metastore.thrift, fb303.thrift) to do the same. Putting all the generated python into a package makes it easier to understand where the imports are getting code. When the subsequent change rearranges the shell code, the thrift generated code can stay in a separate directory. This uses isort to sort the imports for the affected Python files with the provided .isort.cfg file. This also adds an impala-isort shell script to make it easy to run. Testing: - Ran a core job Change-Id: Ie2927f22c7257aa38a78084efe5bd76d566493c0 Reviewed-on: http://gerrit.cloudera.org:8080/20169 Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> |
||
|
|
706e1f026c |
IMPALA-13657: Connect Calcite planner to Impala Frontend framework
This commit adds the plumbing created by IMPALA-13653. The Calcite
planner is now called from Impala's Frontend code via 4 hooks which
are:
- CalciteCompilerFactory: the factory class that creates
the implementations of the parser, analysis, and single node
planner hooks.
- CalciteParsedStatement: The class which holds the Calcite SqlNode
AST.
- CalciteAnalysisDriver: The class that does the validation of the
SqlNode AST
- CalciteSingleNodePlanner: The class that converts the AST to a
logical plan, optimizes it, and converts it into an Impala
PlanNode physical plan.
To run on Calcite, one needs to do two things:
1) set the USE_CALCITE_PLANNER env variable to true before starting
the cluster. This adds the jar file into the path in the
bin/setclasspath.sh file, which is not there by default at the time
of this commit.
2) set the use_calcite_planner query option to true.
This commit makes the CalciteJniFrontend class obsolete. Once the
test cases are moved out of there, that class and others can be
removed.
Change-Id: I3b30571beb797ede827ef4d794b8daefb130ccb1
Reviewed-on: http://gerrit.cloudera.org:8080/22319
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
|