IMPALA-14227: (Addendum) Add more tests for catalogd HA warm failover

This adds more tests in test_catalogd_ha.py for warm failover.
Refactored _test_metadata_after_failover to run in the following way:
 - Run DDL/DML in the active catalogd.
 - Kill the active catalogd and wait until the failover finishes.
 - Verify the DDL/DML results in the new active catalogd.
 - Restart the killed catalogd
It accepts two methods in parameters to perform the DDL/DML and the
verifier. In the last step, the killed catalogd is started so we keep
having 2 catalogd and can merge these into a single test by invoking
_test_metadata_after_failover for different method pairs. This saves
some test time.

The following DDL/DML statements are tested:
 - CreateTable
 - AddPartition
 - REFRESH
 - DropPartition
 - INSERT
 - DropTable
After each failover, the table is verified to be warmed up (i.e. loaded).

Also validate flags in startup to make sure enable_insert_events and
enable_reload_events are both set to true when warm failover is enabled,
i.e. --catalogd_ha_reset_metadata_on_failover=false.

Change-Id: I6b20adeb0bd175592b425e521138c41196347600
Reviewed-on: http://gerrit.cloudera.org:8080/23206
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
This commit is contained in:
stiga-huang
2025-07-22 10:55:08 +08:00
committed by Wenzhe Zhou
parent 5fc66bfabc
commit 5bdd9c7f39
6 changed files with 195 additions and 56 deletions

View File

@@ -87,6 +87,8 @@ DECLARE_int64(thrift_rpc_max_message_size);
DECLARE_int64(thrift_external_rpc_max_message_size);
DECLARE_double(hms_event_polling_interval_s);
DECLARE_bool(catalogd_ha_reset_metadata_on_failover);
DECLARE_bool(enable_insert_events);
DECLARE_bool(enable_reload_events);
DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in milliseconds "
"between memory maintenance iterations");
@@ -564,12 +566,23 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
FLAGS_thrift_external_rpc_max_message_size, ThriftDefaultMaxMessageSize()));
}
if (!FLAGS_catalogd_ha_reset_metadata_on_failover
&& FLAGS_hms_event_polling_interval_s <= 0) {
CLEAN_EXIT_WITH_ERROR(Substitute(
"Invalid hms_event_polling_interval_s: $0. It should be larger than 0 when "
"--catalogd_ha_reset_metadata_on_failover is false",
FLAGS_hms_event_polling_interval_s));
if (!FLAGS_catalogd_ha_reset_metadata_on_failover) {
if (FLAGS_hms_event_polling_interval_s <= 0) {
CLEAN_EXIT_WITH_ERROR(Substitute(
"Invalid hms_event_polling_interval_s: $0. It should be larger than 0 when "
"--catalogd_ha_reset_metadata_on_failover is false",
FLAGS_hms_event_polling_interval_s));
}
if (!FLAGS_enable_insert_events) {
CLEAN_EXIT_WITH_ERROR(Substitute(
"--enable_insert_events should be true when "
"--catalogd_ha_reset_metadata_on_failover is false"));
}
if (!FLAGS_enable_reload_events) {
CLEAN_EXIT_WITH_ERROR(Substitute(
"--enable_reload_events should be true when "
"--catalogd_ha_reset_metadata_on_failover is false"));
}
}
impala::InitGoogleLoggingSafe(argv[0]);

View File

@@ -175,6 +175,7 @@ testdata/data/sfs_d2.txt
testdata/data/sfs_d4.txt
testdata/data/load_data_with_catalog_v1.txt
testdata/data/warmup_table_list.txt
testdata/data/warmup_test_config.txt
testdata/datasets/functional/functional_schema_template.sql
testdata/impala-profiles/README
testdata/impala-profiles/impala_profile_log_tpcds_compute_stats

View File

@@ -481,7 +481,7 @@ public class CatalogServiceCatalog extends Catalog {
keepsWarmupTablesLoaded_ = BackendConfig.INSTANCE.keepsWarmupTablesLoaded();
warmupTables_ = FileSystemUtil.loadWarmupTableNames(
BackendConfig.INSTANCE.getWarmupTablesConfigFile());
LOG.info("Loaded {} table names to warmup", warmupTables_.size());
LOG.info("Loaded {} table names to warmup:\n{}", warmupTables_.size(), warmupTables_);
}
/**

1
testdata/data/warmup_test_config.txt vendored Normal file
View File

@@ -0,0 +1 @@
warmup_test_db.*

View File

@@ -23,7 +23,7 @@ import requests
import time
from builtins import round
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite, IMPALA_HOME
from tests.common.environ import build_flavor_timeout
from tests.common.impala_connection import ERROR
from tests.common.parametrize import UniqueDatabase
@@ -175,8 +175,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
def __test_catalogd_auto_failover(self, unique_database):
"""Stop active catalogd and verify standby catalogd becomes active.
Restart original active catalogd. Verify that statestore does not resume its
active role. If test_query_fail_during_failover is True, run a query during failover
and comfirm that it is fail."""
active role. Run a query during failover and comfirm that it is fail."""
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service
catalogd_service_2 = standby_catalogd.service
@@ -241,7 +240,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args=SS_AUTO_FAILOVER_ARGS,
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
start_args="--enable_catalogd_ha")
def test_catalogd_auto_failover(self, unique_database):
"""Tests for Catalog Service auto fail over without failed RPCs."""
@@ -259,7 +259,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
statestored_args=(
SS_AUTO_FAILOVER_ARGS
+ "--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:FAIL@1.0"),
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
start_args="--enable_catalogd_ha")
def test_catalogd_auto_failover_with_failed_rpc(self, unique_database):
"""Tests for Catalog Service auto fail over with failed RPCs."""
@@ -368,7 +369,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
start_args="--enable_catalogd_ha")
def test_catalogd_manual_failover(self, unique_database):
"""Tests for Catalog Service manual fail over without failed RPCs."""
@@ -386,7 +388,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000 "
"--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:FAIL@1.0",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
start_args="--enable_catalogd_ha")
def test_catalogd_manual_failover_with_failed_rpc(self, unique_database):
"""Tests for Catalog Service manual fail over with failed RPCs."""
@@ -522,10 +525,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=true "
"--catalog_topic_mode=minimal",
impalad_args="--use_local_catalog=true",
start_args="--enable_catalogd_ha")
def test_metadata_after_failover(self, unique_database):
self._test_metadata_after_failover(unique_database)
self._test_metadata_after_failover(
unique_database, self._create_native_fn, self._verify_native_fn)
self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -535,43 +543,54 @@ class TestCatalogdHA(CustomClusterTestSuite):
impalad_args="--use_local_catalog=true",
start_args="--enable_catalogd_ha")
def test_metadata_after_failover_with_delayed_reset(self, unique_database):
self._test_metadata_after_failover(unique_database)
self._test_metadata_after_failover(
unique_database, self._create_native_fn, self._verify_native_fn)
self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--catalog_topic_mode=minimal "
"--catalog_topic_mode=minimal --enable_reload_events=true "
"--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
impalad_args="--use_local_catalog=true",
start_args="--enable_catalogd_ha")
def test_metadata_after_failover_with_hms_sync(self, unique_database):
self._test_metadata_after_failover(unique_database, skip_func_test=True)
self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--debug_actions=catalogd_event_processing_delay:SLEEP@2000 "
"--warmup_tables_config_file="
"--enable_reload_events=true --warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
start_args="--enable_catalogd_ha")
def test_warmed_up_metadata_after_failover(self, unique_database):
"""Verify that the metadata is warmed up in the standby catalogd."""
for catalogd in self.__get_catalogds():
self._test_warmed_up_tables(catalogd.service)
latest_catalogd = self._test_metadata_after_failover(
unique_database, skip_func_test=True)
self._test_warmed_up_tables(latest_catalogd)
# TODO: due to IMPALA-14210 the standby catalogd can't update the native function
# list by applying the ALTER_DATABASE event. So not testing native functions
# creation until IMPALA-14210 is resolved.
active_catalogd, _ = self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
self._test_warmed_up_tables(active_catalogd.service)
active_catalogd, _ = self._test_metadata_after_failover(
unique_database, self._drop_table, self._verify_table_dropped)
self._test_warmed_up_tables(active_catalogd.service)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
"--catalogd_ha_failover_catchup_timeout_s=2 "
"--warmup_tables_config_file="
"--enable_reload_events=true --warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
start_args="--enable_catalogd_ha")
def test_failover_catchup_timeout_and_reset(self, unique_database):
self._test_metadata_after_failover(unique_database, skip_func_test=True)
self._test_metadata_after_failover(
unique_database, self._create_new_table, self._verify_new_table)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -579,15 +598,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
"--catalogd_ha_failover_catchup_timeout_s=2 "
"--catalogd_ha_reset_metadata_on_failover_catchup_timeout=false "
"--warmup_tables_config_file="
"--enable_reload_events=true --warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
start_args="--enable_catalogd_ha")
def test_failover_catchup_timeout_not_reset(self, unique_database):
# Use allow_table_not_exists=True since the table is missing due to catalog not reset.
latest_catalogd = self._test_metadata_after_failover(
unique_database, allow_table_not_exists=True, skip_func_test=True)
# Skip verifying the table existence since it's missing due to catalog not reset.
latest_catalogd, _ = self._test_metadata_after_failover(
unique_database, self._create_new_table, self._noop_verifier)
# Verify tables are still loaded
self._test_warmed_up_tables(latest_catalogd)
self._test_warmed_up_tables(latest_catalogd.service)
# Run a global IM to bring up 'unique_database' in the new catalogd. Otherwise, the
# cleanup_database step will fail.
self.execute_query("invalidate metadata")
@@ -599,22 +618,126 @@ class TestCatalogdHA(CustomClusterTestSuite):
catalogd.verify_table_metadata_loaded(db, table)
catalogd.verify_table_metadata_loaded(db, "store", expect_loaded=False)
def _test_metadata_after_failover(self, unique_database,
allow_table_not_exists=False, skip_func_test=False):
"""Verify that the metadata is correct after failover. Returns the current active
catalogd"""
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_2 = standby_catalogd.service
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalog_topic_mode=minimal "
"--catalogd_ha_reset_metadata_on_failover=false "
"--debug_actions=catalogd_event_processing_delay:SLEEP@1000 "
"--enable_reload_events=true --warmup_tables_config_file="
"file://%s/testdata/data/warmup_test_config.txt" % IMPALA_HOME,
impalad_args="--use_local_catalog=true",
start_args="--enable_catalogd_ha")
def test_warmed_up_metadata_failover_catchup(self):
"""All tables under the 'warmup_test_db' will be warmed up based on the config.
Use local-catalog mode so coordinator needs to fetch metadata from catalogd after
each DDL."""
db = "warmup_test_db"
self.execute_query("create database if not exists " + db)
try:
self._test_metadata_after_failover(
db, self._create_new_table, self._verify_new_table)
active_catalogd, _ = self._test_metadata_after_failover(
db, self._add_new_partition, self._verify_new_partition)
active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
active_catalogd, _ = self._test_metadata_after_failover(
db, self._refresh_table, self._verify_refresh)
active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
active_catalogd, _ = self._test_metadata_after_failover(
db, self._drop_partition, self._verify_no_partitions)
active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
active_catalogd, _ = self._test_metadata_after_failover(
db, self._insert_table, self._verify_insert)
active_catalogd.service.verify_table_metadata_loaded(db, "tbl")
self._test_metadata_after_failover(
db, self._drop_table, self._verify_table_dropped)
finally:
for i in range(2):
try:
self.execute_query("drop database if exists %s cascade" % db)
break
except Exception as e:
# Retry in case we hit IMPALA-14228.
LOG.warn("Ignored cleanup failure: " + str(e))
def _create_native_fn(self, unique_database):
create_func_impala = ("create function {database}.identity_tmp(bigint) "
"returns bigint location '{location}' symbol='Identity'")
self.client.execute(create_func_impala.format(
database=unique_database,
location=get_fs_path('/test-warehouse/libTestUdfs.so')))
database=unique_database,
location=get_fs_path('/test-warehouse/libTestUdfs.so')))
self.execute_query_expect_success(
self.client, "select %s.identity_tmp(10)" % unique_database)
self.client, "select %s.identity_tmp(10)" % unique_database)
self.client.execute("create table %s.tbl(i int)" % unique_database)
def _verify_native_fn(self, unique_database):
self.execute_query_expect_success(
self.client, "select %s.identity_tmp(10)" % unique_database)
def _create_new_table(self, unique_database):
table_location = get_fs_path("/test-warehouse/%s.tbl" % unique_database)
self.client.execute("create table %s.tbl like functional.alltypes"
" stored as parquet location '%s'"
% (unique_database, table_location))
def _verify_new_table(self, unique_database):
self.execute_query("describe %s.tbl" % unique_database)
def _drop_table(self, unique_database):
self.client.execute("drop table %s.tbl" % unique_database)
def _verify_table_dropped(self, unique_database):
res = self.client.execute("show tables in " + unique_database)
assert len(res.data) == 0
def _add_new_partition(self, unique_database):
self.execute_query("alter table %s.tbl add partition(year=2025, month=1)" %
unique_database)
def _verify_new_partition(self, unique_database):
res = self.execute_query("show partitions %s.tbl" % unique_database)
LOG.info("partition result: {}".format(res.data))
assert "year=2025/month=1" in res.data[0]
def _refresh_table(self, unique_database):
"""Add a new file to the table and refresh it"""
table_location = get_fs_path("/test-warehouse/%s.tbl" % unique_database)
src_file = get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
"day=9/*.parq")
dst_path = "%s/year=2025/month=1/alltypes.parq" % table_location
self.filesystem_client.copy(src_file, dst_path, overwrite=True)
self.execute_query("refresh %s.tbl partition (year=2025, month=1)" % unique_database)
def _verify_refresh(self, unique_database):
res = self.execute_query("select count(*) from %s.tbl where year=2025 and month=1"
% unique_database)
assert res.data == ["1000"]
def _drop_partition(self, unique_database):
self.execute_query("alter table %s.tbl drop partition(year=2025, month=1)"
% unique_database)
def _verify_no_partitions(self, unique_database):
res = self.execute_query("show partitions %s.tbl" % unique_database)
# The result should only have the "Total" line.
assert len(res.data) == 1
def _insert_table(self, unique_database):
self.execute_query("insert into %s.tbl partition(year, month)"
" select * from functional.alltypes" % unique_database)
def _verify_insert(self, unique_database):
res = self.execute_scalar("select count(*) from %s.tbl" % unique_database)
assert res == '7300'
def _noop_verifier(self, unique_database): # noqa: U100
pass
def _test_metadata_after_failover(self, unique_database, metadata_op_fn, verifier_fn):
"""Verify that the metadata is correct after failover. Returns the updated tuple of
(active_catalogd, standby_catalogd)"""
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_2 = standby_catalogd.service
metadata_op_fn(unique_database)
# Kill active catalogd
active_catalogd.kill()
@@ -627,22 +750,22 @@ class TestCatalogdHA(CustomClusterTestSuite):
"catalog-server.ha-number-active-status-change") > 0
assert catalogd_service_2.get_metric_value("catalog-server.active-status")
# TODO: due to IMPALA-14210 the standby catalogd can't update the native function
# list by applying the ALTER_DATABASE event. So this will fail as function not found.
# Remove this condition after IMPALA-14210 is resolved.
if not skip_func_test:
self.execute_query_expect_success(
self.client, "select %s.identity_tmp(10)" % unique_database)
for i in range(2):
try:
verifier_fn(unique_database)
break
except Exception as e:
if i == 0:
# Due to IMPALA-14228, we allow retry on connection failure to the previous
# active catalogd. Example error message:
# Couldn't open transport for xxx:26000 (connect() failed: Connection refused)
assert str(active_catalogd.service.service_port) in str(e)
LOG.info("Retry for error " + str(e))
continue
assert False, str(e)
# Check if the new active catalogd has the new table in its cache.
try:
self.execute_query("describe %s.tbl" % unique_database)
except Exception as e:
if not allow_table_not_exists:
# Due to IMPALA-14228, the query could still fail. But it's not due to stale
# metadata so allow this until we resolve IMPALA-14228.
assert "Error making an RPC call to Catalog server" in str(e)
return catalogd_service_2
active_catalogd.start()
return standby_catalogd, active_catalogd
def test_page_with_disable_ha(self):
self.__test_catalog_ha_info_page()

View File

@@ -113,7 +113,8 @@ class TestExtDataSources(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
start_args="--enable_catalogd_ha")
def test_catalogd_ha_failover(self):
"""The test case for cluster started with catalogd HA enabled."""