IMPALA-14082: Support batch processing of RELOAD events on same table

Currently, RELOAD events of partitioned table are processed one after
the other. Processing them one by one acquires the table lock multiple
times to load individual partitions in sequence. This also keeps the
table version changing which impacts performance of coordinators in
local-catalog mode - query planning needs retry to handle
InconsistentMetadataFetchException due to table version changes.

This patch adds batch processing logic for RELOAD events on the same
table by reusing the existing logic of BatchPartitionEvent. This
implementation adds four new methods canBeBatched(), addToBatchEvents(),
getPartitionForBatching(), getBatchEventType() (pre-requisites to reuse
the batching logic) to the RELOAD event class.

Testing:
- Added an end-to-end test to verify the batching.

Change-Id: Ie3e9a99b666a1c928ac2a136bded1e5420f77dab
Reviewed-on: http://gerrit.cloudera.org:8080/23159
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Sai Hemanth Gantasala
2025-07-08 10:30:24 -07:00
committed by Impala Public Jenkins
parent 850f2cf361
commit 46525bcd7c
3 changed files with 108 additions and 10 deletions

View File

@@ -136,6 +136,7 @@ public class MetastoreEvents {
INSERT("INSERT"),
INSERT_PARTITIONS("INSERT_PARTITIONS"),
RELOAD("RELOAD"),
RELOAD_PARTITIONS("RELOAD_PARTITIONS"),
ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
COMMIT_TXN("COMMIT_TXN"),
ABORT_TXN("ABORT_TXN"),
@@ -2991,8 +2992,9 @@ public class MetastoreEvents {
partitions.add(event.getPartitionForBatching());
}
try {
if (baseEvent_ instanceof InsertEvent) {
// for insert event, always reload file metadata so that new files
if ((baseEvent_ instanceof InsertEvent) ||
(baseEvent_ instanceof ReloadEvent)) {
// for insert & reload events, always reload file metadata so that new files
// are reflected in HdfsPartition
reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD, getEventDesc(),
true);
@@ -3332,6 +3334,41 @@ public class MetastoreEvents {
}
}
// Decides whether the given event can be folded into a batch together with
// this RELOAD event. Batching is allowed only when the candidate is itself a
// ReloadEvent, is not older than the last synced event id, targets the same
// fully-qualified table, and both events are partition-level reloads.
@Override
public boolean canBeBatched(MetastoreEvent event) {
if (!(event instanceof ReloadEvent)) return false;
// Older events will be skipped anyway, so don't pull them into a batch.
if (isOlderThanLastSyncEventId(event)) return false;
ReloadEvent reloadEvent = (ReloadEvent) event;
// make sure that the event is on the same table
if (!getFullyQualifiedTblName().equalsIgnoreCase(
reloadEvent.getFullyQualifiedTblName())) {
return false;
}
// we currently only batch partition level reload events; a null
// reloadPartition_ means a table-level reload, which is not batchable.
if (this.reloadPartition_ == null || reloadEvent.reloadPartition_ == null) {
return false;
}
return true;
}
// Wraps this RELOAD event into a new BatchPartitionEvent and adds the given
// event to that batch. Returns the batch event, or null when the given event
// is not a ReloadEvent (and hence cannot be batched with this one).
@Override
public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
if (!(event instanceof ReloadEvent)) return null;
BatchPartitionEvent<ReloadEvent> batchEvent = new BatchPartitionEvent<>(
this);
// Sanity check: the caller should only pass events that canBeBatched()
// accepted; fail fast otherwise.
Preconditions.checkState(batchEvent.canBeBatched(event));
batchEvent.addToBatchEvents(event);
return batchEvent;
}
// Returns the HMS partition this RELOAD event refers to, used by the batch
// event to collect all partitions to reload in one pass. Null for
// table-level reloads (those are never batched, see canBeBatched()).
@Override
protected Partition getPartitionForBatching() { return reloadPartition_; }
// Event type reported for a batch of RELOAD events; distinguishes batched
// partition reloads from single RELOAD events in metrics/logging.
@Override
protected MetastoreEventType getBatchEventType() {
return MetastoreEventType.RELOAD_PARTITIONS;
}
private void processTableInvalidate() throws MetastoreNotificationException {
Reference<Boolean> tblWasRemoved = new Reference<>();
Reference<Boolean> dbWasAdded = new Reference<>();

View File

@@ -7532,7 +7532,8 @@ public class CatalogOpExecutor {
.getPartitionFromThriftPartitionSpec(partSpecList.get(i));
if (partition != null) {
HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
partBuilder.setLastRefreshEventId(eventIds.get(0));
// use last event id, so that batch partition events will not reloaded again
partBuilder.setLastRefreshEventId(eventIds.get(eventIds.size() - 1));
partitionChanged |= hdfsTbl.updatePartition(partBuilder);
} else {
LOG.warn("Partition {} no longer exists in table {}. It might be " +

View File

@@ -17,6 +17,7 @@
from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import os
import pytest
import re
from os import getenv
@@ -570,19 +571,24 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
@CustomClusterTestSuite.with_args(
catalogd_args="--hms_event_polling_interval_s=5"
" --enable_reload_events=true")
" --enable_reload_events=true"
" --enable_skipping_older_events=true")
def test_refresh_invalidate_events(self, unique_database):
"""The reload events generated by this test will be batched by
MetaStoreEvents#BatchPartitionEvent and BatchPartitionEvent#isOlderEvent() requires
'enable_skipping_older_events=true' to skip older reload events."""
self.run_test_refresh_invalidate_events(unique_database, "reload_table")
@CustomClusterTestSuite.with_args(
catalogd_args="--hms_event_polling_interval_s=5"
" --enable_reload_events=true"
" --enable_sync_to_latest_event_on_ddls=true")
" --enable_sync_to_latest_event_on_ddls=true"
" --enable_skipping_older_events=true")
def test_refresh_invalidate_events_enable_sync_to_latest_events(self, unique_database):
self.run_test_refresh_invalidate_events(unique_database, "reload_table_sync", True)
def run_test_refresh_invalidate_events(self, unique_database, test_reload_table,
enable_sync_to_latest_event_on_ddls=False):
fire_reload_events_from_hive=False):
"""Test is to verify Impala-11808, refresh/invalidate commands should generate a
Reload event in HMS and CatalogD's event processor should process this event.
"""
@@ -624,8 +630,10 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
.format(unique_database, test_reload_table), 2)
EventProcessorUtils.wait_for_event_processing(self)
if enable_sync_to_latest_event_on_ddls:
# Test to verify if older events are being skipped in event processor
if fire_reload_events_from_hive:
# Test to verify if older events from hive are being skipped in event processor.
# Firing 10 consecutive RELOAD events, the first one processes and updates the
# lastRefreshEventId, causing the remaining 9 to be ignored.
data = FireEventRequestData()
data.refreshEvent = True
req = FireEventRequest(True, data)
@@ -633,7 +641,7 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
req.tableName = test_reload_table
# table level reload events
tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
for i in range(10):
for _ in range(10):
self.hive_client.fire_listener_event(req)
EventProcessorUtils.wait_for_event_processing(self)
tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
@@ -641,9 +649,13 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
# partition level reload events
EventProcessorUtils.wait_for_event_processing(self)
part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
self.client.execute(":event_processor('pause')")
req.partitionVals = ["2022"]
for i in range(10):
for _ in range(10):
self.hive_client.fire_listener_event(req)
self.client.execute("refresh {}.{} partition(year=2022)"
.format(unique_database, test_reload_table))
self.client.execute(":event_processor('start')")
EventProcessorUtils.wait_for_event_processing(self)
part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
assert part_events_skipped_after > part_events_skipped_before
@@ -796,6 +808,54 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
assert batch_events_2 == batch_events_1
assert current_skipped_events - prev_skipped_events >= 24
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_batch_reload_events(self, unique_database):
"""Test to verify IMPALA-14082, adding batching logic for partitioned refresh events.
Before batching the events, each event is checked if the event id is greater than
table's lastSyncEventId then the event can be batched else it can be skipped."""
tbl = unique_database + ".batch_refresh_tbl"
# Partitioned text table with three single-row partitions p=0,1,2.
self.client.execute(
"create table {} (i int) partitioned by(p int) stored as textfile".format(tbl))
self.client.execute("insert into {} partition(p) values (0,0),(1,1),(2,2)"
.format(tbl))
EventProcessorUtils.wait_for_event_processing(self)
def __get_fs_location(table_name):
# Filesystem path of the table directory under the warehouse.
return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, table_name)
# Snapshot the batch-event counter before firing the reload events.
batch_events_1 = EventProcessorUtils.get_int_metric("batch-events-created")
tbl_path = __get_fs_location("batch_refresh_tbl")
# new data for p=0, overwrite p=2 with p=1, delete data under p=1
self.filesystem_client.create_file(os.path.join(tbl_path + "p=0/",
"new_file.txt"), "4")
self.filesystem_client.delete_file_dir(tbl_path + "p=2/", recursive=True)
self.filesystem_client.make_dir(tbl_path + "p=2/")
self.filesystem_client.copy(tbl_path + "p=1/", tbl_path + "p=2/")
self.filesystem_client.delete_file_dir(tbl_path + "p=1/", recursive=True)
# Fire three partition-level RELOAD events directly against HMS; the event
# processor should coalesce them into a single batch event.
data = FireEventRequestData()
data.refreshEvent = True
req = FireEventRequest(True, data)
req.dbName = unique_database
req.tableName = "batch_refresh_tbl"
req.partitionVals = ["0"]
self.hive_client.fire_listener_event(req)
req.partitionVals = ["1"]
self.hive_client.fire_listener_event(req)
req.partitionVals = ["2"]
self.hive_client.fire_listener_event(req)
EventProcessorUtils.wait_for_event_processing(self)
# Exactly one new batch event should have been created for the three reloads.
batch_events_2 = EventProcessorUtils.get_int_metric("batch-events-created")
assert batch_events_2 == batch_events_1 + 1
# p=0 has two values 0, 4 and p=2 has value 1
result = self.execute_query(
"select * from {} order by i".format(tbl))
# Parse tab-separated result rows into [i, p] int pairs for comparison.
parsed_data = []
for line in result.get_data().strip().split('\n'):
row = [int(val) for val in line.strip().split('\t')]
parsed_data.append(row)
expected_data = [[0, 0], [1, 2], [4, 0]]
assert expected_data == parsed_data
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_commit_compaction_events(self, unique_database):
"""Test is to verify Impala-11626, commit compaction events triggered in HMS would