Files
impala/tests/util/event_processor_utils.py
Vihang Karajgaonkar 5a9dcd108d IMPALA-8795: Turn on events processing by default
This commit turns on events processing by default. The default
polling interval is set to 1 second and can be overridden by
setting hms_event_polling_interval_s to a non-default value.
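
For example, a custom cluster test can override the polling interval through
a catalogd startup flag. This is only a minimal sketch of the existing
CustomClusterTestSuite.with_args pattern; the test name and interval value
are illustrative, not part of this change:

  @CustomClusterTestSuite.with_args(
      catalogd_args="--hms_event_polling_interval_s=5")
  def test_events_with_slower_polling(self, unique_database):
    ...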

Since event polling is now turned on by default, this patch also
moves test_event_processing.py to tests/metadata instead of keeping
it as a custom cluster test. Tests within test_event_processing.py
that needed non-default configurations were moved to
tests/custom_cluster/test_events_custom_configs.py.

Additionally, some other tests were modified to account for
Impala's ability to automatically detect tables newly added
from Hive.

Testing done:
1. Ran exhaustive tests multiple times with events processing turned on.
2. Ran exhaustive tests with events processing disabled.
3. Ran dockerized tests.

Change-Id: I9a8b1871a98b913d0ad8bb26a104a296b6a06122
Reviewed-on: http://gerrit.cloudera.org:8080/17612
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
2021-08-09 17:22:31 +00:00

118 lines
4.6 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Impala tests for Hive Metastore, covering the expected propagation
# of metadata from Hive to Impala or Impala to Hive. Each test
# modifies the metadata via Hive and checks that the modification
# succeeded by querying Impala, or vice versa.
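#
# Illustrative usage in a test (a sketch, not part of this module): run a DDL
# through Hive, wait for the events processor to catch up, and then verify the
# change from Impala. This assumes the standard ImpalaTestSuite helpers
# (run_stmt_in_hive, client) and a 'unique_database' test fixture:
#
#   self.run_stmt_in_hive("create table %s.t1 (i int)" % unique_database)
#   EventProcessorUtils.wait_for_event_processing(self)
#   self.client.execute("describe %s.t1" % unique_database)
#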
import logging
import requests
import time
import json

from tests.common.impala_cluster import ImpalaCluster
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite

LOG = logging.getLogger('event_processor_utils')
LOG.setLevel(level=logging.DEBUG)

class EventProcessorUtils(object):

  DEFAULT_CATALOG_URL = "http://localhost:25020"

  @staticmethod
  def wait_for_event_processing(test_suite, timeout=10):
    """Waits until the event processor has synced to the latest event id from the
    metastore, or until the timeout (in seconds) expires, whichever comes first."""
    if EventProcessorUtils.get_event_processor_status() == "DISABLED":
      return
    success = False
    assert timeout > 0
    assert test_suite.hive_client is not None
    current_event_id = EventProcessorUtils.get_current_notification_id(
        test_suite.hive_client)
    LOG.info("Waiting until events processor syncs to event id:" + str(current_event_id))
    end_time = time.time() + timeout
    while time.time() < end_time:
      last_synced_id = EventProcessorUtils.get_last_synced_event_id()
      if last_synced_id >= current_event_id:
        LOG.debug(
            "Metric last-synced-event-id has reached the desired value:" + str(
                last_synced_id))
        success = True
        break
      time.sleep(0.1)
    if not success:
      raise Exception(
          "Event processor did not sync up to the last known event id {0} "
          "within {1} seconds".format(current_event_id, timeout))
    if isinstance(test_suite, CustomClusterTestSuite):
      impala_cluster = test_suite.cluster
    else:
      impala_cluster = ImpalaCluster.get_e2e_test_cluster()
    # Wait until the impalad catalog versions agree with the catalogd's version.
    catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
    for impalad in impala_cluster.impalads:
      impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
          allow_greater=True)
    return success

  @staticmethod
  def get_event_processor_metrics():
    """Scrapes the catalog's /events webpage and returns a dictionary with the event
    processor metrics."""
    response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
    assert response.status_code == requests.codes.ok
    varz_json = json.loads(response.text)
    # The metrics are exposed as newline-separated "key: value" lines.
    metrics = varz_json["event_processor_metrics"].strip().splitlines()

    # Helper to strip whitespace from the key and value of a pair.
    def strip_pair(p):
      return (p[0].strip(), p[1].strip())

    pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
    return dict(pairs)

  @staticmethod
  def get_event_processor_metric(metric_key, default_val=None):
    """Returns the given event processor metric from the /events catalog debug page,
    or default_val if the metric is not present."""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    if metric_key not in metrics:
      return default_val
    return metrics[metric_key]

  @staticmethod
  def get_last_synced_event_id():
    """Returns the last-synced-event-id metric as an integer."""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert 'last-synced-event-id' in metrics.keys()
    return int(metrics['last-synced-event-id'])

  @staticmethod
  def get_event_processor_status():
    """Returns the current status of the EventsProcessor."""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert 'status' in metrics.keys()
    return metrics['status']

  @staticmethod
  def get_current_notification_id(hive_client):
    """Returns the current notification event id from the metastore."""
    assert hive_client is not None
    return int(hive_client.get_current_notificationEventId().eventId)