impala/tests/custom_cluster/test_event_processing_error.py
stiga-huang b410a90b47 IMPALA-12152: Add query options to wait for HMS events sync up
It's a common scenario to run Impala queries after dependent
external changes are done, e.g. running COMPUTE STATS on a table after
Hive/Spark jobs ingest new data into it. Currently, it's unsafe to
run the Impala queries immediately after the Hive/Spark jobs finish
since EventProcessor might have a long lag in applying the HMS events.
Note that running REFRESH/INVALIDATE on the table can also solve the
problem, but one of the motivations of EventProcessor is to get rid of
such Impala-specific commands.

This patch adds a mechanism to let query planning wait until the
metadata is synced up. Two new query options are added:
 - SYNC_HMS_EVENTS_WAIT_TIME_S configures the timeout in seconds for
   waiting. It's 0 by default, which disables the waiting mechanism.
 - SYNC_HMS_EVENTS_STRICT_MODE controls the behavior when metadata
   can't be synced up, e.g. when the waiting times out or
   EventProcessor is in ERROR state. It defaults to false (non-strict
   mode). In strict mode, the coordinator fails the query. In
   non-strict mode, the coordinator starts planning with a warning
   message in the profile (and in client output if the client consumes
   the get_log results, e.g. in impala-shell).

Example usage - query the table after inserting into dynamic partitions
in Hive. We don't know which partitions were modified, so running REFRESH
in Impala is inefficient since it reloads all partitions.
  hive> insert into tbl partition(p) select * from tbl2;
  impala> set sync_hms_events_wait_time_s=300;
  impala> select * from tbl;
With this new feature, catalogd reloads the updated partitions based
on HMS events, which is more efficient than REFRESH. The wait time can
be set to the largest event-processing lag that has been observed in
the cluster. Note that the lag is shown as "Lag time" on the /events
page of the catalogd WebUI and as "events-processor.lag-time" on the
/metrics page. Users can monitor it to get a sense of the lag.
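The sizing rule above can be sketched in a few lines. This is an illustrative helper only: the metric name comes from the catalogd /metrics page mentioned above, but the flat-dict shape of `metrics` and the `safety_factor` padding are simplifying assumptions, not the actual JSON layout of the WebUI.

```python
def get_event_processor_lag(metrics):
  """Return the 'events-processor.lag-time' value from a metrics mapping
  (assumed flat dict for illustration), defaulting to 0 when absent."""
  return metrics.get("events-processor.lag-time", 0)


def pick_wait_time_s(observed_lags_s, safety_factor=2):
  """Derive SYNC_HMS_EVENTS_WAIT_TIME_S: take the largest lag observed in
  the cluster and pad it by a safety factor to absorb future spikes."""
  return max(observed_lags_s) * safety_factor
```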

Some timeline items are added to the query profile for this waiting
mechanism, e.g.
A successful wait:
    Query Compilation: 937.279ms
       - Synced events from Metastore: 909.162ms (909.162ms)
       - Metadata of all 1 tables cached: 911.005ms (1.843ms)
       - Analysis finished: 919.600ms (8.595ms)

A failed wait:
    Query Compilation: 1s321ms
       - Continuing without syncing Metastore events: 40.883ms (40.883ms)
       - Metadata load started: 41.618ms (735.633us)

Added a histogram metric, impala-server.wait-for-hms-event-durations-ms,
to track the duration of this waiting.

--------
Implementation

A new catalogd RPC, WaitForHmsEvent, is added to the CatalogService API
so that the coordinator can wait until catalogd has processed the latest
event as of when this RPC is triggered. Query planning starts or fails
after this RPC returns. The RPC request contains the potential dbs/tables
that are required by the query. Catalogd records the latest event id when
it receives this RPC. When the last synced event id reaches it, catalogd
returns the catalog updates to the coordinator in the RPC response.
Until then, the RPC thread sits in a waiting loop that sleeps for a
configurable interval, controlled by a hidden flag,
hms_event_sync_sleep_interval_ms (defaults to 100).
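The waiting loop described above can be sketched as follows. This is a Python illustration of the mechanism only, not the actual Java implementation in catalogd; the two callback parameters stand in for catalogd's event-id bookkeeping.

```python
import time


def wait_for_hms_event_sync(get_latest_event_id, get_last_synced_event_id,
                            timeout_s, sleep_interval_ms=100):
  """Record the latest HMS event id at RPC arrival, then poll the last
  synced id at a fixed interval until it catches up or the timeout expires.
  Returns True on a successful sync, False on timeout."""
  target_event_id = get_latest_event_id()
  deadline = time.monotonic() + timeout_s
  while time.monotonic() < deadline:
    if get_last_synced_event_id() >= target_event_id:
      return True
    # Sleep the configurable interval (cf. hms_event_sync_sleep_interval_ms).
    time.sleep(sleep_interval_ms / 1000.0)
  # One last check in case the sync landed right at the deadline.
  return get_last_synced_event_id() >= target_event_id
```

A caller in strict mode would fail the query on a False return; in non-strict mode it would proceed with a warning, matching the query-option semantics above.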

Entry-point functions
 - Frontend#waitForHmsEvents()
 - CatalogServiceCatalog#waitForHmsEvent()

Some statements don't need to wait for HMS events, e.g. CREATE/DROP ROLE
statements. This patch adds an overridable method, requiresHmsMetadata(),
to each Statement to mark whether it can skip the HMS event sync.
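The override pattern can be sketched like this, transposed to Python for illustration (the real requiresHmsMetadata() lives in the Java frontend's Statement classes; the class names below are illustrative):

```python
class Statement:
  """Base class: most statements read db/table metadata, so they wait for
  the HMS event sync by default."""

  def requires_hms_metadata(self):
    return True


class CreateDropRoleStmt(Statement):
  """Role DDL only touches the authorization catalog, not HMS metadata,
  so it opts out of the wait."""

  def requires_hms_metadata(self):
    return False
```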

Test side changes:
 - Some test code uses EventProcessorUtils.wait_for_event_processing()
   to wait for HMS events to be synced up before running a query. Such
   tests are updated to just use the new query options in the query.
 - Note that we still need wait_for_event_processing() in tests that
   verify metrics after HMS events are synced up.

--------
Limitation

Currently, UPDATE_TBL_COL_STAT_EVENT, UPDATE_PART_COL_STAT_EVENT, and
OPEN_TXN events are ignored by the event processor. If the latest event
happens to be one of these types and no further events arrive, the
last synced event id can never reach the latest event id. We need to fix
the last synced event id to also account for ignored events (IMPALA-13623).
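The stuck-id scenario can be modeled in a few lines. This is a hypothetical illustration, not catalogd's actual bookkeeping: a tracker that only advances on non-ignored events stalls below the latest event id whenever the newest events are all ignored types, so the waiting loop would spin until timeout. `advance_on_ignored=True` corresponds to the IMPALA-13623 fix.

```python
def last_synced_id(events, ignored_types, advance_on_ignored):
  """events: list of (event_id, event_type) pairs in id order.
  Returns the last synced event id under the given advancement policy."""
  synced = 0
  for event_id, event_type in events:
    if event_type in ignored_types and not advance_on_ignored:
      continue  # Ignored event leaves the synced id behind.
    synced = event_id
  return synced
```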

The current implementation waits for the latest event id as of when the
WaitForHmsEvent RPC is received on the catalogd side. We can improve this
by leveraging HIVE-27499 to efficiently detect whether the given
dbs/tables have unsynced events and wait only for the *largest* id
among them. Dbs/tables without unsynced events don't need to block query
planning. However, this only works for non-transactional tables.
Transactional tables might be modified by COMMIT_TXN or ABORT_TXN events,
which don't carry the table names. So even with HIVE-27499, we can't
determine whether a transactional table has pending events. IMPALA-13684
tracks this improvement for non-transactional tables.

Tests
 - Add a test to verify planning waits until catalogd is synced with HMS
   changes.
 - Add a test for the error handling when HMS event processing is disabled.

Change-Id: I36ac941bb2c2217b09fcfa2eb567b011b38efa2a
Reviewed-on: http://gerrit.cloudera.org:8080/20131
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-02-22 18:14:37 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import range
from hive_metastore.ttypes import FireEventRequest
from hive_metastore.ttypes import FireEventRequestData
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfCatalogV2, SkipIfFS
from tests.metadata.test_event_processing_base import TestEventProcessingBase
from tests.util.acid_txn import AcidTxn
from tests.util.event_processor_utils import EventProcessorUtils


@SkipIfCatalogV2.hms_event_polling_disabled()
class TestEventProcessingError(CustomClusterTestSuite):
  """
  Tests to verify that the event processor does not go into the error state
  when there are runtime exceptions while processing events.
  """

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--invalidate_metadata_on_event_processing_failure=false "
                    "--inject_process_event_failure_event_types='ALTER_TABLE' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_sanity_check(self, unique_database):
    """Tests event processor going into error state for alter table event"""
    tbl_name = "hive_alter_table"
    self.__create_table_and_load__(unique_database, tbl_name, False, False)
    self.run_stmt_in_hive(
        "alter table {}.{} set owner user `test-user`"
        .format(unique_database, tbl_name))
    try:
      EventProcessorUtils.wait_for_event_processing(self)
    except Exception:
      assert EventProcessorUtils.get_event_processor_status() == "ERROR"
    self.client.execute("INVALIDATE METADATA")
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='ALTER_TABLE' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alter_table(self, unique_database):
    """Tests event processor going into error state for alter table event"""
    tbl_name = "hive_alter_table"
    self.__create_table_and_load__(unique_database, tbl_name, False, False)
    self.run_stmt_in_hive(
        "alter table {}.{} set owner user `test-user`"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))
    self.verify_owner_property(result, 'test-user')

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='ADD_PARTITION' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_add_partition(self, unique_database):
    """Tests event processor going into error state for add partition event"""
    tbl_name = "hive_table_add_partition"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.client.execute("describe {}.{}".format(unique_database, tbl_name))
    self.run_stmt_in_hive(
        "alter table {}.{} add partition(year=2024)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
        .format(unique_database, tbl_name))
    # First line is the header. Only one partition should be shown so the
    # result has two lines.
    assert "hive_table_add_partition/year=2024" in result.get_data()
    assert len(result.data) == 2

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='ALTER_PARTITION' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alter_partition(self, unique_database):
    """Tests event processor going into error state for alter partition event"""
    tbl_name = "hive_table_alter_partition"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
        "alter table {}.{} add partition(year=2024)"
        .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
        "analyze table {}.{} compute statistics"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
        .format(unique_database, tbl_name))
    assert "2024" in result.get_data()
    assert len(result.data) == 2

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='ALTER_PARTITIONS' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alter_partitions(self, unique_database):
    """Tests event processor going into error state for batch alter partitions event"""
    tbl_name = "hive_table_alter_partitions"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    for i in range(5):
      self.client.execute(
          "alter table {}.{} add partition(year={})"
          .format(unique_database, tbl_name, i))
    self.run_stmt_in_hive(
        "analyze table {}.{} compute statistics"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
        .format(unique_database, tbl_name))
    for i in range(5):
      assert str(i) in result.get_data()
    assert len(result.data) == 6

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='INSERT' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_insert_event(self, unique_database):
    """Tests event processor going into error state for insert event"""
    tbl_name = "hive_table_insert"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    for _ in range(2):
      self.client.execute(
          "insert into {}.{} partition(year=2024) values (1),(2),(3)"
          .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("select count(*) from {}.{}"
        .format(unique_database, tbl_name))
    assert result.data[0] == '6'

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='INSERT_PARTITIONS' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_insert_events(self, unique_database):
    """Tests event processor going into error state for insert partitions event"""
    tbl_name = "hive_table_insert_partitions"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
        "alter table {}.{} add partition(year=2024)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    for _ in range(2):
      self.client.execute(
          "insert into {}.{} partition(year=2024) values (1),(2),(3)"
          .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("select count(*) from {}.{}"
        .format(unique_database, tbl_name))
    assert result.data[0] == '6'

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='DROP_PARTITION' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_drop_partition(self, unique_database):
    """Tests event processor going into error state for drop partition event"""
    tbl_name = "hive_table_drop_partition"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
        "alter table {}.{} add partition(year=2024)"
        .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
        "alter table {}.{} drop partition(year=2024)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
        .format(unique_database, tbl_name))
    assert "2024" not in result.get_data()
    assert len(result.data) == 1

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='RELOAD' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_reload_event(self, unique_database):
    """Tests event processor going into error state for reload event"""
    tbl_name = "hive_table_reload_event"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
        "alter table {}.{} add partition(year=2024)"
        .format(unique_database, tbl_name))
    # Refresh at table level
    data = FireEventRequestData()
    data.refreshEvent = True
    req = FireEventRequest(True, data)
    req.dbName = unique_database
    req.tableName = tbl_name
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    # Refresh at partition level
    req.partitionVals = ["2024"]
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    # Invalidate at table level
    data.refreshEvent = False
    req = FireEventRequest(True, data)
    req.dbName = unique_database
    req.tableName = tbl_name
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types="
                    "'COMMIT_COMPACTION_EVENT' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_commit_compaction_event(self, unique_database):
    """Tests event processor going into error state for commit compaction event"""
    tbl_name = "hive_table_commit_compaction"
    self.__create_table_and_load__(unique_database, tbl_name, True, False)
    for _ in range(2):
      self.run_stmt_in_hive(
          "insert into {}.{} values (1),(2),(3)"
          .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
        "alter table {}.{} compact 'minor' and wait"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='ALLOC_WRITE_ID_EVENT' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alloc_write_id_event(self, unique_database):
    """Tests event processor going into error state for allocate write id event"""
    tbl_name = "hive_table_alloc_write_id"
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    acid = AcidTxn(self.hive_client)
    txn_id = acid.open_txns()
    acid.allocate_table_write_ids(txn_id, unique_database, tbl_name)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='COMMIT_TXN' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_commit_txn(self, unique_database):
    """Tests event processor going into error state for commit transaction event"""
    tbl_name = "hive_table_commit_txn"
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    self.run_stmt_in_hive(
        "insert into {}.{} partition (year=2022) values (1),(2),(3)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_event_types='ABORT_TXN' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_abort_txn_event(self, unique_database):
    """Tests event processor going into error state for abort transaction event"""
    tbl_name = "hive_table_abort_txn"
    acid = AcidTxn(self.hive_client)
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    txn_id = acid.open_txns()
    acid.abort_txn(txn_id)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))

  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--invalidate_metadata_on_event_processing_failure=false "
                    "--invalidate_global_metadata_on_event_processing_failure=true "
                    "--inject_process_event_failure_event_types="
                    "'ALTER_TABLE, ADD_PARTITION' "
                    "--hms_event_polling_interval_s=2")
  def test_event_processor_error_global_invalidate(self, unique_database):
    """Test to verify that the automatic global invalidate puts EP back into
    the active state when it goes into the error state"""
    tbl_name = "hive_table_global_invalidate"
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    self.run_stmt_in_hive(
        "alter table {}.{} set owner user `test-user`"
        .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
        "alter table {}.{} add partition(year=2024)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self, error_status_possible=True)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("describe formatted {}.{}"
        .format(unique_database, tbl_name))
    self.verify_owner_property(result, 'test-user')
    result = self.client.execute("show partitions {}.{}"
        .format(unique_database, tbl_name))
    assert "2024" in result.get_data()

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
      impalad_args="--use_local_catalog=true",
      catalogd_args="--catalog_topic_mode=minimal "
                    "--inject_process_event_failure_ratio=0.5 "
                    "--inject_process_event_failure_event_types="
                    "'ALTER_TABLE,ADD_PARTITION,"
                    "ALTER_PARTITION,INSERT,ABORT_TXN,COMMIT_TXN'")
  def test_event_processor_error_stress_test(self, unique_database):
    """Executes inserts for transactional tables and external tables. Also runs
    replication tests.
    """
    # inserts on transactional tables
    TestEventProcessingBase._run_test_insert_events_impl(unique_database, True)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    test_db = unique_database + "_no_transact"
    try:
      self.run_stmt_in_hive("""create database {}""".format(test_db))
      # inserts on external tables
      TestEventProcessingBase._run_test_insert_events_impl(test_db, False)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    finally:
      self.run_stmt_in_hive("""drop database {} cascade""".format(test_db))
    # replication related tests
    TestEventProcessingBase._run_event_based_replication_tests_impl(
        self.filesystem_client)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  def __create_table_and_load__(self, db_name, table_name, is_transactional,
                                is_partitioned):
    create_query = " ".join([
        "create", " transactional " if is_transactional else '',
        "table `{}`.`{}`(i int)",
        " partitioned by (year int) " if is_partitioned else ''])
    self.run_stmt_in_hive(create_query.format(db_name, table_name))
    EventProcessorUtils.wait_for_event_processing(self)
    # Make the table loaded
    self.client.execute("describe {}.{}".format(db_name, table_name))

  @staticmethod
  def verify_owner_property(result, user_name):
    match = False
    for row in result.data:
      fields = row.split("\t")
      if "Owner:" in fields[0]:
        assert user_name == fields[1].strip()
        match = True
    assert match