IMPALA-14227: In HA failover, passive catalogd should apply pending HMS events before being active

After IMPALA-14074, the passive catalogd can have a warmed up metadata
cache during failover (with catalogd_ha_reset_metadata_on_failover=false
and a non-empty warmup_tables_config_file). However, it could still use
a stale metadata cache when some pending HMS events generated by the
previous active catalogd are not applied yet.

This patch adds a wait during HA failover to ensure HMS events before
the failover happens are all applied on the new active catalogd. The
timeout is configured by a new flag which defaults to 300 (5 minutes):
catalogd_ha_failover_catchup_timeout_s. When timeout happens, by default
catalogd will fallback to resetting all metadata. Users can decide
whether to reset or continue using the current cache. This is configured
by another flag, catalogd_ha_reset_metadata_on_failover_catchup_timeout.

Since the passive catalogd depends on HMS event processing to keep its
metadata up-to-date with the active catalogd, this patch adds validation
to avoid starting catalogd with catalogd_ha_reset_metadata_on_failover
set to false and hms_event_polling_interval_s <= 0.

This patch also makes catalogd_ha_reset_metadata_on_failover a
non-hidden flag so it's shown in the /varz web page.

Tests:
 - Ran test_warmed_up_metadata_after_failover 200 times. Without the
   fix, it usually fails in several runs.
 - Added new tests for the new flags.

Change-Id: Icf4fcb0e27c14197f79625749949b47c033a5f31
Reviewed-on: http://gerrit.cloudera.org:8080/23174
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
stiga-huang
2025-07-15 16:21:09 +08:00
committed by Impala Public Jenkins
parent 438461db9e
commit 64abca481f
13 changed files with 214 additions and 24 deletions

View File

@@ -171,11 +171,24 @@ DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log r
"to retain in catalogd. If -1, the operation log has unbounded size.");
// The standby catalogd may have stale metadata for some reason, like event processor
// could have hung or could be just behind in processing events. Also the standby
// catalogd doesn't get invalidate requests from coordinators so we should probably
// could have hung or could be just behind in processing events. So we should probably
// reset its metadata when it becomes active to avoid stale metadata.
DEFINE_bool_hidden(catalogd_ha_reset_metadata_on_failover, true, "If true, reset all "
"metadata when the catalogd becomes active.");
DEFINE_bool(catalogd_ha_reset_metadata_on_failover, true, "If true, reset all metadata "
"when the catalogd becomes active. If false, catalogd keeps using its current "
"metadata but will apply all pending HMS events before being active. Setting this "
"to false requires enabling HMS notification events processing, i.e. "
"hms_event_polling_interval_s > 0");
DEFINE_int32(catalogd_ha_failover_catchup_timeout_s, 300, "When "
"catalogd_ha_reset_metadata_on_failover is false, catalogd wait before transitioning to"
" the active state until pending HMS events are applied. This flag controls the timeout"
" for this wait.");
DEFINE_bool(catalogd_ha_reset_metadata_on_failover_catchup_timeout, true, "If true, "
"catalogd will reset all metadata when it times out in catching up HMS events in HA "
"failover. If false, catalogd ignored the pending HMS events when timeout happens and "
"continue transitioning to the active state. This is only used when "
"catalogd_ha_reset_metadata_on_failover is false.");
DEFINE_int32(topic_update_log_gc_frequency, 1000, "Frequency at which the entries "
"of the catalog topic update log are garbage collected. An entry may survive "
@@ -802,7 +815,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
int64_t active_catalogd_version, const TCatalogRegistration& catalogd_registration) {
lock_guard<mutex> l(catalog_lock_);
unique_lock<mutex> l(catalog_lock_);
if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
is_registration_reply, active_catalogd_version)) {
return;
@@ -836,6 +849,11 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
if (!status.ok()) {
LOG(ERROR) << "Failed to refresh data sources triggered by catalogd failover.";
}
// If HA state has been determined, this is a failover. Apply pending HMS events
// to avoid stale metadata. Note that only HMS events before the failover happens
// need to be applied. HMS events after that are OK to be applied later since they
// are not applied in the previous active catalogd as well.
if (is_ha_determined_) WaitUntilHmsEventsSynced(l);
}
// Signal the catalog update gathering thread to start.
topic_updates_ready_ = false;
@@ -860,6 +878,81 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
is_ha_determined_ = true;
}
void CatalogServer::WaitUntilHmsEventsSynced(const unique_lock<std::mutex>& lock) {
DCHECK(lock.mutex() == &catalog_lock_ && lock.owns_lock());
uint64_t timeout_ns = FLAGS_catalogd_ha_failover_catchup_timeout_s * NANOS_PER_SEC;
MonotonicStopWatch timeout_timer;
timeout_timer.Start();
LOG(INFO) << "Getting latest HMS event event id and waiting for EventProcessor to "
"sync up";
TEventProcessorMetricsSummaryResponse response;
TEventProcessorMetricsSummaryRequest req;
// Fetches the latest HMS event id from HMS directly in case the cached value in
// EventProcessor is stale. It's updated every 1s (by default) but could be stale due
// to slow HMS RPCs.
req.get_latest_event_from_hms = true;
Status status = catalog_->GetEventProcessorSummary(req, &response);
DCHECK(status.ok());
if (response.__isset.error_msg) {
triggered_first_reset_ = false;
LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata in failover";
return;
}
DCHECK(response.__isset.progress);
int64_t latest_hms_event_id = response.progress.latest_event_id;
int64_t last_synced_hms_event_id = response.progress.last_synced_event_id;
// If there are exceptions in fetching the latest HMS event id, 'latest_hms_event_id'
// is set to -1. Retry since we do need a correct value.
while (latest_hms_event_id < 0 && timeout_timer.ElapsedTime() < timeout_ns) {
SleepForMs(100);
status = catalog_->GetEventProcessorSummary(req, &response);
DCHECK(status.ok());
DCHECK(response.__isset.progress);
latest_hms_event_id = response.progress.latest_event_id;
}
if (latest_hms_event_id < 0) {
triggered_first_reset_ = false;
LOG(ERROR) << "Timed out getting the latest event id from HMS. Resetting all metadata"
"in failover";
return;
}
// Now that we figure out latest_hms_event_id from HMS, we loop and wait until
// last_synced_event_id catch up with this latest_hms_event_id or timeout passed.
// Setting get_latest_event_from_hms to false so GetEventProcessorSummary won't send
// HMS RPCs.
req.get_latest_event_from_hms = false;
while (last_synced_hms_event_id < latest_hms_event_id
&& timeout_timer.ElapsedTime() < timeout_ns) {
LOG(INFO) << "Wait until the pending "
<< (latest_hms_event_id - last_synced_hms_event_id)
<< " HMS events are applied. Latest event in HMS: id=" << latest_hms_event_id
<< ". Last synced event: id=" << last_synced_hms_event_id;
SleepForMs(REFRESH_METRICS_INTERVAL_MS);
status = catalog_->GetEventProcessorSummary(req, &response);
DCHECK(status.ok());
if (response.__isset.error_msg) {
triggered_first_reset_ = false;
LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata in "
"failover";
return;
}
DCHECK(response.__isset.progress);
last_synced_hms_event_id = response.progress.last_synced_event_id;
}
if (last_synced_hms_event_id < latest_hms_event_id) {
LOG(WARNING) << "Timed out waiting for catching up HMS events.";
if (FLAGS_catalogd_ha_reset_metadata_on_failover_catchup_timeout) {
triggered_first_reset_ = false;
LOG(INFO) << "Fallback to resetting all metadata.";
} else {
LOG(WARNING) << "Continue with the current cache. Metadata might be stale.";
}
return;
}
LOG(INFO) << "EventProcessor is synced up with HMS event id during failover: "
<< latest_hms_event_id;
}
[[noreturn]] void CatalogServer::TriggerResetMetadata() {
while (true) {
bool must_reset = false;
@@ -1173,8 +1266,10 @@ void CatalogServer::GetCatalogUsage(Document* document) {
void CatalogServer::EventMetricsUrlCallback(
const Webserver::WebRequest& req, Document* document) {
auto& allocator = document->GetAllocator();
TEventProcessorMetricsSummaryRequest request;
TEventProcessorMetricsSummaryResponse event_processor_summary_response;
Status status = catalog_->GetEventProcessorSummary(&event_processor_summary_response);
Status status = catalog_->GetEventProcessorSummary(
request, &event_processor_summary_response);
if (!status.ok()) {
Value error(status.GetDetail(), allocator);
document->AddMember("error", error, allocator);

View File

@@ -289,6 +289,12 @@ class CatalogServer {
void UpdateActiveCatalogd(bool is_registration_reply, int64_t active_catalogd_version,
const TCatalogRegistration& catalogd_registration);
/// Wait until the pending HMS events are applied. Used in HA failover before the
/// standby catalogd becomes active to get rid of stale metadata when
/// catalogd_ha_reset_metadata_on_failover is set to false. Callers should hold the
/// catalog_lock_.
void WaitUntilHmsEventsSynced(const std::unique_lock<std::mutex>& lock);
/// Returns the current active status of the catalogd.
bool IsActive();

View File

@@ -70,7 +70,7 @@ Catalog::Catalog() {
{"getOperationUsage", "()[B", &get_operation_usage_id_},
{"getCatalogVersion", "()J", &get_catalog_version_id_},
{"getCatalogServerMetrics", "()[B", &get_catalog_server_metrics_},
{"getEventProcessorSummary", "()[B", &get_event_processor_summary_},
{"getEventProcessorSummary", "([B)[B", &get_event_processor_summary_},
{"prioritizeLoad", "([B)V", &prioritize_load_id_},
{"getPartitionStats", "([B)[B", &get_partition_stats_id_},
{"updateTableUsage", "([B)V", &update_table_usage_id_},
@@ -191,9 +191,9 @@ Status Catalog::GetOperationUsage(TGetOperationUsageResponse* response) {
return JniUtil::CallJniMethod(catalog_, get_operation_usage_id_, response);
}
Status Catalog::GetEventProcessorSummary(
Status Catalog::GetEventProcessorSummary(const TEventProcessorMetricsSummaryRequest& req,
TEventProcessorMetricsSummaryResponse* response) {
return JniUtil::CallJniMethod(catalog_, get_event_processor_summary_, response);
return JniUtil::CallJniMethod(catalog_, get_event_processor_summary_, req, response);
}
Status Catalog::GetCatalogServerMetrics(TGetCatalogServerMetricsResponse* response) {

View File

@@ -118,7 +118,8 @@ class Catalog {
/// Returns the metastore event processor summary view. The summary string
/// in the response can contain detailed metrics along with status
Status GetEventProcessorSummary(TEventProcessorMetricsSummaryResponse* response);
Status GetEventProcessorSummary(const TEventProcessorMetricsSummaryRequest& req,
TEventProcessorMetricsSummaryResponse* response);
/// Gets all functions in the catalog matching the parameters in the given
/// TFunctionsRequest.

View File

@@ -85,6 +85,8 @@ DECLARE_bool(symbolize_stacktrace);
DECLARE_string(debug_actions);
DECLARE_int64(thrift_rpc_max_message_size);
DECLARE_int64(thrift_external_rpc_max_message_size);
DECLARE_double(hms_event_polling_interval_s);
DECLARE_bool(catalogd_ha_reset_metadata_on_failover);
DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in milliseconds "
"between memory maintenance iterations");
@@ -562,6 +564,14 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
FLAGS_thrift_external_rpc_max_message_size, ThriftDefaultMaxMessageSize()));
}
if (!FLAGS_catalogd_ha_reset_metadata_on_failover
&& FLAGS_hms_event_polling_interval_s <= 0) {
CLEAN_EXIT_WITH_ERROR(Substitute(
"Invalid hms_event_polling_interval_s: $0. It should be larger than 0 when "
"--catalogd_ha_reset_metadata_on_failover is false",
FLAGS_hms_event_polling_interval_s));
}
impala::InitGoogleLoggingSafe(argv[0]);
// Breakpad needs flags and logging to initialize.
if (!external_fe) {

View File

@@ -1083,6 +1083,13 @@ struct TCopyTestCaseReq {
1: required string input_path
}
struct TEventProcessorMetricsSummaryRequest {
// Whether to fetch the latest HMS event id using a HMS RPC or using the cached value
// in the EventProcessor. When set to true, 'latest_event_time_s' in the progress info
// will be -1 to save a HMS RPC.
1: required bool get_latest_event_from_hms = false
}
struct TEventBatchProgressInfo {
// Number of original HMS events received in the current batch.
1: required i32 num_hms_events

View File

@@ -116,6 +116,7 @@ import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TEventProcessorCmdParams;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TGetCatalogUsageResponse;
@@ -4057,8 +4058,9 @@ public class CatalogServiceCatalog extends Catalog {
* Gets the events processor summary. Used for populating the contents of the events
* processor detailed view page
*/
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
return metastoreEventProcessor_.getEventProcessorSummary();
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
TEventProcessorMetricsSummaryRequest req) {
return metastoreEventProcessor_.getEventProcessorSummary(req);
}
public TSetEventProcessorStatusResponse setEventProcessorStatus(

View File

@@ -21,6 +21,7 @@ import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.kudu.client.Delete;
@@ -83,7 +84,8 @@ public interface ExternalEventsProcessor {
* Gets a detailed view of the event processor which can be used to populate the
* content of a dedicated page for the event processor
*/
TEventProcessorMetricsSummaryResponse getEventProcessorSummary();
TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
TEventProcessorMetricsSummaryRequest req);
/**
* Gets the {@link MetastoreEventFactory} to be used for creating

View File

@@ -87,6 +87,7 @@ import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TEventBatchProgressInfo;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.thrift.TImpalaTableType;
import org.apache.impala.thrift.TStatus;
@@ -1407,7 +1408,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
}
@Override
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
TEventProcessorMetricsSummaryRequest req) {
TEventProcessorMetricsSummaryResponse summaryResponse =
new TEventProcessorMetricsSummaryResponse();
summaryResponse.setSummary(metrics_.toString());
@@ -1419,6 +1421,17 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
progressInfo.last_synced_event_time_s = lastSyncedEventTimeSecs_.get();
progressInfo.latest_event_id = latestEventId_.get();
progressInfo.latest_event_time_s = latestEventTimeSecs_.get();
if (req.get_latest_event_from_hms) {
try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
progressInfo.latest_event_id =
metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
progressInfo.latest_event_time_s = -1;
} catch (MetastoreClientInstantiationException | TException e) {
progressInfo.latest_event_id = - 1;
progressInfo.latest_event_time_s = -1;
LOG.warn("Failed to fetch the latest HMS event id. Returning -1", e);
}
}
// Assign these lists to local variables in case they are replaced concurrently.
// It's best effort to make the members in 'progressInfo' consistent but we can't
// guarantee it.

View File

@@ -26,6 +26,7 @@ import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
import org.apache.impala.common.Metrics;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
/**
@@ -98,7 +99,8 @@ public class NoOpEventProcessor implements ExternalEventsProcessor {
}
@Override
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
public TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
TEventProcessorMetricsSummaryRequest req) {
return DEFAULT_SUMMARY_RESPONSE;
}

View File

@@ -64,6 +64,7 @@ import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
import org.apache.impala.thrift.TGetCatalogDeltaRequest;
import org.apache.impala.thrift.TGetCatalogDeltaResponse;
import org.apache.impala.thrift.TGetCatalogServerMetricsResponse;
@@ -517,10 +518,19 @@ public class JniCatalog {
return execAndSerialize("getOperationUsage", shortDesc, catalog_::getOperationUsage);
}
public byte[] getEventProcessorSummary() throws ImpalaException, TException {
public byte[] getEventProcessorSummary(byte[] req)
throws ImpalaException, TException {
TEventProcessorMetricsSummaryRequest thriftReq =
new TEventProcessorMetricsSummaryRequest();
JniUtil.deserializeThrift(protocolFactory_, thriftReq, req);
String shortDesc = "Getting event processor summary";
if (thriftReq.get_latest_event_from_hms) {
shortDesc += " (get_latest_event_from_hms=true)";
}
return execAndSerialize(
"getEventProcessorSummary", shortDesc, catalog_::getEventProcessorSummary);
"getEventProcessorSummary", shortDesc,
() -> catalog_.getEventProcessorSummary(thriftReq));
}
public byte[] setEventProcessorStatus(byte[] thriftParams)

View File

@@ -146,6 +146,7 @@ import org.apache.impala.thrift.TDropDbParams;
import org.apache.impala.thrift.TDropFunctionParams;
import org.apache.impala.thrift.TDropTableOrViewParams;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
import org.apache.impala.thrift.TFunctionBinaryType;
import org.apache.impala.thrift.THdfsFileFormat;
@@ -1960,7 +1961,7 @@ public class MetastoreEventsProcessorTest {
assertTrue("Event process duration should be greater than zero",
response.getEvents_process_duration_mean() > 0);
TEventProcessorMetricsSummaryResponse summaryResponse =
catalog_.getEventProcessorSummary();
catalog_.getEventProcessorSummary(new TEventProcessorMetricsSummaryRequest());
assertNotNull(summaryResponse);
assertTrue(response.getLast_synced_event_id() > lastEventSyncId);
}
@@ -1991,7 +1992,7 @@ public class MetastoreEventsProcessorTest {
assertTrue("we do not turn off disableHmsSync for table testTblName1",
response.getEvents_skipped() >= numEventsSkippedBefore);
TEventProcessorMetricsSummaryResponse summaryResponse =
catalog_.getEventProcessorSummary();
catalog_.getEventProcessorSummary(new TEventProcessorMetricsSummaryRequest());
assertNotNull(summaryResponse);
assertTrue(response.getLast_synced_event_id() > lastEventSyncId);
@@ -2052,7 +2053,8 @@ public class MetastoreEventsProcessorTest {
assertFalse(response.isSetEvents_received_5min_rate());
assertFalse(response.isSetEvents_received_15min_rate());
TEventProcessorMetricsSummaryResponse summaryResponse =
eventsProcessor_.getEventProcessorSummary();
eventsProcessor_.getEventProcessorSummary(
new TEventProcessorMetricsSummaryRequest());
assertNotNull(summaryResponse);
// Last synced id must be set even when event processor is not active.
assertTrue(response.isSetLast_synced_event_id());
@@ -2075,7 +2077,7 @@ public class MetastoreEventsProcessorTest {
assertNotNull(response);
assertEquals(EventProcessorStatus.DISABLED.toString(), response.getStatus());
TEventProcessorMetricsSummaryResponse summaryResponse =
testCatalog.getEventProcessorSummary();
testCatalog.getEventProcessorSummary(new TEventProcessorMetricsSummaryRequest());
assertNotNull(summaryResponse);
}
/**

View File

@@ -530,6 +530,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--debug_actions=catalogd_event_processing_delay:SLEEP@2000 "
"--warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
start_args="--enable_catalogd_ha")
@@ -537,9 +538,40 @@ class TestCatalogdHA(CustomClusterTestSuite):
"""Verify that the metadata is warmed up in the standby catalogd."""
for catalogd in self.__get_catalogds():
self._test_warmed_up_tables(catalogd.service)
latest_catalogd = self._test_metadata_after_failover(unique_database, True)
latest_catalogd = self._test_metadata_after_failover(
unique_database, skip_func_test=True)
self._test_warmed_up_tables(latest_catalogd)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
"--catalogd_ha_failover_catchup_timeout_s=2 "
"--warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
start_args="--enable_catalogd_ha")
def test_failover_catchup_timeout_and_reset(self, unique_database):
self._test_metadata_after_failover(unique_database, skip_func_test=True)
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
"--catalogd_ha_failover_catchup_timeout_s=2 "
"--catalogd_ha_reset_metadata_on_failover_catchup_timeout=false "
"--warmup_tables_config_file="
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
start_args="--enable_catalogd_ha")
def test_failover_catchup_timeout_not_reset(self, unique_database):
# Use allow_table_not_exists=True since the table is missing due to catalog not reset.
latest_catalogd = self._test_metadata_after_failover(
unique_database, allow_table_not_exists=True, skip_func_test=True)
# Verify tables are still loaded
self._test_warmed_up_tables(latest_catalogd)
# Run a global IM to bring up 'unique_database' in the new catalogd. Otherwise, the
# cleanup_database step will fail.
self.execute_query("invalidate metadata")
def _test_warmed_up_tables(self, catalogd):
db = "tpcds"
tables = ["customer", "date_dim", "item", "store_sales"]
@@ -547,7 +579,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
catalogd.verify_table_metadata_loaded(db, table)
catalogd.verify_table_metadata_loaded(db, "store", expect_loaded=False)
def _test_metadata_after_failover(self, unique_database, skip_func_test=False):
def _test_metadata_after_failover(self, unique_database,
allow_table_not_exists=False, skip_func_test=False):
"""Verify that the metadata is correct after failover. Returns the current active
catalogd"""
(active_catalogd, standby_catalogd) = self.__get_catalogds()
@@ -581,7 +614,14 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.execute_query_expect_success(
self.client, "select %s.identity_tmp(10)" % unique_database)
self.execute_query_expect_success(self.client, "describe %s.tbl" % unique_database)
# Check if the new active catalogd has the new table in its cache.
try:
self.execute_query("describe %s.tbl" % unique_database)
except Exception as e:
if not allow_table_not_exists:
# Due to IMPALA-14228, the query could still fail. But it's not due to stale
# metadata so allow this until we resolve IMPALA-14228.
assert "Error making an RPC call to Catalog server" in str(e)
return catalogd_service_2
def test_page_with_disable_ha(self):