IMPALA-13801: Support greatest synced event with hierarchical metastore event processing

This is a follow-up to IMPALA-12709. IMPALA-12152 and
IMPALA-12785 are affected when the hierarchical metastore event
processing feature is enabled.

The following changes are included in this patch:
1. Added creationTime_ and dispatchTime_ fields to the MetastoreEvent
   class to store the current time in milliseconds. They are used to
   calculate:
   a) Event dispatch time (time between creation of a MetastoreEvent
      object and the moment the event is moved to inProgressLog_ of
      EventExecutorService after it is dispatched to a
      DbEventExecutor).
   b) Event schedule delays incurred at DbEventExecutors and
      TableEventExecutors (time between the event being moved to
      EventExecutorService's inProgressLog_ and the start of its
      processing at the appropriate DbEventExecutor and
      TableEventExecutor).
   c) Event process time from the EventExecutorService point of
      view (time spent in inProgressLog_ before the event is moved
      to processedLog_).
   Logs are added to show the dispatch time, schedule delays and
   process time from the EventExecutorService point of view for
   each event. A log is also added to show the time taken by the
   event's processIfEnabled().
2. Added isDelimiter_ field to the MetastoreEvent class to indicate
   whether the event is a delimiter event. It is set only when
   hierarchical event processing is enabled. A delimiter is a kind
   of metastore event that does not require event processing.
   A delimiter event can be:
   a) A CommitTxnEvent that does not have any write event info for
      a given transaction.
   b) An AbortTxnEvent that does not have write ids for a given
      transaction.
   c) An IgnoredEvent.
   An event is identified and marked as a delimiter in
   EventExecutorService#dispatch(). Delimiter events are not queued
   to a DbEventExecutor for processing. They are only kept in
   inProgressLog_ to preserve continuity and correctness in
   synchronization tracking. They are removed from inProgressLog_
   when their preceding non-delimiter metastore event is removed
   from inProgressLog_.
3. Greatest synced event id is computed from the dispatched events
   (inProgressLog_) and processed events (processedLog_) tree
   maps. The greatest synced event is the latest event such that
   all events with id less than or equal to its id are definitely
   synced.
4. Lag is calculated as the difference between the latest event
   time on HMS and the greatest synced event time. It is shown in
   the log.
5. Greatest synced event id is used in IMPALA-12152 changes. When
   greatest synced event id becomes greater than or equal to
   waitForEventId, all the required events are definitely synced.
6. Event processor is paused gracefully when paused with the command
   from IMPALA-12785. This ensures that all events already fetched
   from HMS in the current batch are processed before the event
   processor is fully paused. Processing the current batch is
   necessary because certain events such as AllocWriteIdEvent,
   AbortTxnEvent and CommitTxnEvent update table write ids in the
   catalog when the metastore event object is created, and those
   write ids are applied to the appropriate table object only later,
   when the event is processed. Pausing abruptly in the middle of a
   batch can therefore leave write ids on table objects in an
   inconsistent state.
7. Added greatest synced event id and event time to the events
   processor metrics, and updated the descriptions of the lag,
   pending events, last synced event id and event time metrics.
8. Atomically update the event queue and increment the outstanding
   event count in the enqueue methods of both DbProcessor and
   TableProcessor, so that the respective process methods do not
   process an event until it is added to the queue and the
   outstanding event count is incremented. Otherwise, an event can
   be processed and the outstanding event count decremented before
   the enqueue method has incremented it.
9. Refactored DbEventExecutor, DbProcessor, TableEventExecutor and
   TableProcessor classes to propagate the exception that occurred
   during event processing along with the event.
   EventProcessException is a wrapper added to hold references to
   the event being processed and the exception that occurred.
10. Added AcidTableWriteInfo helper class to store the table, write
    ids and partitions for the transaction id received in
    CommitTxnEvent.
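
For illustration only, the greatest synced event id rule from items 2
and 3 can be sketched as below. This is not the code in this patch: the
class GreatestSyncedSketch and its methods are hypothetical, events are
assumed to be dispatched in increasing id order, and delimiters are
retired once they reach the head of the in-progress log, which is a
simplification of the removal rule described in item 2.

```java
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not Impala's EventExecutorService.
final class GreatestSyncedSketch {
  // Event id -> isDelimiter, for events dispatched but not yet retired.
  private final TreeMap<Long, Boolean> inProgress = new TreeMap<>();
  // Ids of events that are done but not yet folded into greatestSynced.
  private final TreeSet<Long> processed = new TreeSet<>();
  private long greatestSynced;

  GreatestSyncedSketch(long lastSyncedId) {
    greatestSynced = lastSyncedId;
  }

  synchronized void dispatch(long id, boolean isDelimiter) {
    inProgress.put(id, isDelimiter);
    refresh();
  }

  synchronized void markProcessed(long id) {
    if (inProgress.remove(id) != null) processed.add(id);
    refresh();
  }

  synchronized long greatestSyncedEventId() {
    return greatestSynced;
  }

  private void refresh() {
    // A delimiter at the head has no unprocessed predecessor and
    // needs no processing itself, so it counts as synced.
    while (!inProgress.isEmpty() && inProgress.firstEntry().getValue()) {
      processed.add(inProgress.pollFirstEntry().getKey());
    }
    // Everything below the oldest in-progress event is synced.
    Long candidate;
    if (inProgress.isEmpty()) {
      candidate = processed.isEmpty() ? null : processed.last();
    } else {
      candidate = processed.lower(inProgress.firstKey());
    }
    if (candidate != null && candidate > greatestSynced) {
      greatestSynced = candidate;
    }
    // Drop processed ids that are already covered.
    processed.headSet(greatestSynced, true).clear();
  }
}
```

With events 101 and 102 in progress and 103 dispatched as a delimiter,
finishing 102 first does not advance the value, because 101 is still
pending. Finishing 101 then advances it past all three events.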
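
The ordering problem fixed in item 8 can be sketched as follows. This
is a minimal illustration under assumptions, not the patch itself:
EnqueueSketch is a hypothetical stand-in for DbProcessor and
TableProcessor, and using one intrinsic lock is only one possible way
to make the two updates atomic.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a processor; not Impala's actual class.
final class EnqueueSketch {
  private final Queue<Long> queue = new ArrayDeque<>();
  private long outstanding = 0;

  // Queue insertion and counter increment happen under one lock, so a
  // concurrent process() sees either neither change or both.
  synchronized void enqueue(long eventId) {
    queue.add(eventId);
    outstanding++;
  }

  // Takes the next event id, or returns null when the queue is empty.
  private synchronized Long take() {
    return queue.poll();
  }

  private synchronized void done() {
    outstanding--;
  }

  // Processes one event if available. The decrement can never run
  // before the matching increment, because take() cannot observe an
  // event until enqueue() has completed both updates.
  boolean process() {
    Long id = take();
    if (id == null) return false;
    // Event-specific work would run here, outside the lock.
    done();
    return true;
  }

  synchronized long outstandingCount() {
    return outstanding;
  }
}
```

If the queue insertion and the increment were two separate steps, a
process() call running between them could decrement the count first
and briefly leave it negative.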

Testing:
 - Added new tests and ran the existing end-to-end tests.
 - Ran the existing tests with hierarchical event processing
   enabled.

Change-Id: I26240f36aaf85125428dc39a66a2a1e4d3197e85
Reviewed-on: http://gerrit.cloudera.org:8080/22997
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Venu Reddy
2025-06-09 15:13:22 +05:30
committed by Quanlong Huang
parent 48b38810e8
commit ebbc67cf40
22 changed files with 1401 additions and 452 deletions

@@ -991,6 +991,16 @@ struct TEventProcessorMetrics {
// Outstanding event count to process on executors
22: optional i64 outstanding_event_count
// Number of metastore events that are pending synchronization up to and including
// the latest_event_id
23: optional i64 pending_event_count
// Greatest synced event id
24: optional i64 greatest_synced_event_id
// Greatest synced event time
25: optional i64 greatest_synced_event_time
}
struct TCatalogHmsCacheApiMetrics {

@@ -3626,7 +3626,7 @@
"key": "events-processor.events-received-15min-rate"
},
{
"description": "Last metastore event id that the catalog server processed and synced to",
"description": "It is the id of the latest metastore event such that all events with id less than or equal to the latest metastore event are definitely processed and synced",
"contexts": [
"CATALOGSERVER"
],
@@ -3636,7 +3636,7 @@
"key" : "events-processor.last-synced-event-id"
},
{
"description": "Last metastore event time that the catalog server processed and synced to",
"description": "It is the time of the latest metastore event such that all events with id less than or equal to the latest metastore event are definitely processed and synced",
"contexts": [
"CATALOGSERVER"
],
@@ -3645,6 +3645,26 @@
"kind" : "COUNTER",
"key" : "events-processor.last-synced-event-time"
},
{
"description": "Greatest synced event id is the id of the latest metastore event such that all events with id less than or equal to the latest metastore event are definitely processed and synced. It is same as last-synced-event-id",
"contexts": [
"CATALOGSERVER"
],
"label": "Greatest Synced Event Id",
"units": "NONE",
"kind" : "COUNTER",
"key" : "events-processor.greatest-synced-event-id"
},
{
"description": "Greatest synced event time is the time of the latest metastore event such that all events with id less than or equal to the latest metastore event are definitely processed and synced. It is same as last-synced-event-time",
"contexts": [
"CATALOGSERVER"
],
"label": "Greatest Synced Event Time",
"units": "NONE",
"kind" : "COUNTER",
"key" : "events-processor.greatest-synced-event-time"
},
{
"description": "Latest event id in Hive metastore",
"contexts": [
@@ -3666,7 +3686,7 @@
"key" : "events-processor.latest-event-time"
},
{
"description": "Number of pending events to be synced, i.e. the difference between latest-event-id and last-synced-event-id",
"description": "Number of pending events to be synced till the latest-event-id",
"contexts": [
"CATALOGSERVER"
],
@@ -3676,7 +3696,7 @@
"key" : "events-processor.pending-events"
},
{
"description": "Lag time of the event processing, i.e. the difference between latest-event-time and last-synced-event-time",
"description": "Lag time of the event processing, i.e. the difference between latest-event-time and greatest-synced-event-time",
"contexts": [
"CATALOGSERVER"
],