This is a follow-up commit to IMPALA-12709. IMPALA-12152 and
IMPALA-12785 are affected when the hierarchical metastore event
processing feature is enabled.
The following changes are incorporated in this patch:
1. Added creationTime_ and dispatchTime_ fields in the MetastoreEvent
class to store the current time in milliseconds. They are used to
calculate:
a) Event dispatch time (the time between a MetastoreEvent object's
creation and when the event is moved to the inProgressLog_ of
EventExecutorService after being dispatched to a DbEventExecutor).
b) Event schedule delays incurred at DbEventExecutors and
TableEventExecutors (the time between an event being moved to
EventExecutorService's inProgressLog_ and the start of its
processing at the appropriate DbEventExecutor and
TableEventExecutor).
c) Event process time from the EventExecutorService's point of
view (the time spent in inProgressLog_ before the event is moved
to processedLog_).
Logs are added to show the event dispatch time, schedule delays and
process time from the EventExecutorService's point of view for each
event. A log is also added to show the time taken by each event's
processIfEnabled().
2. Added an isDelimiter_ field in the MetastoreEvent class to indicate
whether the event is a delimiter event. It is set only when
hierarchical event processing is enabled. A delimiter is a kind of
metastore event that does not require event processing. A delimiter
event can be:
a) A CommitTxnEvent that does not have any write event info for a
given transaction.
b) An AbortTxnEvent that does not have write ids for a given
transaction.
c) An IgnoredEvent.
An event is identified and marked as a delimiter in
EventExecutorService#dispatch(). Delimiter events are not queued to
a DbEventExecutor for processing. They are only maintained in the
inProgressLog_ to preserve continuity and correctness in
synchronization tracking. A delimiter event is removed from
inProgressLog_ when its preceding non-delimiter metastore event is
removed from inProgressLog_.
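The removal rule can be sketched as follows. This is an illustrative
simplification (the real inProgressLog_ is a Java TreeMap and the
eligibility rules may be more involved); in_progress is modeled as a
dict mapping event id to an is-delimiter flag:

```python
def remove_from_in_progress(in_progress, event_id):
    """Remove a processed non-delimiter event from the in-progress log,
    together with the delimiter events that immediately follow it (they
    have no work of their own and were only kept for continuity).

    in_progress: dict mapping event id -> True if the event is a
    delimiter, False otherwise. A simplified stand-in for the
    inProgressLog_ tree map."""
    del in_progress[event_id]
    for eid in sorted(k for k in in_progress if k > event_id):
        if not in_progress[eid]:
            # A later non-delimiter event now guards the remaining
            # delimiters; stop here.
            break
        del in_progress[eid]
```

For example, with event 1 (non-delimiter), events 2 and 3 (delimiters)
and event 4 (non-delimiter) in progress, removing event 1 also removes
2 and 3, leaving only event 4.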
3. The greatest synced event id is computed from the dispatched
events (inProgressLog_) and processed events (processedLog_) tree
maps. The greatest synced event is the latest event such that all
events with an id less than or equal to it are definitely synced.
4. Lag is calculated as the difference between the latest event time
on HMS and the greatest synced event time. It is shown in the log.
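A minimal sketch of these two computations (an illustrative reading,
not Impala's actual code; the tree maps are modeled as plain dicts
keyed by event id):

```python
def greatest_synced_event_id(in_progress, processed):
    """Greatest event id such that every event with an id less than or
    equal to it is definitely synced. Returns -1 when nothing
    qualifies."""
    if not in_progress:
        # No outstanding events: everything processed so far is synced.
        return max(processed) if processed else -1
    oldest_outstanding = min(in_progress)
    # Only processed events strictly older than the oldest outstanding
    # event are definitely synced.
    synced = [eid for eid in processed if eid < oldest_outstanding]
    return max(synced) if synced else -1


def lag_ms(latest_hms_event_time_ms, greatest_synced_event_time_ms):
    """Lag: latest event time on HMS minus greatest synced event time."""
    return latest_hms_event_time_ms - greatest_synced_event_time_ms
```

For example, with events 5 and 6 processed and event 7 still in
progress, the greatest synced event id is 6; once 7 completes, it
becomes 7.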
5. The greatest synced event id is used in the IMPALA-12152 changes.
When the greatest synced event id becomes greater than or equal to
waitForEventId, all the required events are definitely synced.
6. The event processor is paused gracefully when paused with the
command from IMPALA-12785. This ensures that all the events fetched
from HMS in the current batch are processed before the event
processor is fully paused. It is necessary to process the current
batch of events because certain events like AllocWriteIdEvent,
AbortTxnEvent and CommitTxnEvent update table write ids in the
catalog upon metastore event object creation, and the table write
ids are later applied to the appropriate table object when the
event is processed. Pausing abruptly in the middle of the current
batch of event processing can leave the write ids on table objects
in an inconsistent state.
7. Added the greatest synced event id and event time to the events
processor metrics, and updated the descriptions of the lag, pending
events, last synced event id and event time metrics.
8. The event queue update and the outstanding event count increment
are done atomically in the enqueue methods of both DbProcessor and
TableProcessor, so that the respective process methods do not
process an event until it is added to the queue and the outstanding
event count is incremented. Otherwise, an event could be processed
and the outstanding event count decremented before it is
incremented in the enqueue method.
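The atomicity requirement can be sketched like this (a simplified
model, not the actual DbProcessor/TableProcessor; a single lock covers
both the queue append and the counter increment):

```python
import threading
from collections import deque


class ProcessorSketch:
    """Simplified stand-in for DbProcessor/TableProcessor enqueueing."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queue = deque()
        self.outstanding = 0

    def enqueue(self, event):
        # Queue update and count increment happen under one lock, so a
        # concurrent process_one() can never observe the count being
        # decremented before the matching increment.
        with self._lock:
            self._queue.append(event)
            self.outstanding += 1

    def process_one(self):
        with self._lock:
            if not self._queue:
                return None
            event = self._queue.popleft()
        # ... event processing happens outside the lock ...
        with self._lock:
            self.outstanding -= 1
        return event
```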
9. Refactored the DbEventExecutor, DbProcessor, TableEventExecutor
and TableProcessor classes to propagate the exception that occurred
during event processing along with the event. EventProcessException
is a wrapper added to hold references to the event being processed
and the exception that occurred.
10. Added an AcidTableWriteInfo helper class to store the table,
write ids and partitions for the transaction id received in a
CommitTxnEvent.
Testing:
- Added new tests and executed existing end-to-end tests.
- Executed the existing tests with hierarchical event processing
enabled.
Change-Id: I26240f36aaf85125428dc39a66a2a1e4d3197e85
Reviewed-on: http://gerrit.cloudera.org:8080/22997
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
At present, the metastore event processor is single-threaded.
Notification events are processed sequentially, with a maximum of
1000 events fetched and processed in a single batch. Multiple locks
are used to address the concurrency issues that may arise when
catalog DDL operation processing and metastore event processing try
to access/update catalog objects concurrently. Waiting for a lock or
for file metadata loading of a table can slow event processing and
delay the processing of the events that follow, even when those
events do not depend on the previous event. Altogether it can take a
very long time to synchronize all the HMS events.
The existing metastore event processing is turned into multi-level
event processing with the enable_hierarchical_event_processing flag.
It is not enabled by default. The idea is to segregate events based
on their dependencies, maintain the order of events as they occur
within a dependency, and process them independently as much as
possible. The following 3 main classes represent the three-level
threaded event processing:
1. EventExecutorService
It provides the necessary methods to initialize, start, clear, stop
and process metastore events in hierarchical mode. It is
instantiated from MetastoreEventsProcessor and its methods are
invoked from MetastoreEventsProcessor. Upon receiving an event to
process, EventExecutorService queues the event to the appropriate
DbEventExecutor for processing.
2. DbEventExecutor
An instance of this class has an execution thread and manages events
of multiple databases with DbProcessors. A DbProcessor instance is
maintained to store the context of each database within the
DbEventExecutor. On each scheduled execution, input events on a
DbProcessor are segregated to the appropriate TableProcessors for
event processing, and the database events that are eligible for
processing are processed.
Once a DbEventExecutor is assigned to a database, a DbProcessor is
created, and subsequent events belonging to the database are queued
to the same DbEventExecutor thread for further processing. Hence,
linearizability is ensured in dealing with events within the
database. Each instance of DbEventExecutor has a fixed list of
TableEventExecutors.
3. TableEventExecutor
An instance of this class has an execution thread and processes
events of multiple tables with TableProcessors. A TableProcessor
instance is maintained to store the context of each table within a
TableEventExecutor. On each scheduled execution, events from the
TableProcessors are processed.
Once a TableEventExecutor is assigned to a table, a TableProcessor
is created, and subsequent table events are processed by the same
TableEventExecutor thread. Hence, linearizability is guaranteed in
processing the events of a particular table.
- All the events of a table are processed in the same order they
occurred.
- Events of different tables are processed in parallel when those
tables are assigned to different TableEventExecutors.
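The sticky assignment that yields these ordering guarantees can be
sketched as follows. This is an assumed scheme for illustration only;
the real code creates a DbProcessor/TableProcessor on first use and
the assignment details may differ:

```python
class StickyExecutorPool:
    """Routes every event for a given key (database or table name) to
    the same executor queue, preserving per-key event order while
    letting different keys proceed in parallel."""

    def __init__(self, num_executors):
        self.queues = [[] for _ in range(num_executors)]
        self._assignment = {}  # key -> executor index, fixed on first use

    def dispatch(self, key, event):
        # First event for a key picks an executor; all later events for
        # that key reuse it, so per-key ordering is preserved.
        idx = self._assignment.setdefault(
            key, len(self._assignment) % len(self.queues))
        self.queues[idx].append(event)
        return idx
```

Two events of the same database land on the same queue in order, while
a second database may land on a different queue and be drained in
parallel.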
The following new events are added:
1. DbBarrierEvent
This event wraps a database event. It is used to synchronize all
the TableProcessors belonging to the database before processing the
database event. It acts as a barrier that restricts the processing
of table events that occurred after the database event until the
database event is processed on the DbProcessor.
2. RenameTableBarrierEvent
This event wraps an alter table event for a rename. It is used to
synchronize the source and target TableProcessors to process the
rename table event. It ensures that the source TableProcessor
removes the table first and only then allows the target
TableProcessor to create the renamed table.
3. PseudoCommitTxnEvent and PseudoAbortTxnEvent
CommitTxnEvent and AbortTxnEvent can involve multiple tables in a
transaction, and processing these events modifies multiple table
objects. Pseudo events are introduced such that one pseudo event is
created for each table involved in the transaction, and these
pseudo events are processed independently at the respective
TableProcessors.
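The DbBarrierEvent coordination above can be modeled as a countdown
(a simplified model with assumed method names, not the actual class):
the wrapped database event runs only after every TableProcessor of
the database has reached the barrier, and table events queued behind
the barrier stay blocked until then.

```python
class DbBarrierSketch:
    """Countdown model of a barrier wrapping a database event."""

    def __init__(self, db_event, num_table_processors):
        self.db_event = db_event
        self._remaining = num_table_processors
        self.db_event_processed = False

    def table_processor_reached(self):
        """Called by each TableProcessor when it hits the barrier.
        Returns True once the database event has been processed, i.e.
        once table events behind the barrier may proceed."""
        if self._remaining > 0:
            self._remaining -= 1
            if self._remaining == 0:
                # Last TableProcessor arrived: process the db event.
                self.db_event_processed = True
        return self.db_event_processed
```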
The following new flags are introduced:
1. enable_hierarchical_event_processing
To enable the hierarchical event processing on catalogd.
2. num_db_event_executors
To set the number of database level event executors.
3. num_table_event_executors_per_db_event_executor
To set the number of table level event executors within a
database event executor.
4. min_event_processor_idle_ms
To set the minimum time to retain idle db processors and table
processors on the database event executors and table event
executors respectively, when they do not have events to process.
5. max_outstanding_events_on_executors
To set the maximum number of outstanding events to process on the
event executors.
Changed the hms_event_polling_interval_s type from int to double to
support millisecond-precision intervals.
TODOs:
1. We need to redefine the lag in the hierarchical processing mode.
2. Need a mechanism to capture the actual event processing time in
hierarchical processing mode. Currently, with
enable_hierarchical_event_processing set to true, lastSyncedEventId_
and lastSyncedEventTimeSecs_ are updated upon event dispatch to
EventExecutorService for processing on the respective DbEventExecutor
and/or TableEventExecutor. So lastSyncedEventId_ and
lastSyncedEventTimeSecs_ do not actually mean the events are
processed.
3. Hierarchical processing mode currently has a mechanism to show the
total number of outstanding events on all the db and table executors
at a given moment. Observability in this mode needs to be enhanced
further.
Filed a jira (IMPALA-13801) to fix these.
Testing:
- Executed existing end to end tests.
- Added fe and end-to-end tests with enable_hierarchical_event_processing.
- Added event processing performance tests.
- Executed the existing tests with hierarchical processing mode
enabled. lastSyncedEventId_ is now also used in the new
sync_hms_events_wait_time_s feature (IMPALA-12152). Some tests fail
when hierarchical processing mode is enabled because
lastSyncedEventId_ does not actually mean the event is processed in
this mode. This needs to be fixed/verified with the above jira
(IMPALA-13801).
Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Reviewed-on: http://gerrit.cloudera.org:8080/21031
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This puts all of the thrift-generated python code into the
impala_thrift_gen package. This is similar to what Impyla
does for its thrift-generated python code, except that it
uses the impala_thrift_gen package rather than impala._thrift_gen.
This is a preparatory patch for fixing the absolute import
issues.
This patches all of the thrift files to add the python namespace,
and includes code to apply the same patching to the thirdparty
thrift files (hive_metastore.thrift, fb303.thrift).
Putting all the generated python into a package makes it easier
to understand where the imports are getting code. When the
subsequent change rearranges the shell code, the thrift generated
code can stay in a separate directory.
This uses isort to sort the imports for the affected Python files
with the provided .isort.cfg file. This also adds an impala-isort
shell script to make it easy to run.
Testing:
- Ran a core job
Change-Id: Ie2927f22c7257aa38a78084efe5bd76d566493c0
Reviewed-on: http://gerrit.cloudera.org:8080/20169
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
EventProcessorUtils.wait_for_event_processing() is used in tests to
wait for HMS events to be processed by catalogd and for all impalads
to receive the catalog updates. Currently, the timeout for waiting
on catalog updates is 10s. However, some e2e tests like
test_overlap_min_max_filters run DDL/DMLs longer than 10s, which
could block the catalog update for longer than 10s. When this util
method is used in e2e tests, it can be impacted by other concurrent
tests and time out.
This patch deflakes the issue by bumping the timeout to 20s.
Change-Id: If6a785e6d98572bf1a3fa3efc81d712c7ecc488e
Reviewed-on: http://gerrit.cloudera.org:8080/22547
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Quanlong Huang <huangquanlong@gmail.com>
The event processor goes into the error state before it attempts a
global invalidate. It remains in the error state for a very short
period of time. If wait_for_synced_event_id() obtains the event
processor status during this period, it can see the status as
error. This test was introduced with IMPALA-12832.
Testing:
- Tested manually. Added a sleep in the code for testing so that the
event processor remains in the error state a little longer.
Change-Id: I787cff4cc9f9df345cd715c02b51b8d93a150edf
Reviewed-on: http://gerrit.cloudera.org:8080/21169
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
At present, a failure in event processing needs a manual invalidate
metadata. This patch implicitly invalidates the table upon failures
in processing table events with the new
'invalidate_metadata_on_event_processing_failure' flag. A new
'invalidate_global_metadata_on_event_processing_failure' flag is
added to automatically run a global invalidate metadata when the
event processor goes into a non-active state.
Note: Also introduced a config
'inject_process_event_failure_event_types' for automated tests to
simulate event processor failures. This config specifies which event
types can be intentionally failed. It should only be used for
testing purposes. Needs IMPALA-12851 as a prerequisite.
Testing:
- Added end-to-end tests to mimic failures in event processor and verified
that event processor is active
- Added unit test to verify the 'auto_global_invalidate_metadata' config
- Passed FE tests
Co-Authored-by: Sai Hemanth Gantasala <saihemanth@cloudera.com>
Change-Id: Ia67fc04c995802d3b6b56f79564bf0954b012c6c
Reviewed-on: http://gerrit.cloudera.org:8080/21065
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
When the event processor goes into the ERROR/NEEDS_INVALIDATE state,
we can only check logs to get the detailed information. This is
inconvenient in triaging failures. This patch exposes the error
message in the /events WebUI. It includes the timestamp string and
the stacktrace of the exception.
This patch makes the /events page visible. It also modifies the test
code of EventProcessorUtils.wait_for_synced_event_id() to print the
error message if the event processor is down.
A trivial bug where lastProcessedEvent is not updated (IMPALA-11588)
is also fixed in this patch. The variable is refactored to be a
member of the class so internal methods can update it before
processing each event.
Some metrics were missing from the /events page, e.g.
latest-event-id, latest-event-time-ms, last-synced-event-time-ms.
This patch adds them and also adds an event-processing-delay-ms
metric, which is latest-event-time-ms minus last-synced-event-time-ms.
Tests:
- Manually inject codes to fail the event processor and verified the
WebUI.
- Ran metadata/test_event_processing.py when the event processor is in
ERROR state. Verified the error message is shown up in test output.
Change-Id: I077375422bc3d24eed57c95c6b05ac408228f083
Reviewed-on: http://gerrit.cloudera.org:8080/19916
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
There are some flaky tests where wait_for_event_processing() times
out, e.g. TestEventProcessing.test_insert_events. My theory is that
this is caused by parallel tests with DDL/DML statements that can
also fire HMS events that have to be processed by catalogd.
The change bumps the timeout (default: 10 sec -> 100 sec) in case
there is some progress in event processing. If the same event is
still being processed, the old timeout is used.
An alternative approach would be to mark the related tests as
serial, but I would prefer to avoid this as it would make test jobs
slower.
The event processor status is also checked, to time out earlier if
the event processor has no hope of recovery.
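The timeout policy can be sketched as a pure decision function (the
names and defaults here are illustrative, not the actual test-util
code):

```python
def should_keep_waiting(elapsed_s, made_progress,
                        base_timeout_s=10, extended_timeout_s=100):
    """Keep waiting up to the extended timeout while event processing
    is making progress; fall back to the old, short timeout when the
    same event is still being processed."""
    limit = extended_timeout_s if made_progress else base_timeout_s
    return elapsed_s < limit
```

A caller would additionally bail out immediately when the event
processor status indicates it cannot recover.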
Change-Id: I676854f7df9aea5fa10fb6ecf6381195bc8fa4b8
Reviewed-on: http://gerrit.cloudera.org:8080/19614
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This takes steps to make Python 2 behave like Python 3 as
a way to flush out issues with running on Python 3. Specifically,
it handles two main differences:
1. Python 3 requires absolute imports within packages. This
can be emulated via "from __future__ import absolute_import"
2. Python 3 changed division to "true" division that doesn't
round to an integer. This can be emulated via
"from __future__ import division"
This changes all Python files to add imports for absolute_import
and division. For completeness, this also includes print_function in the
import.
I scrutinized each old-division location and converted some
locations to use the integer division '//' operator where an
integer result was needed (e.g. for indices, counts of records,
etc). Some code was also using relative imports and needed to be
adjusted to handle absolute_import.
This fixes all Pylint warnings about no-absolute-import and old-division,
and these warnings are now banned.
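A small example of the emulated division behavior (safe to run on
both Python 2 and 3):

```python
from __future__ import absolute_import, division, print_function

# With "division" imported, / is true division even on Python 2,
# matching Python 3 semantics...
print(7 / 2)   # 3.5 on both Python 2 and 3
# ...while // keeps integer division for indices, record counts, etc.
print(7 // 2)  # 3
```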
Testing:
- Ran core tests
Change-Id: Idb0fcbd11f3e8791f5951c4944be44fb580e576b
Reviewed-on: http://gerrit.cloudera.org:8080/19588
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Joe McDonnell <joemcdonnell@cloudera.com>
from Impala
In this patch, we use TUpdateCatalogRequest instead of
TResetMetadataRequest to refresh metadata after 'LOAD DATA', so that
we can reuse the code for 'INSERT' statements. It fires an insert
event just the same as what we do for 'INSERT' statements.
We also fix the inconsistent indentation in event_processor_utils.py.
Testing:
- Run existing test_load.py
- Added test_load_data_from_impala() in test_event_processing.py
Change-Id: I7f1b470f40e0aaf891c9f3f327af393b2f9c74bc
Reviewed-on: http://gerrit.cloudera.org:8080/19052
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
This patch improves the performance of the events processor by
batching together consecutive ALTER_PARTITION or INSERT events.
Currently, without this patch, if the event stream consists of many
consecutive ALTER_PARTITION events which cannot be skipped, the
events processor refreshes the partition from each event one by
one. Similarly, for INSERT events in a partition, the events
processor refreshes one partition at a time.
By batching such consecutive ALTER_PARTITION or INSERT events
together, the events processor needs to take a lock on the table
only once per batch and can refresh all the partitions from the
events using multiple threads. For transactional (ACID) tables this
provides an even more significant performance gain, since currently
we refresh the whole table for ALTER_PARTITION or INSERT partition
events. By batching them together, the events processor refreshes
the table once per batch.
A batch of eligible ALTER_PARTITION or INSERT events is processed
as an ALTER_PARTITIONS or INSERT_PARTITIONS event respectively.
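The grouping logic can be sketched as follows. This is a
simplification of the actual eligibility rules (which also check
that the events cannot be skipped), shown here only to illustrate
run-length batching of the event stream:

```python
BATCHABLE = ('ALTER_PARTITION', 'INSERT')


def batch_events(events):
    """Group consecutive events of the same batchable type on the same
    table, so the table lock is taken once per batch.

    events: list of (event_type, table_name) pairs.
    Returns a list of [event_type, table_name, count] batches."""
    batches = []
    for event_type, table in events:
        if (batches and event_type in BATCHABLE
                and batches[-1][:2] == [event_type, table]):
            batches[-1][2] += 1
        else:
            batches.append([event_type, table, 1])
    return batches
```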
Performance tests:
In order to simulate a bunch of ALTER_PARTITION and INSERT events,
a simple test was performed by running the following query from
Hive:
insert into store_sales_copy partition(ss_sold_date_sk)
select * from store_sales;
This query generates 1824 ALTER_PARTITION and 1824 INSERT events,
and the time taken to process all the generated events was measured
before and after the patch for an external and an ACID table.
Table Type Before After
======================================================
External table 75 sec 25 sec
ACID tables 313 sec 47 sec
Additionally, the patch also fixes a minor bug in
evaluateSelfEvent(), which should return false when the serviceId
does not match.
Testing Done:
1. Added new tests which cover the batching logic of events.
2. Exhaustive tests.
Change-Id: I5d27a68a64436d31731e9a219b1efd6fc842de73
Reviewed-on: http://gerrit.cloudera.org:8080/17848
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Sourabh Goyal <sourabhg@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
apis are accessed from catalog's metastore server.
This patch fixes a scenario where a table/db already exists in the
cache and a user drops it via the catalog metastore server endpoint
(i.e. the drop_table HMS API). When recreating the same table/db
via the Impala shell, the user gets an error that the table/db
already exists. The patch fixes this by dropping the table/db from
the cache in the HMS endpoints so that creating the new table/db
succeeds.
Testing:
Added new unit tests which cover the drop_database and drop_table
HMS APIs.
Change-Id: Ic2e2ad2630e2028b8ad26a6272ee766b27e0935c
Reviewed-on: http://gerrit.cloudera.org:8080/17576
Reviewed-by: <kishen@cloudera.com>
Reviewed-by: Vihang Karajgaonkar <vihang@cloudera.com>
Tested-by: Vihang Karajgaonkar <vihang@cloudera.com>
This commit turns on events processing by default. The default
polling interval is set to 1 second, which can be overridden by
setting hms_event_polling_interval_s to a non-default value.
With event polling turned on by default, this patch also moves
test_event_processing.py to tests/metadata instead of the custom
cluster tests. Some tests within test_event_processing.py that
needed non-default configurations were moved to
tests/custom_cluster/test_events_custom_configs.py.
Additionally, some other tests were modified to take into account
Impala's ability to automatically detect tables newly added from
Hive.
Testing done:
1. Ran exhaustive tests multiple times with events processing
turned on.
2. Ran exhaustive tests by disabling events processing.
3. Ran dockerized tests.
Change-Id: I9a8b1871a98b913d0ad8bb26a104a296b6a06122
Reviewed-on: http://gerrit.cloudera.org:8080/17612
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
This commit redoes some of the self-event detection logic,
specifically for partition events. Before the patch, the self-event
identifiers for a partition were stored at the table level when
generating the partition events. This was problematic since, unlike
ADD_PARTITION and DROP_PARTITION events, an ALTER_PARTITION event
is generated once per partition. Due to this, if multiple
ALTER_PARTITION events are generated, only the first event is
identified as a self-event and the rest of the events are
processed. This patch fixes this by adding the self-event
identifiers to each partition, so that when the event is later
received, each ALTER_PARTITION uses the state stored in the
HdfsPartition to evaluate the self-event. The patch makes sure that
the event processor takes a table lock during self-event evaluation
to avoid races with other parts of the code that try to modify the
table at the same time.
Additionally, this patch changes the event processor to refresh a
loaded table (incomplete tables are not refreshed) when an
ALTER_TABLE event is received, instead of invalidating the table.
This makes the events processor consistent with respect to all the
other event types. In the future, we should add a flag to choose
the preferred behavior (invalidate or refresh).
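The per-partition bookkeeping can be sketched like this
(illustrative names only; the real identifiers are version/service
markers tracked on HdfsPartition in Java):

```python
class PartitionSelfEventSketch:
    """Stores self-event identifiers on the partition itself, so each
    ALTER_PARTITION event (one per partition) is matched
    independently rather than against a single table-level record."""

    def __init__(self):
        self._in_flight_versions = []

    def add_self_event_version(self, version):
        """Record an identifier when this catalogd alters the
        partition."""
        self._in_flight_versions.append(version)

    def is_self_event(self, version):
        """True (skip the event) iff this partition recorded the
        version; consume the identifier so it matches at most once."""
        if version in self._in_flight_versions:
            self._in_flight_versions.remove(version)
            return True
        return False
```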
Also, this patch fixes the following related issues:
1. Self-event logic was not triggered for alter database events
when the user modifies the comment on the database.
2. For queries like "alter table add if not exists partition...",
the partition is not added since it is pre-existing. The self-event
identifiers should not be added in such cases since no event is
expected from such queries.
3. Changed the wait_for_event_processing test util method in
EventProcessorUtils to use a more deterministic way of determining
whether the catalog updates have propagated to the impalads,
instead of waiting for a random duration of time. This also speeds
up the event processing tests significantly.
Testing Done:
1. Added an e2e self-events test which runs multiple Impala queries
and makes sure that the events skip processing.
2. Ran MetastoreEventsProcessorTest
3. Ran core tests on CDH and CDP builds.
Change-Id: I9b4148f6be0f9f946c8ad8f314d64b095731744c
Reviewed-on: http://gerrit.cloudera.org:8080/14799
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Certain Hive queries like "alter table <table> add if not exists
partition (<part_spec>)" generate an add_partition event even if
the partition did not really exist. Such events have an empty
partition list in the event message, which trips the Precondition
check in AddPartitionEvent. This causes the event processor to go
into the error state, and the only way to recover is to issue
invalidate metadata. The patch adds logic to ignore such events.
Testing:
1. Added a test case which reproduces the issue. The test case works
after the patch is applied.
Change-Id: I877ce6233934e7090cd18e497f748bc6479838cb
Reviewed-on: http://gerrit.cloudera.org:8080/14049
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Vihang Karajgaonkar <vihang@cloudera.com>