mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
This is a follow-up commit to IMPALA-12709. IMPALA-12152 and
IMPALA-12785 are affected when the hierarchical metastore event
processing feature is enabled.
This patch makes the following changes:
1. Added creationTime_ and dispatchTime_ fields to the MetastoreEvent
   class to store the current time in milliseconds. They are used to
   calculate:
   a) Event dispatch time (the time between creation of a
      MetastoreEvent object and the moment the event is moved to
      inProgressLog_ of EventExecutorService after it is dispatched
      to a DbEventExecutor).
   b) Event schedule delays incurred at DbEventExecutors and
      TableEventExecutors (the time between an event being moved to
      EventExecutorService's inProgressLog_ and the start of its
      processing at the appropriate DbEventExecutor and
      TableEventExecutor).
   c) Event process time from the EventExecutorService point of
      view (the time spent in inProgressLog_ before the event is
      moved to processedLog_).
   Logs are added to show the event dispatch time, schedule
   delays and process time from the EventExecutorService point of
   view for each event. A log is also added to show the time
   taken by the event's processIfEnabled().
2. Added isDelimiter_ field to the MetastoreEvent class to indicate
   whether an event is a delimiter event. It is set only when
   hierarchical event processing is enabled. A delimiter is a kind
   of metastore event that does not require event processing.
   A delimiter event can be:
   a) A CommitTxnEvent that does not have any write event info for
      a given transaction.
   b) An AbortTxnEvent that does not have write ids for a given
      transaction.
   c) An IgnoredEvent.
   An event is identified and marked as a delimiter in
   EventExecutorService#dispatch(). Delimiter events are not queued
   to a DbEventExecutor for processing. They are only kept in
   inProgressLog_ to preserve continuity and correctness in
   synchronization tracking. A delimiter event is removed from
   inProgressLog_ when its preceding non-delimiter metastore
   event is removed from inProgressLog_.
3. The greatest synced event id is computed from the tree maps of
   dispatched events (inProgressLog_) and processed events
   (processedLog_). The greatest synced event is the latest event
   such that all events with an id less than or equal to its id are
   definitely synced.
4. Lag is calculated as the difference between the latest event time
   on HMS and the greatest synced event time. It is shown in the log.
5. The greatest synced event id is used in the IMPALA-12152 changes.
   When the greatest synced event id becomes greater than or equal to
   waitForEventId, all the required events are definitely synced.
6. The event processor is paused gracefully when it is paused with
   the command from IMPALA-12785. This ensures that all the events
   fetched from HMS in the current batch are processed before the
   event processor is fully paused. The current batch must be
   processed because certain events, such as AllocWriteIdEvent,
   AbortTxnEvent and CommitTxnEvent, update table write ids in the
   catalog when the metastore event object is created, and those
   write ids are applied to the appropriate table object only later,
   when the event is processed. Pausing abruptly in the middle of a
   batch can therefore leave the write ids on table objects in an
   inconsistent state.
7. Added the greatest synced event id and event time to the events
   processor metrics, and updated the descriptions of the lag,
   pending events, last synced event id and event time metrics.
8. The enqueue methods of both DbProcessor and TableProcessor now
   atomically add the event to the queue and increment the
   outstanding event count, so that the respective process methods
   do not process the event until both steps are done. Otherwise,
   an event could be processed and the outstanding event count
   decremented before the enqueue method has incremented it.
9. Refactored the DbEventExecutor, DbProcessor, TableEventExecutor
   and TableProcessor classes to propagate the exception that
   occurred during event processing together with the event.
   EventProcessException is a wrapper added to hold references to
   the event being processed and the exception that occurred.
10. Added the AcidTableWriteInfo helper class to store the table,
    write ids and partitions for the transaction id received in a
    CommitTxnEvent.
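
As a rough illustration of items 2, 3 and 5, the sketch below models the
greatest synced event id in Python. It is not the Impala implementation
(which is Java and keeps inProgressLog_ and processedLog_ as tree maps);
the function names, the dict/list representation and the sample event ids
are assumptions made only for this example.

```python
def greatest_synced_event_id(in_progress, processed, last_synced=-1):
    """Return the largest event id with every earlier event synced.

    in_progress: dict mapping event id -> True if the event is a delimiter
                 (hypothetical stand-in for inProgressLog_).
    processed:   iterable of processed event ids (stand-in for processedLog_).
    last_synced: value returned when nothing can be reported as synced yet.
    """
    # Non-delimiter events still in progress block everything after them.
    pending = [eid for eid, is_delim in in_progress.items() if not is_delim]
    barrier = min(pending) if pending else None
    # Delimiters need no processing, so they count as synced once no
    # earlier non-delimiter event is still pending.
    candidates = list(processed) + [e for e, d in in_progress.items() if d]
    if barrier is not None:
        candidates = [e for e in candidates if e < barrier]
    return max(candidates, default=last_synced)


def is_synced_up_to(wait_for_event_id, in_progress, processed):
    """Item 5: the wait is over once the greatest synced id reaches the target."""
    return greatest_synced_event_id(in_progress, processed) >= wait_for_event_id
```

Under this model, with events 101, 102 and 105 processed, 103 and 106
still in progress and 104 a delimiter, the greatest synced event id is
102. Once 103 is processed it advances to 105, because the delimiter 104
no longer has a pending predecessor.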
Testing:
- Added new tests and executed the existing end-to-end tests.
- Executed the existing tests with hierarchical event processing
  enabled.
Change-Id: I26240f36aaf85125428dc39a66a2a1e4d3197e85
Reviewed-on: http://gerrit.cloudera.org:8080/22997
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
193 lines
8.2 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Impala tests for Hive Metastore, covering the expected propagation
# of metadata from Hive to Impala or Impala to Hive. Each test
# modifies the metadata via Hive and checks that the modification
# succeeded by querying Impala, or vice versa.

from __future__ import absolute_import, division, print_function
import json
import logging
import time

import requests

from impala_thrift_gen.hive_metastore.ttypes import NotificationEventRequest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import ImpalaCluster

LOG = logging.getLogger('event_processor_utils')
LOG.setLevel(level=logging.DEBUG)


class EventProcessorUtils(object):

  DEFAULT_CATALOG_URL = "http://localhost:25020"

  @staticmethod
  def wait_for_synced_event_id(timeout, error_status_possible, target_event_id):
    LOG.info("Waiting until events processor syncs to event id: %s", target_event_id)
    # Wait more than timeout in case there is some progress in synced events.
    # The goal is to decrease the effect of parallel DML/DDL tests that can
    # also fire events that have to be processed and delay syncing up to the
    # last event.
    TIMEOUT_MULTIPLIER_IF_THERE_IS_PROGRESS = 10
    total_timeout = timeout * TIMEOUT_MULTIPLIER_IF_THERE_IS_PROGRESS
    status_list = ["ACTIVE", "PAUSED"]
    if error_status_possible:
      status_list.append("ERROR")
    end_time = time.time() + total_timeout
    last_synced_id = EventProcessorUtils.get_last_synced_event_id()
    last_synced_time = time.time()
    while True:
      t = time.time()
      current_synced_id = EventProcessorUtils.get_last_synced_event_id()
      if current_synced_id >= target_event_id:
        LOG.debug("Metric last-synced-event-id has reached the desired value: %d",
                  target_event_id)
        break
      status = EventProcessorUtils.get_event_processor_status()
      if status not in status_list:
        error_msg = EventProcessorUtils.get_event_processor_error_msg()
        raise Exception("Event processor is not working. Status: {0}. Error msg: {1}"
                        .format(status, error_msg))
      made_progress = current_synced_id > last_synced_id
      # Adjacent string literals are used below instead of a backslash inside the
      # literal, which would embed the next line's indentation in the message.
      if t >= end_time:
        raise Exception(
            "Event processor did not sync till last known event id {0} "
            "within {1} seconds".format(target_event_id, total_timeout))
      elif not made_progress and t >= last_synced_time + timeout:
        raise Exception(
            "Event processor did not make progress since event id {0} "
            "within {1} seconds".format(last_synced_id, timeout))
      if made_progress:
        LOG.debug(
            "Metric last-synced-event-id has been increased to %d but has not yet "
            "reached the desired value: %d", current_synced_id, target_event_id)
        last_synced_id = current_synced_id
        last_synced_time = t
      time.sleep(0.1)

  @staticmethod
  def wait_for_event_processing(test_suite, timeout=10, error_status_possible=False):
    if isinstance(test_suite, CustomClusterTestSuite):
      impala_cluster = test_suite.cluster
    else:
      impala_cluster = ImpalaCluster.get_e2e_test_cluster()
    EventProcessorUtils.wait_for_event_processing_impl(test_suite.hive_client,
        impala_cluster, timeout, error_status_possible)

  @staticmethod
  def wait_for_event_processing_impl(hive_client, impala_cluster, timeout=10,
                                     error_status_possible=False):
    """Waits till the event processor has synced to the latest event id from metastore
    or the timeout value in seconds whichever is earlier"""
    if EventProcessorUtils.get_event_processor_status() == "DISABLED":
      return
    assert timeout > 0
    assert hive_client is not None
    current_event_id = EventProcessorUtils.get_current_notification_id(
        hive_client)
    EventProcessorUtils.wait_for_synced_event_id(timeout, error_status_possible,
                                                 current_event_id)
    # Wait until the impalad catalog versions agree with the catalogd's version.
    # Some tests like test_overlap_min_max_filters have DDLs that take longer than 10s.
    # The catalog update might be blocked due to that. Use 20s as the timeout here.
    catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
    for impalad in impala_cluster.impalads:
      impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
                                            timeout=20, allow_greater=True)

  @staticmethod
  def get_event_processor_metrics():
    """Scrapes the catalog's /events webpage and returns a dictionary with the event
    processor metrics."""
    response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
    assert response.status_code == requests.codes.ok
    varz_json = json.loads(response.text)
    metrics = varz_json["event_processor_metrics"].strip().splitlines()

    # Helper to strip a pair of elements
    def strip_pair(p):
      return (p[0].strip(), p[1].strip())

    # Split on the first ':' only, so that a value which itself contains ':' is
    # kept whole instead of being truncated.
    pairs = [strip_pair(kv.split(':', 1)) for kv in metrics if kv]
    return dict(pairs)

  @staticmethod
  def get_event_processor_error_msg():
    """Scrapes the catalog's /events webpage and returns the error message (if any) of
    the event processor"""
    response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
    assert response.status_code == requests.codes.ok
    res_json = json.loads(response.text)
    if "event_processor_error_msg" in res_json:
      return res_json["event_processor_error_msg"].strip()
    return None

  @staticmethod
  def get_int_metric(metric_key, default_val=None):
    """Returns the int value of event processor metric from the /events catalogd debug
    page"""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    if metric_key not in metrics:
      return int(default_val)
    return int(metrics[metric_key])

  @staticmethod
  def get_last_synced_event_id():
    """Returns the greatest event id such that all the events with id less than or
    equal to this event id are definitely synced. When hierarchical event processing is
    enabled, greatest-synced-event-id holds this value. When hierarchical event
    processing is disabled, greatest-synced-event-id and last-synced-event-id have the
    same value. So we can always return greatest-synced-event-id irrespective of the
    hierarchical event processing flag."""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert 'greatest-synced-event-id' in metrics.keys()
    return int(metrics['greatest-synced-event-id'])

  @staticmethod
  def get_num_skipped_events():
    """Returns number of skipped events from metrics"""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert "events-skipped" in metrics.keys()
    return int(metrics['events-skipped'])

  @staticmethod
  def get_event_processor_status():
    """
    Returns the current status of the EventsProcessor
    """
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert 'status' in metrics.keys()
    return metrics['status']

  @staticmethod
  def get_current_notification_id(hive_client):
    """Returns the current notification event id from metastore"""
    assert hive_client is not None
    return int(hive_client.get_current_notificationEventId().eventId)

  @staticmethod
  def get_next_notification(hive_client, last_event_id):
    """Returns notification events from metastore"""
    assert hive_client is not None
    assert last_event_id > 0
    notification_event_request = NotificationEventRequest(lastEvent=last_event_id)
    return hive_client.get_next_notification(notification_event_request).events
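
The two-level timeout used by wait_for_synced_event_id (fail after
`timeout` seconds without progress, or after `timeout * 10` seconds
overall) can be tried in isolation with the sketch below. It is a
simplified stand-in, not part of this file: `wait_for_id`, `FakeClock`
and the injectable `clock`/`sleep` parameters are hypothetical names
introduced so the loop can run without a catalogd.

```python
import time


class FakeClock(object):
    """Hypothetical deterministic clock: sleeping only advances a counter."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


def wait_for_id(get_synced_id, target_id, timeout, multiplier=10,
                clock=time.time, sleep=time.sleep, poll_interval=0.1):
    """Polls get_synced_id() until it returns at least target_id.

    Raises RuntimeError if no progress is seen for `timeout` seconds, or if
    the overall budget of `timeout * multiplier` seconds is used up.
    """
    total_timeout = timeout * multiplier
    deadline = clock() + total_timeout
    last_id = get_synced_id()
    last_progress_time = clock()
    while True:
        now = clock()
        current_id = get_synced_id()
        if current_id >= target_id:
            return current_id
        if now >= deadline:
            raise RuntimeError("did not reach event id %s within %s seconds"
                               % (target_id, total_timeout))
        if current_id > last_id:
            # Progress was made: restart the short no-progress timer.
            last_id, last_progress_time = current_id, now
        elif now >= last_progress_time + timeout:
            raise RuntimeError("no progress since event id %s within %s seconds"
                               % (last_id, timeout))
        sleep(poll_interval)
```

Passing `FakeClock().time` and `FakeClock().sleep` from the same instance
makes the loop deterministic. With the defaults it polls in real time,
much like the method above does against the /events page.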