1513 Commits

Author SHA1 Message Date
Daniel Vanko
9d112dae23 IMPALA-14536: Fix CONVERT TO ICEBERG to not throw exception on Iceberg tables
Previously, running ALTER TABLE <table> CONVERT TO ICEBERG on an Iceberg
table produced an error. This patch fixes that: the statement now does
nothing when called on an Iceberg table and returns a 'Table has
already been migrated.' message.
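
For example (the table name here is hypothetical), the statement is now
a no-op instead of an error:

  ALTER TABLE ice_db.ice_tbl CONVERT TO ICEBERG;
  -- On a table that is already Iceberg, this now returns
  -- 'Table has already been migrated.' instead of failing.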

This is achieved by adding a new flag to StatementBase that signals when a
statement ends up as a NO_OP. If the flag is set, the new TStmtType::NO_OP
is set as TExecRequest's type and noop_result can be used to set the
result from the frontend side.

Tests:
 * extended fe and e2e tests

Change-Id: I41ecbfd350d38e4e3fd7b813a4fc27211d828f73
Reviewed-on: http://gerrit.cloudera.org:8080/23699
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Peter Rozsa <prozsa@cloudera.com>
2025-12-12 15:35:28 +00:00
Arnab Karmakar
ddd82e02b9 IMPALA-14065: Support WHERE clause in SHOW PARTITIONS statement
This patch extends the SHOW PARTITIONS statement to allow an optional
WHERE clause that filters partitions based on partition column values.
The implementation adds support for various comparison operators,
IN lists, BETWEEN clauses, IS NULL, and logical AND/OR expressions
involving partition columns.

Non-partition columns, subqueries, and analytic expressions in the
WHERE clause are not allowed and will result in an analysis error.
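
A sketch of the extended syntax (table and column names are
hypothetical):

  SHOW PARTITIONS sales_db.sales WHERE year = 2024 AND month IN (1, 2);
  SHOW PARTITIONS sales_db.sales
    WHERE day BETWEEN 1 AND 15 OR region IS NULL;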

New analyzer tests have been added to AnalyzeDDLTest#TestShowPartitions
to verify correct parsing, semantic validation, and error handling for
supported and unsupported cases.

Testing:
- Added new unit tests in AnalyzeDDLTest for valid and invalid WHERE
clause cases.
- Verified functional tests covering partition filtering behavior.

Change-Id: I2e2a14aabcea3fb17083d4ad6f87b7861113f89e
Reviewed-on: http://gerrit.cloudera.org:8080/23566
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-12-11 15:36:08 +00:00
Sai Hemanth Gantasala
1684c2d9da IMPALA-14131: Add flag to configure the default value of
'impala.disableHmsSync'

FEATURE: Implement global 'disable_hms_sync_by_default' flag for event
processing. This change introduces a new catalogd startup flag,
`disable_hms_sync_by_default`, to simplify skipping/processing events.

Problem: Disabling event processing globally requires the tedious process
of setting the 'impala.disableHmsSync' property on every database and
table, especially if only a few specific tables require event sync.

Solution: The new flag provides a global default for the
'impala.disableHmsSync' property.

Behavior:
- If `disable_hms_sync_by_default` is true (the intended default-off
state), event processing is skipped for all tables/databases unless
the property "impala.disableHmsSync"="false" is explicitly set.
- This allows users to easily keep event processing off by default
and opt-in specific databases or tables to start syncing.
- The check order is: table-property > db-property > global default.
- HMS polling remains independent and unaffected by this flag.
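
For example, with the global default set to skip syncing, a single table
can be opted back in via the existing table property (table name is
hypothetical):

  ALTER TABLE sales_db.critical_tbl
  SET TBLPROPERTIES ('impala.disableHmsSync'='false');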

Change-Id: I4ee617aed48575502d9cf5cf2cbea6ec897d6839
Reviewed-on: http://gerrit.cloudera.org:8080/23487
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-12-11 05:10:32 +00:00
jichen0919
7e29ac23da IMPALA-14092 Part2: Support querying of paimon data table via JNI
This patch mainly implements querying of Paimon data tables through a
JNI-based scanner.

Features implemented:
- Support column pruning.
Partition pruning and predicate pushdown will be submitted
as the third part of the patch.

We implemented this by treating the Paimon table as a normal
unpartitioned table. When querying a Paimon table:
- PaimonScanNode decides which Paimon splits need to be scanned,
  and then transfers the splits to the BE for the JNI-based scan.

- We also collect the required columns that need to be scanned,
  and pass them to the scanner for column pruning. This is
  implemented by passing the field ids of the columns to the BE,
  instead of column positions, to support schema evolution.

- In the original implementation, PaimonJniScanner directly passed
  Paimon row objects to the BE and called the corresponding Paimon row
  field accessors, which are Java methods that convert row fields into
  Impala row batch tuples. We found this to be slow due to the overhead
  of JVM method calls.
  To minimize the overhead, we reworked the implementation:
  PaimonJniScanner now converts the Paimon row batches into an Arrow
  record batch, which stores data in the off-heap region of the
  Impala JVM, and passes the memory pointer of the off-heap Arrow
  record batch to the BE. The BE PaimonJniScanNode reads data directly
  from the JVM off-heap region and converts the Arrow record batch
  into an Impala row batch.

  The benchmark shows the new implementation is 2.x times faster
  than the original implementation.

  The lifecycle of the Arrow record batch is mainly as follows:
  the batch is generated in the FE and passed to the BE.
  After the record batch is successfully imported into the BE,
  the BE is in charge of freeing it.
  There are two free paths: the normal path and the exception path.
  On the normal path, when the Arrow batch is fully consumed by the BE,
  the BE calls JNI to fetch the next Arrow batch; in this case the
  consumed batch is freed automatically.
  The exception path happens when the query is cancelled or a memory
  allocation fails. For these corner cases, the Arrow batch is freed
  in the close() method if it has not been fully consumed by the BE.

Currently supported Impala data types for queries include:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE

TODO:
    - Patches pending submission:
        - Support tpcds/tpch data-loading
          for paimon data table.
        - Virtual Column query support for querying
          paimon data table.
        - Query support with time travel.
        - Query support for paimon meta tables.
    - WIP:
        - Snapshot incremental read.
        - Complex type query support.
        - Native paimon table scanner, instead of
          jni based.

Testing:
    - Create test tables in functional_schema_template.sql
    - Add TestPaimonScannerWithLimit in test_scanners.py
    - Add test_paimon_query in test_paimon.py.
    - Already passed the tpcds/tpch tests for Paimon tables. The test
      table data is currently generated by Spark; we have to do this
      because generating it is not supported by Impala yet and Hive
      doesn't support generating Paimon tables for dynamically
      partitioned tables. We plan to submit a separate patch for
      tpcds/tpch data loading and the associated tpcds/tpch query tests.
    - JVM off-heap memory leak tests: ran looped tpch tests for
      1 day; no obvious off-heap memory increase was observed, and
      off-heap memory usage stayed within 10M.

Change-Id: Ie679a89a8cc21d52b583422336b9f747bdf37384
Reviewed-on: http://gerrit.cloudera.org:8080/23613
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
2025-12-05 18:19:57 +00:00
ttttttz
5d1f1e0180 IMPALA-14183: Rename the environment variable USE_APACHE_HIVE to USE_APACHE_HIVE_3
When the environment variable USE_APACHE_HIVE is set to true, Impala is
built to work with Apache Hive 3.x. To better distinguish this from Apache
Hive 2.x later, USE_APACHE_HIVE is renamed to USE_APACHE_HIVE_3.
Additionally, to facilitate referencing different versions of the Hive
MetastoreShim, the major version of Hive has been added to the environment
variable IMPALA_HIVE_DIST_TYPE.

Change-Id: I11b5fe1604b6fc34469fb357c98784b7ad88574d
Reviewed-on: http://gerrit.cloudera.org:8080/21724
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-12-03 13:38:45 +00:00
Noemi Pap-Takacs
fdad9d3204 IMPALA-13725: Add Iceberg table repair functionalities
In some cases users delete files directly from storage without
going through the Iceberg API, e.g. they remove old partitions.

This corrupts the table, and makes queries that try to read the
missing files fail.
This change introduces a repair statement that deletes the
dangling references to missing files from the metadata.
Note that the table cannot be repaired if there are missing
delete files, because Iceberg's DeleteFiles API, which is used
to execute the operation, allows removing only data files.

Testing:
 - E2E
   - HDFS
   - S3, Ozone
 - analysis

Change-Id: I514403acaa3b8c0a7b2581d676b82474d846d38e
Reviewed-on: http://gerrit.cloudera.org:8080/23512
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-11-25 13:03:52 +00:00
Arnab Karmakar
a2a11dec62 IMPALA-13263: Add single-argument overload for ST_ConvexHull()
Implemented a single-argument version of ST_ConvexHull() to align with
PostGIS behavior and simplify usage across geometry types.
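
A hedged usage sketch; the geometry constructor and ST_AsText helper are
assumed to follow the ESRI-style geospatial functions Impala exposes:

  SELECT ST_AsText(ST_ConvexHull(
    ST_GeomFromText('linestring (0 0, 2 0, 1 2, 0 1)')));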

Testing:
Added new tests in test_geospatial_functions.py for ST_ConvexHull(),
which previously had no test coverage, to verify correctness across
supported geometry types.

Change-Id: Idb17d98f5e75929ec0143aa16195a84dd6e50796
Reviewed-on: http://gerrit.cloudera.org:8080/23604
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
2025-11-18 10:26:04 +00:00
Arnab Karmakar
068158e495 IMPALA-12401: Support more info types for HS2 GetInfo() API
This patch adds support for 40+ additional TGetInfoType values in the
HiveServer2 GetInfo() API, improving ODBC/JDBC driver compatibility.

Previously, only 3 info types were supported (CLI_SERVER_NAME,
CLI_DBMS_NAME, CLI_DBMS_VER).

The implementation follows the ODBC CLI specification and matches the
behavior of Hive's GetInfo implementation where applicable.

Testing:
- Added unit tests in test_hs2.py for new info types
- Tests verify correct return values and data types for each info type

Change-Id: I1ce5f2b9dcc2e4633b4679b002f57b5b4ea3e8bf
Reviewed-on: http://gerrit.cloudera.org:8080/23528
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
2025-11-17 19:32:50 +00:00
Mihaly Szjatinya
087b715a2b IMPALA-14108: Add support for SHOW FILES IN table PARTITION for Iceberg
tables

This patch implements partition filtering support for the SHOW FILES
statement on Iceberg tables, based on the functionality added in
IMPALA-12243. Prior to this change, the syntax resulted in a
NullPointerException.
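
A usage sketch (the table and the date partition transform are
hypothetical; predicate forms mirror the IMPALA-12243 partition
expressions):

  SHOW FILES IN ice_db.events PARTITION (day(event_time) = '2025-01-01');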

Key changes:
- Added ShowFilesStmt.analyzeIceberg() to validate and transform
  partition expressions using IcebergPartitionExpressionRewriter and
  IcebergPartitionPredicateConverter. After that, it collects matching
  file paths using IcebergUtil.planFiles().
- Added FeIcebergTable.Utils.getIcebergTableFilesFromPaths() to
  accept pre-filtered file lists from the analysis phase.
- Enhanced TShowFilesParams thrift struct with optional selected_files
  field to pass pre-filtered file paths from frontend to backend.

Testing:
- Analyzer tests for negative cases: non-existent partitions, invalid
  expressions, non-partition columns, unsupported transforms.
- Analyzer tests for positive cases: all transform types, complex
  expressions.
- Authorization tests for non-filtered and filtered syntaxes.
- E2E tests covering every partition transform type with various
  predicates.
- Schema evolution and rollback scenarios.

The implementation follows AlterTableDropPartition's pattern where the
analysis phase performs validation/metadata retrieval and the execution
phase handles result formatting and display.

Change-Id: Ibb9913e078e6842861bdbb004ed5d67286bd3152
Reviewed-on: http://gerrit.cloudera.org:8080/23455
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-11-14 21:43:10 +00:00
Arnab Karmakar
760eb4f2fa IMPALA-13066: Extend SHOW CREATE TABLE to include stats and partitions
Adds a new WITH STATS option to the SHOW CREATE TABLE statement to
emit additional SQL statements for recreating table statistics and
partitions.

When specified, Impala outputs:

- Base CREATE TABLE statement.

- ALTER TABLE ... SET TBLPROPERTIES for table-level stats.

- ALTER TABLE ... SET COLUMN STATS for all non-partition columns,
restoring column stats.

- For partitioned tables:

  - ALTER TABLE ... ADD PARTITION statements to recreate partitions.

  - Per-partition ALTER TABLE ... PARTITION (...) SET TBLPROPERTIES
  to restore partition-level stats.

Partition output is limited by the PARTITION_LIMIT query option
(default 1000); a warning is emitted if the limit is exceeded. Setting
PARTITION_LIMIT=0 includes all partitions.
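
A usage sketch (table name is hypothetical):

  SET PARTITION_LIMIT=0;  -- include all partitions in the output
  SHOW CREATE TABLE sales_db.sales WITH STATS;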

Tests added to verify correctness of emitted statements. Default
behavior of SHOW CREATE TABLE remains unchanged for compatibility.

Change-Id: I87950ae9d9bb73cb2a435cf5bcad076df1570dc2
Reviewed-on: http://gerrit.cloudera.org:8080/23536
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-11-12 06:11:37 +00:00
jichen0919
541fb3f405 IMPALA-14092 Part1: Prohibit Unsupported Operation for paimon table
This patch prohibits unsupported operations against Paimon
tables. Checks for all unsupported operations are added in the
analysis stage in order to avoid misoperation. Currently only
CREATE/DROP statements are supported; the prohibitions will be
removed later once the corresponding operations are actually
supported.

TODO:
    - Patches pending submission:
        - Support jni based query for paimon data table.
        - Support tpcds/tpch data-loading
          for paimon data table.
        - Virtual Column query support for querying
          paimon data table.
        - Query support with time travel.
        - Query support for paimon meta tables.

Testing:
    - Add unit test for AnalyzeDDLTest.java.
    - Add unit test for AnalyzerTest.java.
    - Add test_paimon_negative and test_paimon_query in test_paimon.py.

Change-Id: Ie39fa4836cb1be1b1a53aa62d5c02d7ec8fdc9d7
Reviewed-on: http://gerrit.cloudera.org:8080/23530
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-10-23 23:06:08 +00:00
Joe McDonnell
1913ab46ed IMPALA-14501: Migrate most scripts from impala-python to impala-python3
To remove the dependency on Python 2, existing scripts need to use
python3 rather than python. These commands find those
locations (for impala-python and regular python):
git grep impala-python | grep -v impala-python3 | grep -v impala-python-common | grep -v init-impala-python
git grep bin/python | grep -v python3

This removes or switches most of these locations by various means:
1. If a python file has a #!/bin/env impala-python (or python) but
   doesn't have a main function, it removes the hash-bang and makes
   sure that the file is not executable.
2. Most scripts can simply switch from impala-python to impala-python3
   (or python to python3) with minimal changes.
3. The cm-api pypi package (which doesn't support Python 3) has been
   replaced by the cm-client pypi package and interfaces have changed.
   Rather than migrating the code (which hasn't been used in years), this
   deletes the old code and stops installing cm-api into the virtualenv.
   The code can be restored and revamped if there is any interest in
   interacting with CM clusters.
4. This switches tests/comparison over to impala-python3, but this code has
   bit-rotted. Some pieces can be run manually, but it can't be fully
   verified with Python 3. It shouldn't hold back the migration on its own.
5. This also replaces locations of impala-python in comments / documentation /
   READMEs.
6. kazoo (used for interacting with HBase) needed to be upgraded to a
   version that supports Python 3. The newest version of kazoo requires
   upgrades of other component versions, so this uses kazoo 2.8.0 to avoid
   needing other upgrades.

The two remaining uses of impala-python are:
 - bin/cmake_aux/create_virtualenv.sh
 - bin/impala-env-versioned-python
These will be removed separately when we drop Python 2 support
completely. In particular, these are useful for testing impala-shell
with Python 2 until we stop supporting Python 2 for impala-shell.

The docker-based tests still use /usr/bin/python, but this can
be switched over independently (and doesn't impact impala-python).

Testing:
 - Ran core job
 - Ran build + dataload on Centos 7, Redhat 8
 - Manual testing of individual scripts (except some bitrotted areas like the
   random query generator)

Change-Id: If209b761290bc7e7c716c312ea757da3e3bca6dc
Reviewed-on: http://gerrit.cloudera.org:8080/23468
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
2025-10-22 16:30:17 +00:00
stiga-huang
f0a781806f IMPALA-14494: Tag catalogd logs of GetPartialCatalogObject requests with correct query ids
Catalogd logs of GetPartialCatalogObject requests are not tagged with the
correct query ids. Instead, the query id that previously used that
thread is printed in the logs. This is fixed by using
ScopedThreadContext, which resets the query id at the end of the RPC
code.

Add DCHECKs to make sure ThreadDebugInfo is initialized before being
used in Catalog methods. An instance is added in CatalogdMain() for
this.

This patch also adds the query id in GetPartialCatalogObject requests so
catalogd can tag the responding thread with it.

Some code is copied from Michael Smith's patch: https://gerrit.cloudera.org/c/22738/

Tested by enabling TRACE logging in org.apache.impala.common.JniUtil to
verify logs of GetPartialCatalogObject requests.

I20251014 09:39:39.685225 342587 JniUtil.java:165] 964e37e9303d6f8a:eab7096000000000] getPartialCatalogObject request: Getting partial catalog object of CATALOG_SERVICE_ID
I20251014 09:39:39.690346 342587 JniUtil.java:176] 964e37e9303d6f8a:eab7096000000000] Finished getPartialCatalogObject request: Getting partial catalog object of CATALOG_SERVICE_ID. Time spent: 5ms
I20251014 09:39:39.699471 342587 JniUtil.java:165] 964e37e9303d6f8a:eab7096000000000] getPartialCatalogObject request: Getting partial catalog object of DATABASE:functional
I20251014 09:39:39.701821 342587 JniUtil.java:176] 964e37e9303d6f8a:eab7096000000000] Finished getPartialCatalogObject request: Getting partial catalog object of DATABASE:functional. Time spent: 2ms
I20251014 09:39:39.711462 341074 TAcceptQueueServer.cpp:368] New connection to server CatalogService from client <Host: 127.0.0.1 Port: 42084>
I20251014 09:39:39.719146 342588 JniUtil.java:165] 964e37e9303d6f8a:eab7096000000000] getPartialCatalogObject request: Getting partial catalog object of TABLE:functional.alltypestiny

Change-Id: Ie63363ac60e153e3a69f2a4cf6a0f4ce10701674
Reviewed-on: http://gerrit.cloudera.org:8080/23535
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-10-16 07:06:29 +00:00
Riza Suminto
1008decc07 IMPALA-14447: Parallelize table loading in getMissingTables()
StmtMetadataLoader.getMissingTables() load missing tables in serial
manner. In local catalog mode, large number of serial table loading can
incur significant round trip latency to CatalogD. This patch parallelize
the table loading by using executor service to lookup and gather all
non-null FeTables from given TableName set.

Modify LocalCatalog.loadDbs() and LocalDb.loadTableNames() slightly to
make them thread-safe. Change FrontendProfile.Scope to support nested
scopes referencing the same FrontendProfile instance.

Added a new flag, max_stmt_metadata_loader_threads, to control the maximum
number of threads to use for loading table metadata during query
compilation. It defaults to 8 threads per query compilation.

If there is only one table to load, max_stmt_metadata_loader_threads is
set to 1, or a RejectedExecutionException is raised, loading falls back
to the serial path.

Testing:
Ran and passed a few tests such as test_catalogd_ha.py,
test_concurrent_ddls.py, and test_observability.py.
Added FE test CatalogdMetaProviderTest.testProfileParallelLoad.
Manually ran the following query and observed parallel loading by setting
TRACE-level logging in CatalogdMetaProvider.java.

use functional;
select count(*) from alltypesnopart
union select count(*) from alltypessmall
union select count(*) from alltypestiny
union select count(*) from alltypesagg;

Change-Id: I97a5165844ae846b28338d62e93a20121488d79f
Reviewed-on: http://gerrit.cloudera.org:8080/23436
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-10-13 12:53:47 +00:00
pranav.lodha
a77fec6391 IMPALA-13661: Support parallelism above JDBC tables for joins/aggregates
Impala's planner generates a single-fragment, single-
threaded scan node for queries on JDBC tables because table
statistics are not properly available from the external
JDBC source. As a result, even large JDBC tables are
executed serially, causing suboptimal performance for joins,
aggregations, and scans over millions of rows.

This patch enables Impala to estimate the number of rows in a JDBC
table by issuing a COUNT(*) query at query preparation time. The
estimation is returned via TPrepareResult.setNum_rows_estimate()
and propagated into DataSourceScanNode. The scan node then uses
this cardinality to drive planner heuristics such as join order,
fragment parallelization, and scanner thread selection.

The design leverages the existing JDBC accessor layer:
- JdbcDataSource.prepare() constructs the configuration and invokes
  GenericJdbcDatabaseAccessor.getTotalNumberOfRecords().
- The accessor wraps the underlying query in:
      SELECT COUNT(*) FROM (<query>) tmptable
  ensuring correctness for both direct table scans and parameterized
  query strings.
- The result is captured as num_rows_estimate, which is then applied
  during computeStats() in DataSourceScanNode.
With accurate (or approximate) row counts, the planner can now:
- Assign multiple scanner threads to JDBC scan nodes instead of
   falling back to a single-thread plan.
- Introduce exchange nodes where beneficial, parallelizing data
   fetches across multiple JDBC connections.
- Produce better join orders by comparing JDBC row cardinalities
   against native Impala tables.
- Avoid severe underestimation that previously defaulted to wrong
   table statistics, leading to degenerate plans.

For a sample join query mentioned in the test file,
these are the improvements:

Before Optimization:
- Cardinality fixed at 1 for all JDBC scans
- Single fragment, single thread per query
- Max per-host resource reservation: ~9.7 MB, 1 thread
- No EXCHANGE or MERGING EXCHANGE operators
- No broadcast distribution; joins executed serially
- Example query runtime: ~77s

SCAN JDBC A
   \
    HASH JOIN
       \
        SCAN JDBC B
           \
            HASH JOIN
               \
                SCAN JDBC C
                   \
                    TOP-N -> ROOT

After Optimization:
- Cardinality derived from COUNT(*) (e.g. 150K, 1.5M rows)
- Multiple fragments per scan, 7 threads per query
- Max per-host resource reservation: ~123 MB, 7 threads
- Plans include EXCHANGE and MERGING EXCHANGE operators
- Broadcast joins on small sides, improving parallelism
- Example query runtime: ~38s (~2x faster)

SCAN JDBC A --> EXCHANGE(SND) --+
                                  \
                                   EXCHANGE(RCV) -> HASH JOIN(BCAST) --+
SCAN JDBC B --> EXCHANGE(SND) ----/                                   \
                                                                         HASH JOIN(BCAST) --+
SCAN JDBC C --> EXCHANGE(SND) ------------------------------------------/                 \
                                                                                             TOP-N
                                                                                               \
                                                                                                MERGING EXCHANGE -> ROOT

Also added a new backend configuration flag
--min_jdbc_scan_cardinality (default: 10) to provide a
lower bound for scan node cardinality estimates
during planning. This flag is propagated from BE
to FE via TBackendGflags and surfaced through
BackendConfig, ensuring the planner never produces
unrealistically low cardinality values.

TODO: Add a query option for this optimization
to avoid extra JDBC round trip for smaller
queries (IMPALA-14417).

Testing: All cases of Planner tests are written in
jdbc-parallel.test. Some basic metrics
are also mentioned in the commit message.

Change-Id: If47d29bdda5b17a1b369440f04d4e209d12133d9
Reviewed-on: http://gerrit.cloudera.org:8080/23112
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
2025-10-04 15:42:38 +00:00
Venu Reddy
ebbc67cf40 IMPALA-13801: Support greatest synced event with hierarchical metastore event processing
This is a follow-up jira/commit to IMPALA-12709. IMPALA-12152 and
IMPALA-12785 are affected when the hierarchical metastore event
processing feature is enabled.

The following changes are incorporated in this patch:
1. Added creationTime_ and dispatchTime_ fields in the MetastoreEvent
   class to store the current time in milliseconds. They are used to
   calculate:
   a) Event dispatch time (time between a MetastoreEvent object's
      creation and when the event is moved to the inProgressLog_ of
      EventExecutorService after dispatching it to a
      DbEventExecutor).
   b) Event schedule delays incurred at DbEventExecutors and
      TableEventExecutors (time between an event being moved to
      EventExecutorService's inProgressLog_ and the start of
      processing the event at the appropriate DbEventExecutor and
      TableEventExecutor).
   c) Event process time from the EventExecutorService point of
      view (time spent in inProgressLog_ before it is moved to
      processedLog_).
   Logs are added to show the event dispatch time, schedule
   delays, process time from EventExecutorService point of
   view for each event. Also a log is added to show the time
   taken for event's processIfEnabled().
2. Added an isDelimiter_ field in the MetastoreEvent class to indicate
   whether it is a delimiter event. It is set only when
   hierarchical event processing is enabled. A delimiter is a kind
   of metastore event that does not require event processing.
   A delimiter event can be:
   a) A CommitTxnEvent that does not have any write event info for
      a given transaction.
   b) An AbortTxnEvent that does not have write ids for a given
      transaction.
   c) An IgnoredEvent.
   An event is determined and marked as a delimiter in
   EventExecutorService#dispatch(). Delimiter events are not queued to a
   DbEventExecutor for processing. They are just maintained in
   the inProgressLog_ to preserve continuity and correctness in
   synchronization tracking. The delimiter events are removed from
   inProgressLog_ when their preceding non-delimiter metastore
   event is removed from inProgressLog_.
3. The greatest synced event id is computed based on the dispatched
   events (inProgressLog_) and processed events (processedLog_) tree
   maps. The greatest synced event is the latest event such that all
   events with an id less than or equal to it are definitely synced.
4. Lag is calculated as the difference between the latest event time on
   HMS and the greatest synced event time. It is shown in the log.
5. The greatest synced event id is used in the IMPALA-12152 changes. When
   the greatest synced event id becomes greater than or equal to
   waitForEventId, all the required events are definitely synced.
6. The event processor is paused gracefully when paused with the command
   from IMPALA-12785. This ensures that all the events fetched from HMS
   in the current batch are processed before the event processor is
   fully paused. It is necessary to process the current batch of events
   because certain events like AllocWriteIdEvent, AbortTxnEvent and
   CommitTxnEvent update table write ids in the catalog upon metastore
   event object creation, and the table write ids are later applied to
   the appropriate table object when their events are processed. Pausing
   abruptly in the middle of the current batch of event processing can
   leave the write ids on table objects in an inconsistent state.
7. Added the greatest synced event id and event time to the event
   processor metrics, and updated the descriptions of the lag, pending
   events, last synced event id and event time metrics.
8. Atomically update the event queue and increment the outstanding event
   count in the enqueue methods of both DbProcessor and TableProcessor
   so that the respective process methods do not process the event until
   the event is added to the queue and the outstanding event count is
   incremented. Otherwise, an event could get processed and the
   outstanding event count decremented before it is incremented in the
   enqueue method.
9. Refactored the DbEventExecutor, DbProcessor, TableEventExecutor and
   TableProcessor classes to propagate the exception that occurred along
   with the event during event processing. EventProcessException is a
   wrapper added to hold references to the event being processed and the
   exception that occurred.
10. Added the AcidTableWriteInfo helper class to store the table, write
   ids and partitions for the transaction id received in a CommitTxnEvent.

Testing:
 - Added new tests and executed existing end to end tests.
 - Have executed the existing tests with hierarchical event processing
   enabled.

Change-Id: I26240f36aaf85125428dc39a66a2a1e4d3197e85
Reviewed-on: http://gerrit.cloudera.org:8080/22997
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
2025-09-26 10:53:46 +00:00
Joe McDonnell
e05d92cb3d IMPALA-13548: Schedule scan ranges oldest to newest for tuple caching
Scheduling does not sort scan ranges by modification time. When a new
file is added to a table, its order in the list of scan ranges is
not based on modification time. Instead, it is based on which partition
it belongs to and what its filename is. A new file that is added early
in the list of scan ranges can cause cascading differences in scheduling.
For tuple caching, this means that multiple runtime cache keys could
change due to adding a single file.

To minimize that disruption, this adds the ability to sort the scan
ranges by modification time and schedule scan ranges oldest to newest.
This enables it for scan nodes that feed into tuple cache nodes
(similar to deterministic scan range assignment).

Testing:
 - Modified TestTupleCacheFullCluster::test_scan_range_distributed
   to have stricter checks about how many cache keys change after
   an insert (only one should change)
 - Modified TupleCacheTest#testDeterministicScheduling to verify that
   oldest to newest scheduling is also enabled.

Change-Id: Ia4108c7a00c6acf8bbfc036b2b76e7c02ae44d47
Reviewed-on: http://gerrit.cloudera.org:8080/23228
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-22 20:51:46 +00:00
Joe McDonnell
ca356a8df5 IMPALA-13437 (part 2): Implement cost-based tuple cache placement
This changes the default behavior of the tuple cache to consider
cost when placing the TupleCacheNodes. It tries to pick the best
locations within a budget. First, it eliminates unprofitable locations
via a threshold. Next, it ranks the remaining locations by their
profitability. Finally, it picks the best locations in rank order
until it reaches the budget.

The threshold is based on the ratio of processing cost for regular
execution versus the processing cost for reading from the cache.
If the ratio is below the threshold, the location is eliminated.
The threshold is specified by the tuple_cache_required_cost_reduction_factor
query option. This defaults to 3.0, which means that the cost of
reading from the cache must be less than 1/3 the cost of computing
the value normally. A higher value makes this more restrictive
about caching locations, which pushes in the direction of lower
overhead.

The ranking is based on the cost reduction per byte. This is given
by the formula:
 (regular processing cost - cost to read from cache) / estimated serialized size
This prefers locations with small results or high reduction in cost.

The budget is based on the estimated serialized size per node. This
limits the total caching that a query will do. A higher value allows more
caching, which can increase the overhead on the first run of a query. A lower
value is less aggressive and can limit the overhead at the expense of less
caching. This uses a per-node limit as the limit should scale based on the
size of the executor group as each executor brings extra capacity. The budget
is specified by the tuple_cache_budget_bytes_per_executor.

The old behavior to place the tuple cache at all eligible locations is
still available via the tuple_cache_placement_policy query option. The
default is the cost_based policy described above, but the old behavior
is available via the all_eligible policy. This is useful for correctness
testing (and the existing tuple cache test cases).

This changes the explain plan output:
 - The hash trace is only enabled at VERBOSE level. This means that the regular
   profile will not contain the hash trace, as the regular profile uses EXTENDED.
 - This adds additional information at VERBOSE to display the cost information
   for each plan node. This can help trace why a particular location was
   not picked.

Testing:
 - This adds a TPC-DS planner test with tuple caching enabled (based on the
   existing TpcdsCpuCostPlannerTest)
 - This modifies existing tests to adapt to changes in the explain plan output

Change-Id: Ifc6e7b95621a7937d892511dc879bf7c8da07cdc
Reviewed-on: http://gerrit.cloudera.org:8080/23219
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-18 21:02:51 +00:00
Noemi Pap-Takacs
821c7347d1 IMPALA-13267: Display number of partitions for Iceberg tables
Before this change, query plans and profiles reported only a single
partition even for partitioned Iceberg tables, which was misleading
for users.
Now we can display the number of scanned partitions correctly for
both partitioned and unpartitioned Iceberg tables. This is achieved by
extracting the partition values from the file descriptors and storing
them in the IcebergContentFileStore. Instead of storing this information
redundantly in all file descriptors, we store them in one place and
reference the partition metadata in the FDs with an id.
This also gives the opportunity to optimize memory consumption in the
Catalog and Coordinator as well as reduce network traffic between them
in the future.

Time travel is handled similarly to oldFileDescMap. In that case
we don't know the total number of partitions in the old snapshot,
so the output is [Num scanned partitions]/unknown.

Testing:
 - Planner tests
 - E2E tests
   - partition transforms
   - partition evolution
   - DROP PARTITION
   - time travel

Change-Id: Ifb2f654bc6c9bdf9cfafc27b38b5ca2f7b6b4872
Reviewed-on: http://gerrit.cloudera.org:8080/23113
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-12 20:36:10 +00:00
jichen0919
826c8cf9b0 IMPALA-14081: Support create/drop paimon table for impala
This patch mainly implements the creation/dropping of Paimon tables
through Impala.

Supported impala data types:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE

Syntax for creating paimon table:

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
(
[col_name data_type ,...]
[PRIMARY KEY (col1,col2)]
)
[PARTITIONED BY (col_name data_type [COMMENT 'col_comment'], ...)]
STORED AS PAIMON
[LOCATION 'hdfs_path']
[TBLPROPERTIES (
'primary-key'='col1,col2',
'file.format' = 'orc/parquet',
'bucket' = '2',
'bucket-key' = 'col3',
];

Two types of paimon catalogs are supported.

(1) Create table with hive catalog:

CREATE TABLE paimon_hive_cat(userid INT,movieId INT)
STORED AS PAIMON;

(2) Create table with hadoop catalog:

CREATE [EXTERNAL] TABLE paimon_hadoop_cat
STORED AS PAIMON
TBLPROPERTIES('paimon.catalog'='hadoop',
'paimon.catalog_location'='/path/to/paimon_hadoop_catalog',
'paimon.table_identifier'='paimondb.paimontable');

SHOW TABLE STATS/SHOW COLUMN STATS/SHOW PARTITIONS/SHOW FILES
statements are also supported.

TODO:
    - Patches pending submission:
        - Query support for paimon data files.
        - Partition pruning and predicate push down.
        - Query support with time travel.
        - Query support for paimon meta tables.
    - WIP:
        - Complex type query support.
        - Virtual Column query support for querying
          paimon data table.
        - Native paimon table scanner, instead of
          jni based.
Testing:
    - Add unit test for paimon impala type conversion.
    - Add unit test for ToSqlTest.java.
    - Add unit test for AnalyzeDDLTest.java.
    - Update default_file_format TestEnumCase in
      be/src/service/query-options-test.cc.
    - Update test case in
      testdata/workloads/functional-query/queries/QueryTest/set.test.
    - Add test cases in metadata/test_show_create_table.py.
    - Add custom test test_paimon.py.

Change-Id: I57e77f28151e4a91353ef77050f9f0cd7d9d05ef
Reviewed-on: http://gerrit.cloudera.org:8080/22914
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
2025-09-10 21:24:49 +00:00
Zoltan Borok-Nagy
711797e7fb IMPALA-14349: Encode FileDescriptors in time in loading Iceberg Tables
With this patch we create Iceberg file descriptors from
LocatedFileStatus objects during IcebergFileMetadataLoader's
parallelListing(). This has the following benefits:
 * We parallelize the creation of Iceberg file descriptor objects
 * We don't need to maintain a large hash map with all the
   LocatedFileStatus objects at once. Now we only need to keep a few
   LocatedFileStatus objects per partition in memory while we are
   converting them to Iceberg file descriptors. I.e., the GC is free to
   destroy the LocatedFileStatus objects we don't use anymore.

This patch retires startup flag 'iceberg_reload_new_files_threshold'.
Since IMPALA-13254 we only list partitions that have new data files,
and we load them in parallel, i.e. efficient incremental table loading
is already covered. From that point the startup flag only added
unnecessary code complexity.

Measurements

I created two tables (from tpcds.store_sales) to measure table loading
times for large tables:

Table #1:
  PARTITIONED BY SPEC(ss_item_sk, BUCKET(5, ss_sold_time_sk))
  partitions: 107818
  files: 754726

Table #2:
  PARTITIONED BY SPEC(ss_item_sk)
  partitions: 18000
  files: 504224

Time taken in IcebergFileMetadataLoader.load() during full table reload:
+----------+-------+------+---------+
|          | Base  | New  | Speedup |
+----------+-------+------+---------+
| Table #1 | 17.3s | 8.1s |    2.14 |
| Table #2 |  7.8s | 4.3s |     1.8 |
+----------+-------+------+---------+

I measured incremental table loading only for Table #2 (since there are
more files per partition, this is the worst-case scenario for the new
code, as it only uses file listings, and each new file was created in a
separate partition).

Time taken in IcebergFileMetadataLoader.load() during incremental table
reload:
+------------+------+------+---------+
| #new files | Base | New  | Speedup |
+------------+------+------+---------+
|          1 | 1.4s | 1.6s |     0.9 |
|        100 | 1.5s | 1.9s |     0.8 |
|        200 | 1.5s | 1.5s |       1 |
+------------+------+------+---------+

We lose a few tenths of a second, but I think the simplified code
justifies it.

Testing:
 * some tests were updated because we don't have the
   startup flag 'iceberg_reload_new_files_threshold' anymore

Change-Id: Ia1c2a7119d76db7ce7c43caec2ccb122a014851b
Reviewed-on: http://gerrit.cloudera.org:8080/23363
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-09 20:34:01 +00:00
Riza Suminto
cc1cbb559a IMPALA-14263: Add broadcast_cost_scale_factor option
This commit enhances the distributed planner's costing model for
broadcast joins by introducing the `broadcast_cost_scale_factor` query
option. This option enables users to fine-tune the planner's decision
between broadcast and partitioned joins.

Key changes:
- The total broadcast cost is scaled by the new
  `broadcast_cost_scale_factor` query option, allowing users to favor or
  penalize broadcast joins as needed when setting query hint is not
  feasible.
- Updated the planner logic and test cases to reflect the new costing
  model and options.

This addresses scenarios where the default costing could lead to
suboptimal join distribution choices, particularly in a large-scale
cluster where the number of executors can increase broadcast cost, while
choosing a partitioned strategy can lead to data skew. Admins can set
`broadcast_cost_scale_factor` to less than 1.0 to make DistributedPlanner
favor broadcast over partitioned joins (with the possible downside of
higher memory usage per query and higher network transmission).

Existing query hints still take precedence over this option. Note that
this option is applied independently of the
`broadcast_to_partition_factor` option (see IMPALA-10287). In an
MT_DOP>1 setup, it should be sufficient to set `use_dop_for_costing=True`
and tune `broadcast_to_partition_factor` only.
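
A usage sketch (the option value and table names are hypothetical):

  SET BROADCAST_COST_SCALE_FACTOR=0.5;
  -- the scaled broadcast cost now makes the planner more likely to
  -- pick a broadcast join for the smaller input
  SELECT count(*) FROM big_tbl b JOIN small_tbl s ON b.id = s.id;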

Testing:
Added FE tests.

Change-Id: I475f8a26b2171e87952b69f66a5c18f77c2b3133
Reviewed-on: http://gerrit.cloudera.org:8080/23258
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Aman Sinha <amsinha@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-08-06 21:37:07 +00:00
stiga-huang
aec7380b75 IMPALA-14283: Invalidate the cache when served by a new catalogd
Before this patch, the coordinator just invalidated the catalog cache when
it witnessed the catalog service id change in DDL/DML responses or
statestore catalog updates. This is enough in the legacy catalog mode
since these are the only ways the coordinator gets metadata from
catalogd. However, in local catalog mode, the coordinator sends
getPartialCatalogObject requests to fetch metadata from catalogd. If a
request is now served by a new catalogd (e.g. due to HA failover), the
coordinator should invalidate its catalog cache in case catalog versions
overlap on the same table and stale metadata is unintentionally reused.

To ensure performance, catalogServiceIdLock_ in CatalogdMetaProvider is
refactored to be a ReentrantReadWriteLock. Most of the usages on it just
need the read lock.

This patch also adds the catalog service id in the profile.

Tests:
 - Ran test_warmed_up_metadata_failover_catchup 50 times.
 - Ran FE tests: CatalogdMetaProviderTest and LocalCatalogTest.
 - Ran CORE tests

Change-Id: I751e43f5d594497a521313579defc5b179dc06ce
Reviewed-on: http://gerrit.cloudera.org:8080/23236
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Quanlong Huang <huangquanlong@gmail.com>
2025-08-04 23:02:37 +00:00
Joe McDonnell
07a5716773 IMPALA-13892: Add support for printing STRUCTs
Tuple cache correctness verification is failing as the code in
debug-util.cc used for printing the text version of tuples does
not support printing structs. It hits a DCHECK and kills Impala.

This adds support for printing structs to debug-util.cc, fixing
tuple cache correctness verification for complex types. To print
structs correctly, each slot needs to know its field name. The
ColumnType has this information, but it requires a field idx to
lookup the name. This is the last index in the absolute path for
this slot. However, the materialized path can be truncated to
remove some indices at the end. Since we need that information to
resolve the field name, this adds the struct field idx to the
TSlotDescriptor to pass it to the backend.

This also adds a counter to the profile to track when correctness
verification is on. This is useful for testing.

Testing:
 - Added a custom cluster test using nested types with
   correctness verification
 - Examined some of the text files

Change-Id: Ib9479754c2766a9dd6483ba065e26a4d3a22e7e9
Reviewed-on: http://gerrit.cloudera.org:8080/23075
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Joe McDonnell <joemcdonnell@cloudera.com>
2025-07-23 16:15:30 +00:00
stiga-huang
64abca481f IMPALA-14227: In HA failover, passive catalogd should apply pending HMS events before being active
After IMPALA-14074, the passive catalogd can have a warmed up metadata
cache during failover (with catalogd_ha_reset_metadata_on_failover=false
and a non-empty warmup_tables_config_file). However, it could still use
a stale metadata cache when some pending HMS events generated by the
previous active catalogd are not applied yet.

This patch adds a wait during HA failover to ensure HMS events before
the failover happens are all applied on the new active catalogd. The
timeout is configured by a new flag which defaults to 300 (5 minutes):
catalogd_ha_failover_catchup_timeout_s. When timeout happens, by default
catalogd will fallback to resetting all metadata. Users can decide
whether to reset or continue using the current cache. This is configured
by another flag, catalogd_ha_reset_metadata_on_failover_catchup_timeout.

Since the passive catalogd depends on HMS event processing to keep its
metadata up-to-date with the active catalogd, this patch adds validation
to avoid starting catalogd with catalogd_ha_reset_metadata_on_failover
set to false and hms_event_polling_interval_s <= 0.

This patch also makes catalogd_ha_reset_metadata_on_failover a
non-hidden flag so it's shown in the /varz web page.

Tests:
 - Ran test_warmed_up_metadata_after_failover 200 times. Without the
   fix, it usually fails in several runs.
 - Added new tests for the new flags.

Change-Id: Icf4fcb0e27c14197f79625749949b47c033a5f31
Reviewed-on: http://gerrit.cloudera.org:8080/23174
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-07-17 17:30:19 +00:00
Riza Suminto
c2705fa480 IMPALA-14076: Improve readability of workload management query
This patch improves the readability of workload management's insert DML
query profiles by:
1. Adding a newline between each entry in the VALUES clause.
2. Removing the analyzed query from the PLAN column in both tables.

For the second one, a new query option, HIDE_ANALYZED_QUERY, is added. If
this option is set to true, 'Analyzed query' will not be printed in the
Plan section of the runtime profile. This is helpful for long SQL such as
workload management's insert DML query.
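
A usage sketch (table name is hypothetical):

  SET HIDE_ANALYZED_QUERY=true;
  -- subsequent queries omit the 'Analyzed query' entry from the Plan
  -- section of their runtime profiles
  SELECT count(*) FROM wm_db.big_tbl;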

Testing:
- Add explain test case for HIDE_ANALYZED_QUERY option.
- Manually ran some queries in the minicluster with workload
  management enabled. Confirmed that both improvements appear in the DML
  runtime profile.

Change-Id: I30576795dbc2af27a6879684f3757becfd8fc8d0
Reviewed-on: http://gerrit.cloudera.org:8080/23085
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-07-15 03:51:13 +00:00
Csaba Ringhofer
9f12714d1c IMPALA-14224: Cleanup subdirectories in TRUNCATE
If an external table contains data files in subdirectories, and
recursive listing is enabled, Impala considers the files in the
subdirectories as part of the table. However, currently INSERT OVERWRITE
and TRUNCATE do not always delete these files, leading to data
corruption.

This change takes care of TRUNCATE.

Currently TRUNCATE can be run in two different ways:
 - if the table is being replicated, the HMS api is used
 - otherwise catalogd deletes the files itself.
Two differences between these methods are:
 - calling HMS leads to an ALTER_TABLE event
 - calling HMS leads to recursive delete while catalogd only
   deletes files directly in the partition/table directory.

This commit introduces the '--truncate_external_tables_with_hms' startup
flag, with default value 'true'. If this flag is set to true, Impala
always uses the HMS api for TRUNCATE operations.

Note that HMS always deletes stats on TRUNCATE, so setting the
DELETE_STATS_IN_TRUNCATE query option to false is not supported if
'--truncate_external_tables_with_hms' is set to true: an exception is
thrown.
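
A sketch of the rejected combination (table name is hypothetical):

  SET DELETE_STATS_IN_TRUNCATE=0;
  TRUNCATE TABLE ext_db.ext_tbl;
  -- expected to raise an error when catalogd runs with
  -- --truncate_external_tables_with_hms=true, since the HMS path
  -- always deletes stats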

Testing:
 - extended the tests in test_recursive_listing.py::TestRecursiveListing
   to include TRUNCATE
 - Moved tests with DELETE_STATS_IN_TRUNCATE=0 from truncate-table.test
   to truncate-table-no-delete-stats.test, which is run in a new custom
   cluster test (custom_cluster/test_no_delete_stats_in_truncate.py).

Change-Id: Ic0fcc6cf1eca8a0bcf2f93dbb61240da05e35519
Reviewed-on: http://gerrit.cloudera.org:8080/23166
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-07-14 22:29:48 +00:00
Daniel Vanko
c446291ccf IMPALA-13074: Add sink node to Web UI's graphical plan for DDL/DML queries
From now on, if a plan fragment's root data sink is a table sink, a
multi data sink or a merge sink, it will be included in the JSON
response and shown on the Web UI as the parent of the plan fragment.

Testing
 * adopted and refined impala-http-handler-test
 * added new tests for related sink types
 * tested manually on WebUI with
   - CTAS statements
   - UPDATE statements on Iceberg tables
   - DELETE statements on Iceberg tables
   - MERGE statements on Iceberg tables

Change-Id: Ib2bd442f6499efde7406d87c2b1fd1b46a45381b
Reviewed-on: http://gerrit.cloudera.org:8080/22496
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Noemi Pap-Takacs <npaptakacs@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
2025-07-14 13:03:49 +00:00
stiga-huang
da190f1d86 IMPALA-14074: Warmup metadata cache in catalogd for critical tables
*Background*

Catalogd starts with a cold metadata cache - only the db/table names and
functions are loaded. Metadata of a table is unloaded until there are
queries submitted on the table. The first query will suffer from the
delay of loading metadata. There is a flag,
--load_catalog_in_background, to let catalogd eagerly load metadata of
all tables even if no queries come. Catalogd may load metadata for
tables that are possibly never used, potentially increasing catalog size
and consequently memory usage. So this flag is turned off by default and
not recommended to be used in production.

Users do need the metadata of some critical tables to be loaded. Before
that, the service is considered not ready since important queries might
fail with timeouts. When Catalogd HA is enabled, it’s also required that
the standby catalogd has an up-to-date metadata cache to smoothly take
over the active one when failover happens.

*New Flags*

This patch adds a startup flag for catalogd to specify a config file
containing the tables whose metadata users want loaded. Catalogd
adds them to the table loading queue in the background when a catalog
reset happens, i.e. at catalogd startup or when a global INVALIDATE
METADATA runs.

The flag is --warmup_tables_config_file. The value can be a path in the
local FS or in remote storage (e.g. HDFS). E.g.
  --warmup_tables_config_file=file:///opt/impala/warmup_table_list.txt
  --warmup_tables_config_file=hdfs:///tmp/warmup_table_list.txt

Each line in the config file can be a fully qualified table name or a
wildcard under a db, e.g. "tpch.*". Catalogd loads the table names at
startup and schedules loading on them after a reset of the catalog. The
scheduling order is based on the order in the config file. So important
tables can be put first. Comments starting with "#" or "//" are ignored
in the config file.
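
A sketch of a config file in the format described above (table names are
hypothetical):

  # critical tables first, loaded in this order
  sales_db.orders
  sales_db.customers
  // warm up every table in tpch
  tpch.*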

Another flag, --keeps_warmup_tables_loaded (defaults to false), is added
to control whether to reload the table after it’s been invalidated,
either by an explicit INVALIDATE METADATA <table> command or implicitly
invalidated by CatalogdTableInvalidator or HMS RELOAD events.

When CatalogdTableInvalidator is enabled with
--invalidate_tables_on_memory_pressure=true, users shouldn’t set
keeps_warmup_tables_loaded to true if the catalogd heap size is not
enough to cache metadata of all these tables. Otherwise, these tables
will keep being loaded and invalidated.

*Catalogd HA Changes*
When Catalogd HA is enabled, the standby catalogd will also reset its
catalog and start loading metadata of these tables, after the HA state
(active/standby) is determined. Standby catalogd keeps its metadata
cache up-to-date by applying HMS notification events. To support a
warmed up switch, --catalogd_ha_reset_metadata_on_failover should be set
to false.

*Limitation*
The standby catalogd could still have a stale cache if there are
operations in the active catalogd that don’t trigger HMS notification
events, or if the HMS notification event is not applied correctly. E.g.
Adding a new native function generates an ALTER_DATABASE event, but when
applying the event, the native function list of the db is not refreshed
(IMPALA-14210). These will be resolved in separate JIRAs.

*Test*
 - Added FE unit tests.
 - Added e2e test for local/hdfs config files.
 - Added e2e test to verify the standby catalogd has a warmed up cache
   when failover happens.

Change-Id: I2d09eae1f12a8acd2de945984d956d11eeee1ab6
Reviewed-on: http://gerrit.cloudera.org:8080/23155
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-07-12 18:50:56 +00:00
Riza Suminto
0b1a32fad8 IMPALA-13850 (part 4): Implement in-place reset for CatalogD
This patch improves the availability of CatalogD under huge INVALIDATE
METADATA operations. Previously, CatalogServiceCatalog.reset() held
versionLock_.writeLock() for the whole reset duration. When the number
of databases, tables, or functions is large, this write lock can be held
for a long time, preventing any other catalog operation from proceeding.

This patch improves the situation by:
1. Making CatalogServiceCatalog.reset() rebuild dbCache_ in place and
   occasionally release the write lock between rebuild stages.
2. Fetching database, table, and function metadata from the MetaStore in
   the background using an ExecutorService. Added the
   catalog_reset_max_threads flag to control the number of threads used
   for the parallel fetch.

In order to do so, lexicographic order must be enforced during reset(),
and all Db invalidations within a single stage must complete before the
write lock is released. Stages should run in approximately the same
amount of time. A catalog operation over a database must ensure that no
reset operation is currently running, or that the database name is
lexicographically less than the current database-under-invalidation.

This patch adds CatalogResetManager to do background metadata fetching
and provide helper methods to help facilitate waiting for reset
progress. CatalogServiceCatalog must hold the versionLock_.writeLock()
before calling most of CatalogResetManager methods.

These are methods in CatalogServiceCatalog class that must wait for
CatalogResetManager.waitOngoingMetadataFetch():

addDb()
addFunction()
addIncompleteTable()
addTable()
invalidateTableIfExists()
removeDb()
removeFunction()
removeTable()
renameTable()
replaceTableIfUnchanged()
tryLock()
updateDb()
InvalidateAwareDbSnapshotIterator.hasNext()

A concurrent global IM must wait until the currently running global IM
completes. The waiting happens by calling waitFullMetadataFetch().

CatalogServiceCatalog.getAllDbs() gets a snapshot of dbCache_ values at a
point in time. With this patch, it is now possible that some Db in this
snapshot may be removed from dbCache_ by a concurrent reset(). Callers
that care about snapshot integrity, like
CatalogServiceCatalog.getCatalogDelta(), should be careful when iterating
the snapshot. They must iterate in lexicographic order, similar to
reset(), and make sure they do not go beyond the current
database-under-invalidation. They also must skip a Db that is currently
being inspected if Db.isRemoved() is true. Added the helper class
InvalidateAwareDbSnapshot for this kind of iteration.
Override CatalogServiceCatalog.getDb() and
CatalogServiceCatalog.getDbs() to wait until first reset metadata
complete or looked up Db found in cache.

Expand test_restart_catalogd_twice to test_restart_legacy_catalogd_twice
and test_restart_local_catalogd_twice. Update
CustomClusterTestSuite.wait_for_wm_init_complete() to correctly pass
timeout values to the helper methods that it calls. Reduce cluster_size
from 10 to 3 in a few tests of test_workload_mgmt_init.py to avoid
flakiness.

Fixed HMS connection leak between tests in AuthorizationStmtTest (see
IMPALA-8073).

Testing:
- Pass exhaustive tests.

Change-Id: Ib4ae2154612746b34484391c5950e74b61f85c9d
Reviewed-on: http://gerrit.cloudera.org:8080/22640
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
2025-07-09 14:05:04 +00:00
Riza Suminto
c5072807df IMPALA-12337: Implement delete orphan files for Iceberg table
This patch implements the delete orphan files query for Iceberg tables.

The following statement becomes available for Iceberg tables:
 - ALTER TABLE <tbl> EXECUTE remove_orphan_files(<timestamp>)
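
For example, a usage sketch with a hypothetical table name and cutoff
timestamp (not taken from this patch's tests):
  -- Delete files under the table's 'data' and 'metadata' directories
  -- that are not referenced by any valid snapshot and are older than
  -- the given cutoff.
  ALTER TABLE ice_db.ice_tbl
    EXECUTE remove_orphan_files('2025-06-01 00:00:00');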

The bulk of the implementation copies Hive's implementation of the
org.apache.iceberg.actions.DeleteOrphanFiles interface (HIVE-27906,
6b2e21a93ef3c1776b689a7953fc59dbf52e4be4), which this patch renames to
ImpalaIcebergDeleteOrphanFiles.java. Upon execute(), an
ImpalaIcebergDeleteOrphanFiles instance gathers the URIs of all valid
data files and Iceberg metadata files using the Iceberg API. These valid
URIs are then compared to a recursive file listing obtained through the
Hadoop FileSystem API under the table's 'data' and 'metadata'
directories. Any unmatched URI from the FileSystem listing whose
modification time is less than the 'olderThanTimestamp' parameter is
then removed via the Iceberg FileIO API of the given Iceberg table. Note
that this is a destructive query that wipes out any file within the
Iceberg table's 'data' and 'metadata' directories that is not
addressable by any valid snapshot.

The execution happens in CatalogD via
IcebergCatalogOpExecutor.alterTableExecuteRemoveOrphanFiles(). CatalogD
supplies CatalogOpExecutor.icebergExecutorService_ as the executor
service used to run the Iceberg planFiles API and the FileIO deletions.

Also fixed the toSql() implementation for all ALTER TABLE EXECUTE queries.

Testing:
- Add FE and EE tests.

Change-Id: I5979cdf15048d5a2c4784918533f65f32e888de0
Reviewed-on: http://gerrit.cloudera.org:8080/23042
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
2025-06-30 15:05:12 +00:00
Joe McDonnell
7b25a7b070 IMPALA-13887: Incorporate column/field information into cache key
The correctness verification for the tuple cache found an issue
with TestParquet::test_resolution_by_name(). The test creates a
table, selects, alters the table to change a column name, and
selects again. With parquet_fallback_schema_resolution=NAME, the
column names determine behavior. The tuple cache key did not
include the column names, so it was producing an incorrect result
after changing the column name.
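
A minimal sketch of the problematic sequence (hypothetical table and
column names, not the actual test):
  -- Resolve Parquet columns by name, so column names affect results.
  SET PARQUET_FALLBACK_SCHEMA_RESOLUTION=NAME;
  SELECT c1 FROM cache_db.t;          -- populates the tuple cache
  ALTER TABLE cache_db.t CHANGE c1 c1_renamed INT;
  SELECT c1_renamed FROM cache_db.t;  -- must not reuse the old entry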

This change adds information about the column / field name to the
TSlotDescriptor so that it is incorporated into the tuple cache key.
This is only needed when producing the tuple cache key, so it is
omitted for other cases.

Testing:
 - Ran TestParquet::test_resolution_by_name() with correctness
   verification
 - Added custom cluster test that runs the test_resolution_by_name()
   test case with tuple caching. This fails without this change.

Change-Id: Iebfa777452daf66851b86383651d35e1b0a5f262
Reviewed-on: http://gerrit.cloudera.org:8080/23073
Reviewed-by: Yida Wu <wydbaggio000@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-06-27 18:13:19 +00:00
Csaba Ringhofer
85a2211bfb IMPALA-10349: Support constant folding for non ascii strings
Before this patch, constant folding only converted the result of an
expression to a StringLiteral if all characters were ASCII. The
change allows both UTF-8 strings with non-ASCII characters and
byte arrays that are not valid UTF-8 strings; the latter can
occur when constant folding is applied to BINARY columns,
for example in geospatial functions like st_polygon().

The main goal is to be able to push down more predicates; e.g.
before this patch a filter like col="á" couldn't be pushed down
to Iceberg/Kudu/Parquet stats filtering, as all of these expect
literals.
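
As an illustrative sketch (hypothetical table and column names), a
constant expression over non-ASCII strings now folds to a pushable
literal:
  -- concat('á', 'b') folds to the StringLiteral 'áb' at planning time
  -- and can be pushed down to Iceberg/Kudu/Parquet stats filtering.
  SELECT count(*) FROM some_db.some_tbl
  WHERE name_col = concat('á', 'b');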

Main changes:
- TStringLiteral uses a binary instead of a string member.
  This doesn't affect the BE, as in C++ both types are compiled
  to std::string. In Java a java.nio.ByteBuffer is used instead of
  String.
- StringLiteral uses a byte[] member to store the value of
  the literal in case it is not valid UTF-8 and cannot be
  represented as a Java String. In other cases a String is still
  used to keep the change minimal, though it may be more
  optimal to use a UTF-8 byte[] due to the smaller size. Always
  converting from byte[] to String may be costly in the catalog,
  as partition values are stored as *Literals and the rest of the
  catalog operates on String.
- StringLiteral#compareTo() is switched to a byte-wise compare on byte[]
  to be consistent with the BE. This was not needed for ASCII strings,
  as a Java String behaves the same way in that case, but non-ASCII
  strings can have a different order (note that Impala does not support
  collations).
- When an invalid UTF-8 StringLiteral is printed, for example in
  EXPLAIN output, it is printed as
  unhex("<byte array in hexadecimal>"). This is a lossless way to
  represent it, but it may be too verbose in some cases, e.g. for
  large polygons. A follow-up commit may refine this, e.g. by
  limiting the max size printed.

An issue found while implementing this is that INSERT does not
handle invalid UTF-8 partition values correctly; see IMPALA-14096.
This behavior is not changed by this patch.

Testing:
- Added a few tests that push down non-ASCII const expressions in
  predicates (both with utf8_mode=true and false).

Change-Id: I70663457a0b0a3443e586350f0a5996bb75ba64a
Reviewed-on: http://gerrit.cloudera.org:8080/22603
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-06-25 18:22:31 +00:00
Joe McDonnell
14597c7e2f IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness
This makes two changes to deflake test_tuple_cache_tpc_queries.py.
First, it increases the runtime filter wait time from 60 seconds to
600 seconds. The correctness verification slows down the path
that produces the runtime filter. The slowdown is dependent on
the speed of storage, so this can get very slow on test machines.

Second, this skips correctness checking for locations that are just
after streaming aggregations. Streaming aggregations can produce
variable output that the correctness checking can't handle.
For example, a grouping aggregation computing a sum might have
a preaggregation produce either (A: 3), or (A: 2), (A: 1), or
(A: 1), (A: 1), (A: 1). The finalization sees these as equivalent.
This marks the nodes as variable starting with the preaggregation
and clears the mark at the finalize stage.

When correctness checking is skipped for a location, the tuple cache
node intentionally does not serve a cache hit there. This guarantees
that its children will run and go through correctness checking.

Testing:
 - Ran test_tuple_cache_tpc_queries.py locally
 - Added a frontend test for this specific case

Change-Id: If5e1be287bdb489a89aea3b2d7bec416220feb9a
Reviewed-on: http://gerrit.cloudera.org:8080/23010
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
2025-06-12 15:48:32 +00:00
Riza Suminto
ccb8eac10a IMPALA-14075: Add CatalogOpExecutor.icebergExecutorService_
Before this patch, Impala executed the EXPIRE_SNAPSHOTS operation on a
single thread. It can be really slow on cloud storage systems,
especially if the operation needs to remove lots of files.

This patch adds CatalogOpExecutor.icebergExecutorService_ to parallelize
Iceberg API calls that support passing an ExecutorService, such as
ExpireSnapshots.executeDeleteWith(). The number of threads for this
executor service is controlled by the CatalogD flag
--iceberg_catalog_num_threads. It defaults to 16, the same as the
--num_metadata_loading_threads default value.
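
For reference, the kind of statement that benefits (hypothetical table
name and timestamp):
  -- Snapshot expiration now deletes the resulting files through the
  -- shared Iceberg executor service instead of a single thread.
  ALTER TABLE ice_db.ice_tbl
    EXECUTE expire_snapshots('2025-01-01 00:00:00');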

Rename ValidateMinProcessingPerThread to ValidatePositiveInt64 to match
the other validators in backend-gflag-util.cc.

Testing:
- Lower sleep time between insert queries from 5s to 1s in
  test_expire_snapshots and test_describe_history_params to speed up
  tests.
- Manually verify that 'IcebergCatalogThread' threads are visible in
  /jvm-threadz page of CatalogD.
- Pass test_iceberg.py.

Change-Id: I6dcbf1e406e1732ef8829eb0cd627d932291d485
Reviewed-on: http://gerrit.cloudera.org:8080/22980
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-06-10 14:40:52 +00:00
Mihaly Szjatinya
4837cedc79 IMPALA-10319: Support arbitrary encodings on Text files
As proposed in the Jira, this implements decoding and encoding of text
buffers for Impala/Hive text tables. Given a table with the
'serialization.encoding' property set, similarly to Hive, Impala should
be able to encode inserted data into the specified charset before saving
it into a text file. The opposite decoding operation is performed when
reading data buffers from text files. Both operations employ the
boost::locale::conv library.

Since Hive doesn't encode line delimiters, charsets that would store
delimiters differently from ASCII are not allowed.

One difference from Hive is that Impala implements
'serialization.encoding' only as a per-partition serde property, to
avoid the confusion of allowing both serde and table properties. (See
the related IMPALA-13748.)
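
A hypothetical usage sketch (table, partition, and charset are examples
only):
  -- Declare that this partition's text files are stored as latin-1;
  -- Impala decodes them on read and encodes inserted data on write.
  ALTER TABLE legacy_db.legacy_text_tbl PARTITION (year=2024)
    SET SERDEPROPERTIES ('serialization.encoding'='ISO-8859-1');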

Note: Due to the precreated non-UTF-8 files present in the patch,
'gerrit-code-review-checks' was performed locally. (See IMPALA-14100.)

Change-Id: I787cd01caa52a19d6645519a6cedabe0a5253a65
Reviewed-on: http://gerrit.cloudera.org:8080/22049
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-06-01 21:31:00 +00:00
Joe McDonnell
2a680b302e IMPALA-13478: Sync tuple cache files to disk asynchronously
When a tuple cache entry is first being written, we want to
sync the contents to disk. Currently, that happens on the
fast path and delays the query results, sometimes significantly.
This moves the Sync() call off of the fast path by passing
the work to a thread pool. The threads in the pool open
the file, sync it to disk, then close the file. If anything
goes wrong, the cache entry is evicted.

The tuple cache can generate writes very quickly, so this needs
a backpressure mechanism to avoid overwhelming the disk. In
particular, it needs to avoid accumulating dirty buffers to
the point that the OS throttles new writes, delaying the query
fast path. This implements a limit on outstanding writes (i.e.
writes that have not been flushed to disk). To enforce it,
writers now call UpdateWriteSize() to reserve space before
writing. UpdateWriteSize() can fail if it hits the limit on
outstanding writes or if this particular cache entry has hit
the maximum size. When it fails, the writer should abort writing
the cache entry.

Since UpdateWriteSize() is updating the charge in the cache,
the outstanding writes are being counted against the capacity,
triggering evictions. This improves the tuple cache's adherence
to the capacity limit.

The outstanding writes limit is configured via the
tuple_cache_outstanding_write_limit startup flag, which is
either a specific size string (e.g. 1GB) or a percentage of
the process memory limit. To avoid updating the cache charge
very frequently, this has an update chunk size specified
by tuple_cache_outstanding_write_chunk_bytes.

This adds counters at the daemon level:
 - outstanding write bytes
 - number of writes halted due to backpressure
 - number of sync calls that fail (due to IO errors)
 - number of sync calls dropped due to queue backpressure
The runtime profile adds a NumTupleCacheBackpressureHalted
counter that is set when a write hits the outstanding write
limit.

This has a startup option to add randomness to the tuple cache
keys to make it easy to test a scenario with no cache hits.

Testing:
 - Added unit tests to tuple-cache-mgr-test
 - Testing with TPC-DS on a cluster with fast NVME SSDs showed
   a significant improvement in the first-run times due to the
   asynchronous syncs.
 - Testing with TPC-H on a system with a slow disk and zero cache
   hits showed improved behavior with the backpressure

Change-Id: I646bb56300656d8b8ac613cb8fe2f85180b386d3
Reviewed-on: http://gerrit.cloudera.org:8080/22215
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-05-29 01:41:41 +00:00
stiga-huang
b37f4509fa IMPALA-14089: Support REFRESH on multiple partitions
Currently we just support REFRESH on the whole table or a specific
partition:
  REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]

If users want to refresh multiple partitions, they have to submit
multiple statements, one for each partition. This has some
drawbacks:
 - It requires holding the table write lock inside catalogd multiple
   times, which increases lock contention with other read/write
   operations on the same table, e.g. getPartialCatalogObject requests
   from coordinators.
 - The catalog version of the table will be increased multiple times.
   Coordinators in local catalog mode are more likely to see different
   versions between their getPartialCatalogObject requests and so have
   to retry planning to resolve InconsistentMetadataFetchException.
 - Partitions are reloaded in sequence. They should be reloaded in
   parallel like we do when refreshing the whole table.

This patch extends the syntax to refresh multiple partitions in one
statement:
  REFRESH [db_name.]table_name
  [PARTITION (key_col1=val1 [, key_col2=val2...])
   [PARTITION (key_col1=val3 [, key_col2=val4...])...]]
Example:
  REFRESH foo PARTITION(p=0) PARTITION(p=1) PARTITION(p=2);

TResetMetadataRequest is extended to have a list of partition specs for
this. If the list has only one item, we still use the existing logic of
reloading a specific partition. If the list has more than one item,
partitions will be reloaded in parallel. This is implemented in
CatalogServiceCatalog#reloadTable(). Previously it always invoked
HdfsTable#load() with partitionsToUpdate=null. Now the parameter is
set when TResetMetadataRequest has the partition list.

HMS notification events of RELOAD type will be fired for each partition
if enable_reload_events is turned on. Once HIVE-28967 is resolved, we
can fire a single event for multiple partitions.

Updated docs in impala_refresh.xml.

Tests:
 - Added FE and e2e tests

Change-Id: Ie5b0deeaf23129ed6e1ba2817f54291d7f63d04e
Reviewed-on: http://gerrit.cloudera.org:8080/22938
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-05-28 05:18:53 +00:00
Xuebin Su
607bad042a IMPALA-3841: Enable late materialization for collections
This patch enables late materialization for collections to avoid the
cost of materializing collections that will never be accessed by the
query.

For a collection column, late materialization takes effect only when the
collection column is not used in any predicate, including the `!empty()`
predicate added by the planner. Otherwise we need to read every row to
evaluate the predicate and cannot skip any. Therefore, this patch skips
registering the `!empty()` predicates if the query contains zipping
unnests. This can affect performance if the table contains many empty
collections, but should be noticeable only in very extreme cases.

The late materialization threshold is set to 1 in HdfsParquetScanner
when there is any collection that can be skipped.

This patch also adds the detail of `HdfsScanner::parse_status_` to the
error message returned by the HdfsParquetScanner to help figure out the
root cause.

Performance:
- Tests with queries involving collection columns in the table
  `tpch_nested_parquet.customer` show that when the selectivity is low,
  the single-threaded (1 impalad and MT_DOP=1) scanning time can be
  reduced by about 50%, while when the selectivity is high, the scanning
  time is almost unchanged.
- For queries not involving collections, performance A/B testing
  shows no regression on TPC-H.

Testing:
- Added a runtime profile counter NumTopLevelValuesSkipped to record
  the total number of top-level values skipped for all columns. The
  counter only counts the values that are not skipped as a page.
- Added e2e test cases in test_parquet_late_materialization.py to ensure
  that late materialization works using the new counter.

Change-Id: Ia21bdfa6811408d66d74367e0a9520e20951105f
Reviewed-on: http://gerrit.cloudera.org:8080/22662
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-05-27 15:45:52 +00:00
Joe McDonnell
ea0969a772 IMPALA-11980 (part 2): Fix absolute import issues for impala_shell
Python 3 changed the behavior of imports with PEP328. Existing
imports become absolute unless they use the new relative import
syntax. This adapts the impala-shell code to use absolute
imports, fixing issues where it is imported from our test code.

There are several parts to this:
1. It moves impala shell code into shell/impala_shell.
   This matches the directory structure of the PyPi package.
2. It changes the imports in the shell code to be
   absolute paths (i.e. impala_shell.foo rather than foo).
   This fixes issues with Python 3 absolute imports.
   It also eliminates the need for ugly hacks in the PyPi
   package's __init__.py.
3. This changes Thrift generation to put it directly in
   $IMPALA_HOME/shell rather than $IMPALA_HOME/shell/gen-py.
   This means that the generated Thrift code is rooted in
   the same directory as the shell code.
4. This changes the PYTHONPATH to include $IMPALA_HOME/shell
   and not $IMPALA_HOME/shell/gen-py. This means that the
   test code is using the same import paths as the pypi
   package.

With all of these changes, the source code is very close
to the directory structure of the PyPi package. As long as
CMake has generated the thrift files and the Python version
file, only a few differences remain. This removes those
differences by moving the setup.py / MANIFEST.in and other
files from the packaging directory to the top-level
shell/ directory. This means that one can pip install
directly from the source code. i.e. pip install $IMPALA_HOME/shell

This also moves the shell tarball generation script to the
packaging directory and changes bin/impala-shell.sh to use
Python 3.

This sorts the imports using isort for the affected Python files.

Testing:
 - Ran a regular core job with Python 2
 - Ran a core job with Python 3 and verified that the absolute
   import issues are gone.

Change-Id: Ica75a24fa6bcb78999b9b6f4f4356951b81c3124
Reviewed-on: http://gerrit.cloudera.org:8080/22330
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Riza Suminto <riza.suminto@cloudera.com>
2025-05-21 15:14:11 +00:00
Riza Suminto
3210ec58c5 IMPALA-14006: Bound max_instances in CreateInputCollocatedInstances
IMPALA-11604 (part 2) changes how many instances to create in
Scheduler::CreateInputCollocatedInstances. This works when the left
child fragment of a parent fragment is distributed across nodes.
However, if the left child fragment instance is limited to only 1
node (the case of an UNPARTITIONED fragment), the scheduler might
over-parallelize the parent fragment by scheduling too many instances on
a single node.

This patch attempts to mitigate the issue in two ways. First, it adds
bounding logic in PlanFragment.traverseEffectiveParallelism() to lower
parallelism further if the left (probe) side of the child fragment is
not well distributed across nodes.

Second, it adds TQueryExecRequest.max_parallelism_per_node to relay
information from Analyzer.getMaxParallelismPerNode() to the scheduler.
With this information, the scheduler can do additional sanity checks to
prevent Scheduler::CreateInputCollocatedInstances from
over-parallelizing a fragment. Note that this sanity check can also cap
MAX_FS_WRITERS option under a similar scenario.

Added a ScalingVerdict enum and log it at TRACE level to show the
scaling decision steps.

Testing:
- Add planner test and e2e test that exercise the corner case under
  COMPUTE_PROCESSING_COST=1 option.
- Manually comment the bounding logic in traverseEffectiveParallelism()
  and confirm that the scheduler's sanity check still enforces the
  bounding.

Change-Id: I65223b820c9fd6e4267d57297b1466d4e56829b3
Reviewed-on: http://gerrit.cloudera.org:8080/22840
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-05-07 03:34:15 +00:00
Fang-Yu Rao
8f7d2246ec IMPALA-12554: (Addendum) Add a flag to not consolidate requests by default
This patch adds a startup flag so that by default the catalog server
will not consolidate the grant/revoke requests sent to the Ranger server
when there are multiple columns involved in the GRANT/REVOKE statement.

Testing:
 - Added 2 end-to-end tests to make sure the grant/revoke requests
   sent to the Ranger server would be consolidated only when the flag
   is explicitly added when we start the catalog server.

Change-Id: I4defc59c048be1112380c3a7254ffa8655eee0af
Reviewed-on: http://gerrit.cloudera.org:8080/22833
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-05-01 11:34:38 +00:00
Venu Reddy
5db760662f IMPALA-12709: Add support for hierarchical metastore event processing
At present, the metastore event processor is single threaded.
Notification events are processed sequentially, with a maximum of 1000
events fetched and processed in a single batch. Multiple locks are used
to address the concurrency issues that may arise when catalog DDL
operation processing and metastore event processing try to
access/update the catalog objects concurrently. Waiting for a lock or
for a table's file metadata loading can slow the event processing and
can affect the processing of the events that follow it, even though
those events may not depend on the previous event. Altogether it can
take a very long time to synchronize all the HMS events.

The existing metastore event processing is turned into multi-level
event processing with the enable_hierarchical_event_processing flag,
which is not enabled by default. The idea is to segregate events based
on their dependencies, maintain the order of events as they occur within
a dependency, and process them independently as much as possible. The
following 3 main classes represent the three-level threaded event
processing.
1. EventExecutorService
   It provides the necessary methods to initialize, start, clear,
   stop, and drive metastore event processing in hierarchical mode.
   It is instantiated from MetastoreEventsProcessor and its methods
   are invoked from MetastoreEventsProcessor. Upon receiving an event
   to process, EventExecutorService queues the event to the
   appropriate DbEventExecutor for processing.
2. DbEventExecutor
   An instance of this class has an execution thread and manages
   events of multiple databases with DbProcessors. A DbProcessor
   instance is maintained to store the context of each database within
   the DbEventExecutor. On each scheduled execution, input events on a
   DbProcessor are segregated to the appropriate TableProcessors for
   event processing, and the database events that are eligible for
   processing are processed.
   Once a DbEventExecutor is assigned to a database, a DbProcessor
   is created, and the subsequent events belonging to the database
   are queued to the same DbEventExecutor thread for further
   processing. Hence, linearizability is ensured in dealing with
   events within the database. Each instance of DbEventExecutor has a
   fixed list of TableEventExecutors.
3. TableEventExecutor
   An instance of this class has an execution thread and processes
   events of multiple tables with TableProcessors. A TableProcessor
   instance is maintained to store the context of each table within
   a TableEventExecutor. On each scheduled execution, events from
   TableProcessors are processed.
   Once a TableEventExecutor is assigned to a table, a TableProcessor
   is created, and the subsequent table events are processed by the
   same TableEventExecutor thread. Hence, linearizability is
   guaranteed in processing the events of a particular table.
   - All the events of a table are processed in the same order they
     occurred.
   - Events of different tables are processed in parallel when those
     tables are assigned to different TableEventExecutors.

Following new events are added:
1. DbBarrierEvent
   This event wraps a database event. It is used to synchronize all
   the TableProcessors belonging to the database before processing the
   database event. It acts as a barrier to restrict the processing
   of table events that occurred after the database event until the
   database event is processed on the DbProcessor.
2. RenameTableBarrierEvent
   This event wraps an alter table event for rename. It is used to
   synchronize the source and target TableProcessors to
   process the rename table event. It ensures the source
   TableProcessor removes the table first and then allows the target
   TableProcessor to create the renamed table.
3. PseudoCommitTxnEvent and PseudoAbortTxnEvent
   CommitTxnEvent and AbortTxnEvent can involve multiple tables in
   a transaction and processing these events modifies multiple table
   objects. Pseudo events are introduced such that a pseudo event is
   created for each table involved in the transaction and these
   pseudo events are processed independently at the respective
   TableProcessors.

Following new flags are introduced:
1. enable_hierarchical_event_processing
   To enable the hierarchical event processing on catalogd.
2. num_db_event_executors
   To set the number of database level event executors.
3. num_table_event_executors_per_db_event_executor
   To set the number of table level event executors within a
   database event executor.
4. min_event_processor_idle_ms
   To set the minimum time to retain idle db processors and table
   processors on the database event executors and table event
   executors respectively, when they do not have events to process.
5. max_outstanding_events_on_executors
   To set the maximum number of outstanding events to process on the
   event executors.

Changed the hms_event_polling_interval_s type from int to double to
support millisecond-precision intervals.

TODOs:
1. We need to redefine the lag in the hierarchical processing mode.
2. Need to have a mechanism to capture the actual event processing time
   in hierarchical processing mode. Currently, with
   enable_hierarchical_event_processing set to true, lastSyncedEventId_
   and lastSyncedEventTimeSecs_ are updated upon event dispatch to
   EventExecutorService for processing on the respective DbEventExecutor
   and/or TableEventExecutor. So lastSyncedEventId_ and
   lastSyncedEventTimeSecs_ don't actually mean the events have been
   processed.
3. Hierarchical processing mode currently has a mechanism to show the
   total number of outstanding events on all the db and table executors
   at the moment. Observability needs to be enhanced further for this
   mode.
Filed a JIRA [IMPALA-13801] to fix these.

Testing:
 - Executed existing end-to-end tests.
 - Added FE and end-to-end tests with enable_hierarchical_event_processing.
 - Added event processing performance tests.
 - Executed the existing tests with hierarchical processing mode
   enabled. lastSyncedEventId_ is now also used in the new
   sync_hms_events_wait_time_s feature (IMPALA-12152). Some tests fail
   when hierarchical processing mode is enabled because
   lastSyncedEventId_ does not actually mean the event is processed in
   this mode. This needs to be fixed/verified with the above JIRA
   [IMPALA-13801].

Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Reviewed-on: http://gerrit.cloudera.org:8080/21031
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-30 11:51:03 +00:00
Eyizoha
faf322dd41 IMPALA-12927: Support specifying format for reading JSON BINARY columns
Currently, Impala always assumes that the data in the binary columns of
JSON tables is base64 encoded. However, before HIVE-21240, Hive wrote
binary data to JSON tables without base64 encoding it, instead writing
it as escaped strings. After HIVE-21240, Hive defaults to base64
encoding binary data when writing to JSON tables and introduces the
serde property 'json.binary.format' to indicate the encoding method of
binary data in JSON tables.

To maintain consistency with Hive and avoid correctness issues caused by
reading data in an incorrect manner, this patch also introduces the
serde property 'json.binary.format' to specify the reading method for
binary data in JSON tables. Currently, this property supports reading in
either base64 or rawstring formats, same as Hive.

Additionally, this patch introduces a query option 'json_binary_format'
to achieve the same effect. This query option will only take effect for
JSON tables where the serde property 'json.binary.format' is not set.
The reading format of binary columns in JSON tables can be configured
globally by setting it in 'default_query_options'. It should be noted
that the default value of 'json_binary_format' is 'NONE', and Impala
will prohibit reading binary columns of JSON tables that either have
"no 'json.binary.format' set and 'json_binary_format' is 'NONE'" or
"an invalid 'json.binary.format' value set", and will provide an error
message to avoid using an incorrect format without the user noticing.
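
A hypothetical usage sketch (table name and values are examples only):
  -- Per table: declare how binary data is stored in this JSON table.
  ALTER TABLE json_db.events
    SET SERDEPROPERTIES ('json.binary.format'='rawstring');
  -- Per session: fallback for JSON tables without the serde property.
  SET JSON_BINARY_FORMAT=BASE64;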

Testing:
  - Enabled existing binary type E2E tests for JSON tables
  - Added new E2E test for 'json.binary.format'

Change-Id: Idf61fa3afc0f33caa63fbc05393e975733165e82
Reviewed-on: http://gerrit.cloudera.org:8080/22289
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-29 16:16:12 +00:00
stiga-huang
56b465d91f IMPALA-13829: Postpone catalog deleteLog GC for waitForHmsEvent requests
When a db/table is removed in the catalog cache, catalogd assigns it a
new catalog version and puts it into the deleteLog. This is used by the
catalog update thread to collect deletion updates. Once the catalog
update thread collects a range of updates, it triggers a GC in the
deleteLog to clear items older than the last sent catalog version. The
deletions will eventually be broadcast by the statestore to all the
coordinators.

However, waitForHmsEvent requests are also consumers of the deleteLog
and could be impacted by these GCs. waitForHmsEvent is a catalogd RPC
used by coordinators when a query wants to wait until the related
metadata is in sync with HMS. The response of waitForHmsEvent returns
the latest metadata, including the deletions on related dbs/tables.
If the related deletions in the deleteLog are GCed just before the
waitForHmsEvent request collects the results, they will be missing from
the response, and the coordinator might keep using stale metadata of
non-existing dbs/tables.

This is a quick fix for the issue: deleteLog GC is postponed for a
configurable number of topic updates, similar to what we have done on
the TopicUpdateLog. A thorough fix might need to carefully choose the
version to GC, or let impalad wait for the deletions from the statestore
to arrive.

A new flag, catalog_delete_log_ttl, is added for this. The deleteLog
items can survive for catalog_delete_log_ttl catalog updates. The
default is 60, so a deletion can survive for at least 120s. This should
be safe enough, i.e. the GCed deletions must have arrived on the impalad
side after 60 rounds of catalog updates; otherwise that impalad is
abnormal and already has other, more severe issues, e.g. lots of stale
tables due to metadata being out of sync with catalogd.

Note that postponing deleteLog GCs might increase the memory
consumption. But since most of its memory is used by db/table/partition
names, the memory usage might still be trivial compared to other
metadata like file descriptors and incremental stats in live catalog
objects.

This patch also removed some unused imports.

Tests:
 - Added an e2e test with a debug action to reproduce the issue. Ran
   the test 100 times. Without the fix, it consistently fails within
   2-3 runs.

Change-Id: I2441440bca2b928205dd514047ba742a5e8bf05e
Reviewed-on: http://gerrit.cloudera.org:8080/22816
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-29 07:41:41 +00:00
Noemi Pap-Takacs
e7aa31296c IMPALA-13738 (Part2): Clean up code in Catalog's table and partition interfaces
Prepare for FE/Catalog refactor by resolving some TODOs and
cleaning up unused code in most table and partition interfaces.
 - removed dead code
 - removed unused imports
 - moved static functions from Util classes to interfaces
   as default methods

Testing:
  Run existing tests to validate changes.

Change-Id: I8d9c7ba876e39fa4f4d067761f25dc4ecfd55702
Reviewed-on: http://gerrit.cloudera.org:8080/22728
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-25 14:41:21 +00:00
stiga-huang
4ddacac14f IMPALA-11402: Add limit on files fetched by a single getPartialCatalogObject request
getPartialCatalogObject is a catalogd RPC used by local catalog mode
coordinators to fetch metadata on-demand from catalogd.
For a table with a huge number (e.g. 6M) of files, catalogd might hit
an OOM by exceeding the JVM array limit when serializing the response of
a getPartialCatalogObject request for all partitions (thus all files).

This patch adds a new flag, catalog_partial_fetch_max_files, to define
the max number of file descriptors allowed in a response of
getPartialCatalogObject. Catalogd will truncate the response at the
partition level when it's too big, and only return a subset of the
requested partitions. The coordinator should send new requests to fetch
the remaining partitions. Note that it's possible that table metadata
changes between the requests. The coordinator will detect the catalog
version change and throw an InconsistentMetadataFetchException for the
planner to replan the query. This is an existing mechanism for other
kinds of table metadata.

Here are some metrics for the number of files in a single response and
the corresponding byte array size and response duration:
 * 1000000: 371.71MB, 1s487ms
 * 2000000: 744.51MB, 4s035ms
 * 3000000: 1.09GB, 6s643ms
 * 4000000: 1.46GB, duration not measured due to GC pauses
 * 5000000: 1.82GB, duration not measured due to GC pauses
 * 6000000: >2GB (hit OOM)
1000000 is chosen as the default value for now. We can tune it in the
future.

Tests:
 - Added custom-cluster test
 - Ran e2e tests in local-catalog mode with
   catalog_partial_fetch_max_files=1000 so the new code is used.

Change-Id: Ibb13fec20de5a17e7fc33613ca5cdebb9ac1a1e5
Reviewed-on: http://gerrit.cloudera.org:8080/22559
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-21 15:20:59 +00:00
stiga-huang
fb789df3be IMPALA-13684: Improve waitForHmsEvent() to only wait for related events
waitForHmsEvent is a catalogd RPC for coordinators to send the requested
db/table names to catalogd and wait until it's safe (i.e. no stale
metadata) to start analyzing the statement. The wait time is configured
by the query option sync_hms_events_wait_time_s. Currently, when this
option is enabled, catalogd waits until it syncs to the latest HMS event
regardless of what the query is.
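
For context, the option is enabled like any other query option, e.g.
(hypothetical value and table name):
  -- Wait for catalogd to catch up on HMS events related to the tables
  -- referenced by the following statement, up to 30 seconds.
  SET SYNC_HMS_EVENTS_WAIT_TIME_S=30;
  SELECT count(*) FROM ext_db.ext_tbl;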

This patch reduces waiting by only checking related events and waiting
until the last related event has been processed. In the ideal case, if
there are no related pending events, the query doesn't need to
wait.

Related pending events are determined as follows:
 - For queries that need the db list, i.e. SHOW DATABASES, check pending
   CREATE/ALTER/DROP_DATABASE events on all dbs. ALTER_DATABASE events
   are checked in case the ownership changes and impacts visibility.
 - For db statements like SHOW FUNCTIONS, CREATE/ALTER/DROP DATABASE,
   check pending CREATE/ALTER/DROP events on that db.
   - For db statements that require the table list, i.e. SHOW TABLES,
     also check CREATE_TABLE, DROP_TABLE events under that db.
 - For table statements,
   - check all database events on related db names.
   - If there are loaded transactional tables, check all the pending
     COMMIT_TXN, ABORT_TXN events. Note that these events might modify
     multiple transactional tables and we don't know their table names
     until they are processed. To be safe, wait for all transactional
     events.
   - For all the other table names,
     - if they are all missing/unloaded in the catalog, check all the
       pending CREATE_TABLE, DROP_TABLE events on them for their
       existence.
     - Otherwise (some of them are loaded), check all the table events
       on them. Note that we can fetch events on multiple tables under
       the same db in a single fetch.

If the statement has a SELECT part, views will be expanded so underlying
tables will be checked as well. For performance, this feature assumes
that views won't be changed to tables, and vice versa. This is a rare
use case in regular jobs. Users should use INVALIDATE for such cases.

This patch leverages the HMS API to fetch events of several tables under
the same db in batch. MetastoreEventsProcessor.MetaDataFilter is
improved for this.

Tests:
 - Added test for multiple tables in a single query.
 - Added test with views.
 - Added test for transactional tables.
 - Ran CORE tests.

Change-Id: Ic033b7e197cd19505653c3ff80c4857cc474bcfc
Reviewed-on: http://gerrit.cloudera.org:8080/22571
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-17 09:06:19 +00:00
Joe McDonnell
c5a0ec8bdf IMPALA-11980 (part 1): Put all thrift-generated python code into the impala_thrift_gen package
This puts all of the thrift-generated python code into the
impala_thrift_gen package. This is similar to what Impyla
does for its thrift-generated python code, except that it
uses the impala_thrift_gen package rather than impala._thrift_gen.
This is a preparatory patch for fixing the absolute import
issues.

This patches all of the thrift files to add the python namespace.
It also has code to apply the same patching to the thirdparty thrift
files (hive_metastore.thrift, fb303.thrift).

Putting all the generated python into a package makes it easier
to understand where the imports are getting code. When the
subsequent change rearranges the shell code, the thrift generated
code can stay in a separate directory.

This uses isort to sort the imports for the affected Python files
with the provided .isort.cfg file. This also adds an impala-isort
shell script to make it easy to run.

Testing:
 - Ran a core job

Change-Id: Ie2927f22c7257aa38a78084efe5bd76d566493c0
Reviewed-on: http://gerrit.cloudera.org:8080/20169
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
2025-04-15 17:03:02 +00:00
Steve Carlin
706e1f026c IMPALA-13657: Connect Calcite planner to Impala Frontend framework
This commit adds the plumbing created by IMPALA-13653. The Calcite
planner is now called from Impala's Frontend code via 4 hooks which
are:

- CalciteCompilerFactory: the factory class that creates
    the implementations of the parser, analysis, and single node
    planner hooks.
- CalciteParsedStatement: The class which holds the Calcite SqlNode
    AST.
- CalciteAnalysisDriver: The class that does the validation of the
    SqlNode AST
- CalciteSingleNodePlanner: The class that converts the AST to a
    logical plan, optimizes it, and converts it into an Impala
    PlanNode physical plan.

To run on Calcite, one needs to do two things:

1) set the USE_CALCITE_PLANNER env variable to true before starting
the cluster. This adds the jar file into the path in the
bin/setclasspath.sh file, which is not there by default at the time
of this commit.
2) set the use_calcite_planner query option to true.
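
For example, once the cluster is started with the env variable set, the
per-session switch is just the query option (illustrative only):
  -- Route this session's statements through the Calcite planner.
  SET use_calcite_planner=true;
  SELECT 1;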

This commit makes the CalciteJniFrontend class obsolete. Once the
test cases are moved out of there, that class and others can be
removed.

Change-Id: I3b30571beb797ede827ef4d794b8daefb130ccb1
Reviewed-on: http://gerrit.cloudera.org:8080/22319
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
2025-04-09 23:55:15 +00:00