IMPALA-8592: Add support for insert events for 'LOAD DATA' statements

from Impala

In this patch, we use TUpdateCatalogRequest to refresh metadata after
'LOAD DATA' instead of TResetMetadataRequest so that we can reuse the
code for 'INSERT' statements. It will fire an insert event just same
as what we did for 'INSERT' statements.

We also fix the inconsistent indentation in event_processor_utils.py.

Testing:
- Run existing test_load.py
- Added test_load_data_from_impala() in test_event_processing.py

Change-Id: I7f1b470f40e0aaf891c9f3f327af393b2f9c74bc
Reviewed-on: http://gerrit.cloudera.org:8080/19052
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
This commit is contained in:
Yu-Wen Lai
2022-09-21 22:35:33 -07:00
committed by Quanlong Huang
parent 5e0cafd440
commit cf9c443ddc
7 changed files with 194 additions and 84 deletions

View File

@@ -23,68 +23,72 @@ import logging
import requests
import time
import json
from hive_metastore.ttypes import NotificationEventRequest
from tests.common.impala_cluster import ImpalaCluster
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
LOG = logging.getLogger('event_processor_utils')
LOG.setLevel(level=logging.DEBUG)
class EventProcessorUtils(object):
DEFAULT_CATALOG_URL = "http://localhost:25020"
@staticmethod
def wait_for_event_processing(test_suite, timeout=10):
"""Waits till the event processor has synced to the latest event id from metastore
or the timeout value in seconds whichever is earlier"""
if EventProcessorUtils.get_event_processor_status() == "DISABLED":
return
success = False
assert timeout > 0
assert test_suite.hive_client is not None
current_event_id = EventProcessorUtils.get_current_notification_id(
test_suite.hive_client)
LOG.info("Waiting until events processor syncs to event id:" + str(current_event_id))
end_time = time.time() + timeout
while time.time() < end_time:
last_synced_id = EventProcessorUtils.get_last_synced_event_id()
if last_synced_id >= current_event_id:
LOG.debug(
"Metric last-synced-event-id has reached the desired value:" + str(
last_synced_id))
success = True
break
time.sleep(0.1)
if not success:
raise Exception(
"Event processor did not sync till last known event id {0} \
within {1} seconds".format(current_event_id, timeout))
if isinstance(test_suite, CustomClusterTestSuite):
impala_cluster = test_suite.cluster
else:
impala_cluster = ImpalaCluster.get_e2e_test_cluster()
# Wait until the impalad catalog versions agree with the catalogd's version.
catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
for impalad in impala_cluster.impalads:
impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
allow_greater=True)
return success
"""Waits till the event processor has synced to the latest event id from metastore
or the timeout value in seconds whichever is earlier"""
if EventProcessorUtils.get_event_processor_status() == "DISABLED":
return
success = False
assert timeout > 0
assert test_suite.hive_client is not None
current_event_id = EventProcessorUtils.get_current_notification_id(
test_suite.hive_client)
LOG.info("Waiting until events processor syncs to event id:" + str(
current_event_id))
end_time = time.time() + timeout
while time.time() < end_time:
last_synced_id = EventProcessorUtils.get_last_synced_event_id()
if last_synced_id >= current_event_id:
LOG.debug(
"Metric last-synced-event-id has reached the desired value:" + str(
last_synced_id))
success = True
break
time.sleep(0.1)
if not success:
raise Exception(
"Event processor did not sync till last known event id {0} \
within {1} seconds".format(current_event_id, timeout))
if isinstance(test_suite, CustomClusterTestSuite):
impala_cluster = test_suite.cluster
else:
impala_cluster = ImpalaCluster.get_e2e_test_cluster()
# Wait until the impalad catalog versions agree with the catalogd's version.
catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
for impalad in impala_cluster.impalads:
impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
allow_greater=True)
return success
@staticmethod
def get_event_processor_metrics():
"""Scrapes the catalog's /events webpage and return a dictionary with the event
processor metrics."""
response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
assert response.status_code == requests.codes.ok
varz_json = json.loads(response.text)
metrics = varz_json["event_processor_metrics"].strip().splitlines()
"""Scrapes the catalog's /events webpage and return a dictionary with the event
processor metrics."""
response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
assert response.status_code == requests.codes.ok
varz_json = json.loads(response.text)
metrics = varz_json["event_processor_metrics"].strip().splitlines()
# Helper to strip a pair of elements
def strip_pair(p):
return (p[0].strip(), p[1].strip())
# Helper to strip a pair of elements
def strip_pair(p):
return (p[0].strip(), p[1].strip())
pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
return dict(pairs)
pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
return dict(pairs)
@staticmethod
def get_int_metric(metric_key, default_val=None):
@@ -104,10 +108,10 @@ class EventProcessorUtils(object):
@staticmethod
def get_num_skipped_events():
"""Returns number of skipped events from metrics"""
metrics = EventProcessorUtils.get_event_processor_metrics()
assert "events-skipped" in metrics.keys()
return int(metrics['events-skipped'])
"""Returns number of skipped events from metrics"""
metrics = EventProcessorUtils.get_event_processor_metrics()
assert "events-skipped" in metrics.keys()
return int(metrics['events-skipped'])
@staticmethod
def get_event_processor_status():
@@ -123,3 +127,11 @@ class EventProcessorUtils(object):
"""Returns the current notification from metastore"""
assert hive_client is not None
return int(hive_client.get_current_notificationEventId().eventId)
@staticmethod
def get_next_notification(hive_client, last_event_id):
"""Returns notification events from metastore"""
assert hive_client is not None
assert last_event_id > 0
notification_event_request = NotificationEventRequest(lastEvent=last_event_id)
return hive_client.get_next_notification(notification_event_request).events