IMPALA-12304: Fix the sequence number issue for update_catalogd RPC

Subscribers re-register with the statestore after the statestore is
restarted. A restart resets the statestore's sending sequence number for
the update_catalogd RPC, so subscribers must reset their last received
sequence number for that RPC when they successfully re-register.
Otherwise, subscribers may miss some update_catalogd RPCs sent after the
statestore is restarted.
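
The failure mode can be illustrated with a toy model. This is only a sketch,
not Impala code: ToyStatestore, ToySubscriber and run() are hypothetical names,
and the rule "reject any sequence number not greater than the last one
received" is an assumption inferred from the log message checked in the test.

```python
class ToyStatestore:
    """Hypothetical sender; its sequence number starts over on every restart."""

    def __init__(self):
        self.sending_seq = 0

    def next_update(self):
        self.sending_seq += 1
        return self.sending_seq


class ToySubscriber:
    """Hypothetical receiver that tracks the last accepted sequence number."""

    def __init__(self, reset_on_register):
        self.reset_on_register = reset_on_register
        self.last_update_catalogd_seq = 0

    def register(self):
        # The fix: forget the old sequence number, since the statestore
        # may have been restarted and restarted its numbering.
        if self.reset_on_register:
            self.last_update_catalogd_seq = 0

    def update_catalogd(self, seq):
        # Assumed rule: only accept sequence numbers greater than the last one.
        if seq <= self.last_update_catalogd_seq:
            return False
        self.last_update_catalogd_seq = seq
        return True


def run(reset_on_register):
    """Return whether the first update after a statestore restart is accepted."""
    sub = ToySubscriber(reset_on_register)
    sub.register()
    store = ToyStatestore()
    for _ in range(3):
        sub.update_catalogd(store.next_update())
    store = ToyStatestore()  # simulated statestore restart
    sub.register()           # subscriber re-registers
    return sub.update_catalogd(store.next_update())
```

In this model, run(False) drops the first update after the restart, while
run(True) accepts it.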

Before the fix, related error messages appeared in the catalogd log file
when running test_catalogd_ha.py::TestCatalogdHA::test_restart_statestore.
Verified that no such error messages appear in the catalogd log after the
fix.

Also made a small optimization: the statestore no longer wakes up the
thread for the update_catalogd RPC if the elected active catalogd has not
changed and no RPC failed in the last round.
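
The waiting logic of this optimization can be sketched with Python's
threading.Condition. This is an illustration only: wait_for_work is a
hypothetical helper, and it assumes that rpc_receivers holds the subscribers
whose RPC failed in the last round and therefore needs a retry.

```python
import threading


def wait_for_work(cv, rpc_receivers, timeout_s):
    """Wait on cv; the caller must already hold the condition's lock.

    Returns True if the thread was notified, False if the timed wait expired.
    """
    if not rpc_receivers:
        # Nothing to retry: sleep until another thread notifies us,
        # for example because the active catalogd changed.
        return cv.wait()
    # Some receivers still need a retry: wake up after the timeout at the latest.
    return cv.wait(timeout=timeout_s)
```

With an empty receiver list the thread stays asleep until it is notified, so
it no longer wakes up periodically without work to do.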

Testing:
 - Passed core tests.

Change-Id: I21c1e6f6d8b047a37c7db2b7995b7ff74e317226
Reviewed-on: http://gerrit.cloudera.org:8080/20247
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
wzhou-code
2023-07-21 12:02:56 -07:00
committed by Wenzhe Zhou
parent ae95c43eda
commit af3f56e6d1
3 changed files with 13 additions and 1 deletions

@@ -465,6 +465,9 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* has_active_catalogd,
         *active_catalogd_registration = response.catalogd_registration;
       }
     }
+    // Reset last received sequence number of update_catalogd RPC since statestore
+    // could be restarted.
+    last_update_catalogd_seq_ = 0;
   }
   heartbeat_interval_timer_.Start();
   return status;

@@ -1236,7 +1236,11 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
     while (1) {
       {
         unique_lock<mutex> l(*catalog_manager_.GetLock());
-        update_catalod_cv_.WaitFor(l, timeout_us);
+        if (rpc_receivers.empty()) {
+          update_catalod_cv_.Wait(l);
+        } else {
+          update_catalod_cv_.WaitFor(l, timeout_us);
+        }
       }
       SendUpdateCatalogdNotification(&last_sending_sequence, rpc_receivers);
     }

@@ -17,6 +17,7 @@
 from __future__ import absolute_import, division, print_function
 import logging
+import re
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
@@ -336,3 +337,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
     # Verify simple query is ran successfully.
     self.execute_query_expect_success(
         self.client, "select count(*) from functional.alltypes")
+    unexpected_msg = re.compile(
+        "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")
+    self.assert_catalogd_log_contains("INFO", unexpected_msg, expected_count=0)
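
The pattern used by the new assertion can be tried out on its own. The sketch
below is not the test framework's implementation: count_matching_lines is a
hypothetical stand-in for the log check, and the sample log lines are made up
for illustration; only the regular expression is taken from the test.

```python
import re

# Same pattern as in the test above.
UNEXPECTED_MSG = re.compile(
    "unexpected sequence number: [0-9]+, was expecting greater than [0-9]+")


def count_matching_lines(lines, pattern):
    """Count the lines that contain a match for the compiled pattern."""
    return sum(1 for line in lines if pattern.search(line))


# Hypothetical log excerpts; real catalogd log lines are formatted differently.
sample_before_fix = [
    "catalogd: unexpected sequence number: 1, was expecting greater than 3",
    "catalogd: registered with statestore",
]
sample_after_fix = [
    "catalogd: registered with statestore",
]
```

A count of zero over the log after the fix corresponds to the
expected_count=0 check in the test.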