IMPALA-13536: Fix Workload Management Init with Catalog HA

When running an Impala cluster with catalogd HA enabled, the standby
catalogd would go into a loop waiting for the first catalog update to
arrive, repeatedly logging the same error and never joining the server
thread defined in catalogd-main.cc.

Before this patch, when the standby daemon became active, the first
catalog update was finally received, and the workload management
initialization process ran a second time in the newly active daemon
because this daemon saw that it was active.

This patch modifies the catalogd workload management initialization
code so it waits until the active catalogd has been determined. At
that point, the standby daemon skips workload management
initialization while the active daemon runs it after it receives the
first catalog update.

Testing was accomplished by modifying the workload management
initialization custom cluster tests to assert that the init process
is not re-run when a catalogd switches from standby to active and
also to remove the assumption that the first catalogd was active. The
test_catalog_ha test was deleted since all its assertions are handled
by the setup_method of the new TestWorkloadManagementCatalogHA class.

Ozone tests with and without erasure coding were also run and passed.

Change-Id: Id3797a0a9cf0b8ae844d9b7d46b607d93824f69a
Reviewed-on: http://gerrit.cloudera.org:8080/22118
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
jasonmfehr
2024-11-25 12:55:51 -08:00
committed by Michael Smith
parent 403519def4
commit 41c145f5ad
6 changed files with 132 additions and 73 deletions

View File

@@ -21,11 +21,14 @@ import os
import re
from subprocess import CalledProcessError
from logging import getLogger
from SystemTables.ttypes import TQueryTableColumn
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.workload_management import assert_query
LOG = getLogger(__name__)
class TestWorkloadManagementInitBase(CustomClusterTestSuite):
@@ -463,49 +466,6 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
data = log_results.data[0].split("\t")
assert len(data) == len(log_results.column_labels)
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
catalogd_args="--enable_workload_mgmt", start_args="--enable_catalogd_ha",
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
disable_log_buffering=True)
def test_catalog_ha(self):
"""Asserts workload management initialization is only done on the active catalogd."""
# Assert the active catalog ran workload management initialization.
self.assert_catalogd_log_contains("INFO",
r"Completed workload management initialization")
# Assert the standby catalog skipped workload management initialization.
self.assert_catalogd_log_contains("INFO", r"workload management", expected_count=0,
node_index=1)
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
catalogd_args="--enable_workload_mgmt", start_args="--enable_catalogd_ha",
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
disable_log_buffering=True)
def test_catalog_ha_failover(self):
"""Asserts workload management initialization is not run a second time when catalogd
failover happens."""
# Assert the active catalog ran workload management initialization.
self.assert_catalogd_log_contains("INFO",
r"Completed workload management initialization")
# Assert the standby catalog skipped workload management initialization.
self.assert_catalogd_log_contains("INFO", r"workload management initialization",
expected_count=0, node_index=1)
# Kill active catalogd
catalogds = self.cluster.catalogds()
catalogds[0].kill()
# Wait for failover.
catalogds[1].service.wait_for_metric_value("catalog-server.active-status",
expected_value=True, timeout=30)
# Assert workload management initialization did not run a second time.
self.assert_catalogd_log_contains("INFO", r"workload management initialization",
expected_count=0, node_index=1)
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
catalogd_args="--enable_workload_mgmt",
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -519,23 +479,6 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
self.assert_catalogd_log_contains("INFO",
r"Completed workload management initialization")
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
catalogd_args="--enable_workload_mgmt",
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
start_args="--enable_catalogd_ha --enable_statestored_ha",
disable_log_buffering=True)
def test_catalog_statestore_ha(self):
"""Asserts workload management initialization is only done on the active catalogd
when both catalog and statestore ha is enabled."""
# Assert the active catalog ran workload management initialization.
self.assert_catalogd_log_contains("INFO",
r"Completed workload management initialization")
# Assert the standby catalog skipped workload management initialization.
self.assert_catalogd_log_contains("INFO", r"workload management", expected_count=0,
node_index=1)
class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
@@ -624,3 +567,73 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
# Assert the standby catalog skipped workload management initialization.
self.assert_catalogd_log_contains("INFO", r"workload management initialization",
expected_count=0, node_index=1)
class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase):
  """Tests for workload management initialization when catalogd HA is enabled.

  The setup method of this class ensures only 1 catalogd ran the workload management
  initialization process. It records the index of that daemon in self.active_catalog and
  the index of the other daemon in self.standby_catalog so that the tests never have to
  assume which catalogd is the active one."""

  def setup_method(self, method):
    """Runs the parent setup, then determines which catalogd is active.

    Asserts that exactly one of the two catalogds logged completion of workload
    management initialization, stores that daemon's index in self.active_catalog and the
    other daemon's index in self.standby_catalog, and finally asserts that the standby
    daemon logged that it skipped workload management initialization."""
    super(TestWorkloadManagementCatalogHA, self).setup_method(method)

    # Find all catalog instances that have initialized workload management. The result
    # is indexed by catalogd; an entry of None is treated below as "this daemon did not
    # log the message".
    init_logs = self.assert_catalogd_ha_contains("INFO",
        r"Completed workload management initialization", timeout_s=30)
    assert len(init_logs) == 2, "Expected length of catalogd matches to be '2' but " \
        "was '{}'".format(len(init_logs))

    # Assert only 1 catalog ran workload management initialization.
    assert init_logs[0] is None or init_logs[1] is None, "Both catalogds ran workload " \
        "management initialization"

    # Guard against no daemon having run the initialization. Without this check, the
    # logic below would silently pick daemon 1 as the active catalogd.
    assert init_logs[0] is not None or init_logs[1] is not None, "Neither catalogd " \
        "ran workload management initialization"

    # The daemon that completed initialization is the active catalogd.
    if init_logs[0] is None:
      self.active_catalog, self.standby_catalog = 1, 0
    else:
      self.active_catalog, self.standby_catalog = 0, 1

    LOG.info("Found active catalogd is daemon '{}' and standby catalogd is daemon '{}'"
        .format(self.active_catalog, self.standby_catalog))

    # Assert the standby catalog skipped workload management initialization.
    self.assert_catalogd_log_contains("INFO",
        r"Skipping workload management initialization since catalogd HA is enabled and "
        r"this catalogd is not active", node_index=self.standby_catalog)

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
      catalogd_args="--enable_workload_mgmt", start_args="--enable_catalogd_ha",
      statestored_args="--use_subscriber_id_as_catalogd_priority=true",
      disable_log_buffering=True)
  def test_catalog_ha_failover(self):
    """Asserts workload management initialization is not run a second time when catalogd
    failover happens."""
    catalogds = self.cluster.catalogds()

    # Kill the active catalogd found by setup_method instead of assuming it is the
    # first daemon.
    catalogds[self.active_catalog].kill()

    # Wait for failover: the former standby must report itself as active.
    catalogds[self.standby_catalog].service.wait_for_metric_value(
        "catalog-server.active-status", expected_value=True, timeout=30)

    # Wait for standby catalog to complete its initialization as the active catalogd.
    self.assert_catalogd_log_contains("INFO", r'catalog update with \d+ entries is '
        r'assembled', expected_count=-1, node_index=self.standby_catalog)

    # Assert workload management initialization did not run a second time.
    self.assert_catalogd_log_contains("INFO", r"Starting workload management "
        r"initialization", expected_count=0, node_index=self.standby_catalog)

  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
      catalogd_args="--enable_workload_mgmt",
      statestored_args="--use_subscriber_id_as_catalogd_priority=true",
      start_args="--enable_catalogd_ha --enable_statestored_ha",
      disable_log_buffering=True)
  def test_catalog_statestore_ha(self):
    """Asserts workload management initialization is only done on the active catalogd
    when both catalog and statestore ha is enabled."""
    # The catalogd assertions are all made by setup_method. Here only the statestored
    # logs are checked for the catalog registration message.
    # NOTE(review): the trailing positional arguments 2 and 30 are presumably the
    # expected match count and the timeout in seconds -- confirm against
    # assert_log_contains.
    self.assert_log_contains("statestored", "INFO", r"Registering: catalog", 2, 30)
    self.assert_log_contains("statestored_node1", "INFO", r"Registering: catalog", 2, 30)