Mirror of https://github.com/apache/impala.git, synced 2025-12-19 18:12:08 -05:00
IMPALA-13850 (part 1): Wait until CatalogD active before resetting
In HA mode, CatalogD initialization can fail to complete within a reasonable time. Log messages showed that CatalogD was blocked trying to acquire "CatalogServer.catalog_lock_" when calling CatalogServer::UpdateActiveCatalogd() during statestore subscriber registration. catalog_lock_ was held by GatherCatalogUpdatesThread, which was calling GetCatalogDelta() and waiting for the Java lock versionLock_, which in turn was held by the thread running CatalogServiceCatalog.reset().

This patch removes the catalog reset from the JniCatalog constructor. In turn, catalog-server.cc is now responsible for triggering the metadata reset (Invalidate Metadata), and does so only if:
1. It is the active CatalogD, and
2. The gathering thread has collected the first topic update, or CatalogD is started with a catalog_topic_mode other than "minimal".
The latter prerequisite ensures that coordinators are not left blocked waiting for a full topic update in on-demand metadata mode. This is all managed by a new thread method, TriggerResetMetadata, that monitors for and triggers the initial metadata reset.

Note that this is a behavior change in on-demand catalog mode (catalog_topic_mode=minimal). Previously, on-demand catalog mode sent the full database list in its first catalog topic update. This behavior change is acceptable because coordinators can request metadata on demand.

After this patch, catalog-server.active-status and the /healthz page can turn true and OK respectively even while the very first metadata reset is still ongoing. Observers that care about fully populated metadata should check other metrics such as catalog.num-databases and catalog.num-tables, or the /catalog page content.

Updated the start-impala-cluster.py readiness check to wait until coordinators see at least one table, except during create-load-data.sh execution (there are no tables yet) and when use_local_catalog=true (the local catalog cache does not start with any tables). Changed startup flag checking to read the daemon's '/varz?json' page instead of the actual command line args. Cleaned up impala_service.py to fix some flake8 issues. Slightly updated TestLocalCatalogCompactUpdates::test_restart_catalogd so that unique_database cleanup succeeds.

Testing:
- Refactored test_catalogd_ha.py to reduce repeated code, use the unique_database fixture, and additionally validate the /healthz page of both the active and standby catalogd. Changed it to test using the hs2 protocol by default.
- Ran and passed test_catalogd_ha.py and test_concurrent_ddls.py.
- Passed core tests.

Change-Id: I58cc66dcccedb306ff11893f2916ee5ee6a3efc1
Reviewed-on: http://gerrit.cloudera.org:8080/22634
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Riza Suminto <riza.suminto@cloudera.com>
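To make the new startup handshake easier to follow, here is a minimal Python sketch of the coordination described above. The real implementation is the C++ TriggerResetMetadata / GatherCatalogUpdatesThread code in catalog-server.cc shown in the diff below; the class and function names here are illustrative only, and activation (setting is_active and notifying) happens elsewhere, as it does in UpdateActiveCatalogd().

import threading

class CatalogState:
    # Shared state, mirroring catalog_lock_ / catalog_update_cv_ in the diff below.
    def __init__(self):
        self.cv = threading.Condition()
        self.is_active = False             # set by the HA election callback (not shown)
        self.triggered_first_reset = False
        self.topic_updates_ready = False

def trigger_reset_metadata(state, reset_metadata_fn):
    # First-reset thread: wait until this catalogd is active, then reset once.
    with state.cv:
        while not (state.is_active and not state.triggered_first_reset):
            state.cv.notify()              # let the gathering thread re-check its condition
            state.cv.wait()
    # Run the (slow) reset without holding the lock so update gathering can start.
    reset_metadata_fn()
    with state.cv:
        state.triggered_first_reset = True  # marked even if the reset failed
        state.cv.notify()

def gather_catalog_updates(state, require_initial_reset, gather_delta_fn):
    # Update-gathering thread: in non-minimal topic mode, let the first reset go first.
    with state.cv:
        while state.topic_updates_ready or (
                require_initial_reset and not state.triggered_first_reset):
            if not state.triggered_first_reset:
                state.cv.notify()          # wake the first-reset thread
            state.cv.wait()
    gather_delta_fn()

The key point is the same as in the patch: the first reset runs outside the shared lock, and triggered_first_reset is set afterwards regardless of whether the reset succeeded.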
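The readiness-check change in start-impala-cluster.py boils down to reading two debug webpages instead of the process command line. Below is a standalone sketch of that idea, assuming a daemon webserver at localhost:25000 (the usual impalad default, but an assumption here); it does not use the project's ImpalaService helpers.

import json
import time
from urllib.request import urlopen

# Hypothetical coordinator webserver address; treat host/port as an assumption.
BASE_URL = "http://localhost:25000"

def get_debug_json(page):
    # Fetch an Impala debug webpage (e.g. 'varz', 'jsonmetrics') as parsed JSON.
    with urlopen("{0}/{1}?json".format(BASE_URL, page)) as resp:
        return json.loads(resp.read().decode("utf-8"))

def get_flag_values():
    # Map each startup flag listed on /varz?json to its current value.
    return {f["name"]: f["current"] for f in get_debug_json("varz").get("flags", [])}

def wait_for_tables(min_tables, timeout_s=60, interval_s=1):
    # Poll /jsonmetrics?json until catalog.num-tables reaches min_tables.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        metrics = get_debug_json("jsonmetrics")
        if metrics.get("catalog.num-tables", 0) >= min_tables:
            return True
        time.sleep(interval_s)
    return False

if __name__ == "__main__":
    flags = get_flag_values()
    # Local-catalog coordinators start with an empty cache, so skip the table wait.
    if flags.get("use_local_catalog") == "false":
        print("coordinator sees at least one table:", wait_for_tables(1))

start-impala-cluster.py skips this wait when --wait_num_table=-1 (as create-load-data.sh does) or when use_local_catalog=true.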
@@ -510,9 +510,13 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
CatalogServer::CatalogServer(MetricGroup* metrics)
: protocol_version_(CatalogServiceVersion::V2),
thrift_iface_(new CatalogServiceThriftIf(this)),
thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
is_active_(!FLAGS_enable_catalogd_ha), is_ha_determined_(!FLAGS_enable_catalogd_ha),
topic_updates_ready_(false), last_sent_catalog_version_(0L),
thrift_serializer_(FLAGS_compact_catalog_topic),
metrics_(metrics),
is_active_(!FLAGS_enable_catalogd_ha),
is_ha_determined_(!FLAGS_enable_catalogd_ha),
topic_updates_ready_(false),
last_sent_catalog_version_(0L),
triggered_first_reset_(false),
catalog_objects_max_version_(0L) {
topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
@@ -552,7 +556,6 @@ Status CatalogServer::Start() {
TNetworkAddress server_address = MakeNetworkAddress(FLAGS_hostname,
FLAGS_catalog_service_port);

// This will trigger a full Catalog metadata load.
catalog_.reset(new Catalog());
#ifndef NDEBUG
if (FLAGS_stress_catalog_startup_delay_ms > 0) {
@@ -562,6 +565,8 @@ Status CatalogServer::Start() {
RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-update-gathering-thread",
&CatalogServer::GatherCatalogUpdatesThread, this,
&catalog_update_gathering_thread_));
RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-first-reset-metadata-thread",
&CatalogServer::TriggerResetMetadata, this, &catalog_first_reset_metadata_thread_));
RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-metrics-refresh-thread",
&CatalogServer::RefreshMetrics, this, &catalog_metrics_refresh_thread_));

@@ -596,7 +601,7 @@ Status CatalogServer::Start() {
if (FLAGS_force_catalogd_active && !IsActive()) {
// If both catalogd are started with 'force_catalogd_active' as true in short time,
// the second election overwrite the first election. The one which registering with
// statstore first will be inactive.
// statestore first will be inactive.
LOG(WARNING) << "Could not start CatalogD as active instance";
}

@@ -714,17 +719,7 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
// Clear pending topic updates.
pending_topic_updates_.clear();
if (FLAGS_catalogd_ha_reset_metadata_on_failover) {
// Reset all metadata when the catalogd becomes active.
TResetMetadataRequest req;
TResetMetadataResponse resp;
req.__set_header(TCatalogServiceRequestHeader());
req.header.__set_want_minimal_response(false);
req.__set_is_refresh(false);
req.__set_sync_ddl(false);
Status status = catalog_->ResetMetadata(req, &resp);
if (!status.ok()) {
LOG(ERROR) << "Failed to reset metadata triggered by catalogd failover.";
}
triggered_first_reset_ = false;
} else {
// Refresh DataSource objects when the catalogd becomes active.
Status status = catalog_->RefreshDataSources();
@@ -755,19 +750,64 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
is_ha_determined_ = true;
}

[[noreturn]] void CatalogServer::TriggerResetMetadata() {
while (true) {
bool must_reset = false;
{
unique_lock<mutex> unique_lock(catalog_lock_);
while (!must_reset) {
catalog_update_cv_.NotifyOne();
catalog_update_cv_.Wait(unique_lock);
must_reset = is_active_ && !triggered_first_reset_;
}
}

// Run ResetMetadata without holding 'catalog_lock_' so that it does not block
// gathering thread from starting. Note that gathering thread will still compete
// for CatalogServiceCatalog.versionLock_.writeLock() in JVM.
VLOG(1) << "Triggering first catalog invalidation.";
TResetMetadataRequest req;
TResetMetadataResponse resp;
req.__set_header(TCatalogServiceRequestHeader());
req.header.__set_want_minimal_response(true); // not using response. minimal is OK.
req.__set_is_refresh(false);
req.__set_sync_ddl(false);
Status status = catalog_->ResetMetadata(req, &resp);
if (!status.ok()) {
LOG(ERROR) << "Catalog server failed to run first catalog invalidation. "
<< "Please run 'invalidate metadata' manually.";
}
{
// Mark as true, regardless of status.
unique_lock<mutex> unique_lock(catalog_lock_);
triggered_first_reset_ = true;
catalog_update_cv_.NotifyOne();
}
}
}

bool CatalogServer::IsActive() {
lock_guard<mutex> l(catalog_lock_);
return is_active_;
}

[[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
// catalog_topic_mode=minimal does not require initial reset to happen ahead of catalog
// update gathering because coordinator will request metadata on-demand.
bool require_initial_reset = FLAGS_catalog_topic_mode != "minimal";
while (true) {
unique_lock<mutex> unique_lock(catalog_lock_);
// Protect against spurious wake-ups by checking the value of topic_updates_ready_.
// It is only safe to continue on and update the shared pending_topic_updates_
// when topic_updates_ready_ is false, otherwise we may be in the middle of
// processing a heartbeat.
while (topic_updates_ready_) {
// If require_initial_reset is True, this thread needs to let the TriggerResetMetadata
// thread proceed first.
while (topic_updates_ready_ || (require_initial_reset && !triggered_first_reset_)) {
if (!triggered_first_reset_) {
// Wake up TriggerResetMetadata thread.
catalog_update_cv_.NotifyOne();
}
catalog_update_cv_.Wait(unique_lock);
}

@@ -785,6 +825,8 @@ bool CatalogServer::IsActive() {
} else if (current_catalog_version != last_sent_catalog_version_) {
// If there has been a change since the last time the catalog was queried,
// call into the Catalog to find out what has changed.
VLOG(2) << "Catalog version changed from " << last_sent_catalog_version_ << " to "
<< current_catalog_version << ". Gathering catalog delta.";
TGetCatalogDeltaResponse resp;
status = catalog_->GetCatalogDelta(this, last_sent_catalog_version_, &resp);
if (!status.ok()) {
@@ -796,6 +838,7 @@ bool CatalogServer::IsActive() {

topic_processing_time_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
topic_updates_ready_ = true;
if (!triggered_first_reset_) catalog_update_cv_.NotifyOne();
}
}


@@ -210,6 +210,9 @@ class CatalogServer {
IntGauge* num_hms_clients_idle_metric_;
IntGauge* num_hms_clients_in_use_metric_;

/// Thread that polls for and executes the first reset metadata operation.
std::unique_ptr<Thread> catalog_first_reset_metadata_thread_;

/// Thread that polls the catalog for any updates.
std::unique_ptr<Thread> catalog_update_gathering_thread_;

@@ -253,8 +256,13 @@ class CatalogServer {
/// Set in UpdateCatalogTopicCallback() and protected by the catalog_lock_.
int64_t last_sent_catalog_version_;

/// Marks whether the first metadata reset has been triggered.
/// Protected by the catalog_lock_.
bool triggered_first_reset_;

/// The max catalog version in pending_topic_updates_. Set by the
/// catalog_update_gathering_thread_ and protected by catalog_lock_.
/// Value -1 means catalog_update_gathering_thread_ has not set it.
int64_t catalog_objects_max_version_;

/// Called during each Statestore heartbeat and is responsible for updating the current
@@ -392,6 +400,10 @@ class CatalogServer {
void HadoopVarzHandler(const Webserver::WebRequest& req,
rapidjson::Document* document);

/// Thread method to issue the first ResetMetadata request when this CatalogD becomes
/// active.
[[noreturn]] void TriggerResetMetadata();

/// Indicates if the catalog has finished initialization. If last_sent_catalog_version_
/// is greater than 0, returns `true`, otherwise returns `false`.
inline bool IsCatalogInitialized();

@@ -205,6 +205,9 @@ parser.add_option("--use_calcite_planner", default="False", type="choice",
choices=["true", "True", "false", "False"],
help="If true, use the Calcite planner for query optimization "
"instead of the Impala planner")
parser.add_option("--wait_num_table", default=1, type="int",
help="If starting cluster, wait until coordinator's catalog.num-tables "
"reach this value or more.")

# For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
# replica initialization. The ith delay is applied to the ith impalad.
@@ -1253,7 +1256,9 @@ if __name__ == "__main__":
# https://issues.apache.org/jira/browse/IMPALA-13755
expected_num_ready_impalads = options.cluster_size
expected_cluster_size = options.cluster_size
impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads)
LOG.info("wait_num_table={}".format(options.wait_num_table))
impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads,
wait_num_table=options.wait_num_table)
except Exception as e:
LOG.exception("Error starting cluster")
sys.exit(1)

@@ -28,7 +28,6 @@ import static org.apache.impala.service.CatalogOpExecutor.FETCHED_HMS_TABLE;
import static org.apache.impala.service.CatalogOpExecutor.FETCHED_LATEST_HMS_EVENT_ID;
import static org.apache.impala.service.CatalogOpExecutor.GOT_TABLE_READ_LOCK;
import static org.apache.impala.service.CatalogOpExecutor.GOT_TABLE_WRITE_LOCK;
import static org.apache.impala.thrift.TCatalogObjectType.DATABASE;
import static org.apache.impala.thrift.TCatalogObjectType.HDFS_PARTITION;
import static org.apache.impala.thrift.TCatalogObjectType.TABLE;

@@ -132,7 +131,6 @@ import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TSetEventProcessorStatusResponse;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TSystemTableName;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableType;
@@ -2327,6 +2325,9 @@ public class CatalogServiceCatalog extends Catalog {
newDbCache.put(dbName, invalidatedDb.first);
tblsToBackgroundLoad.addAll(invalidatedDb.second);
}

DebugUtils.executeDebugAction(BackendConfig.INSTANCE.debugActions(),
DebugUtils.RESET_METADATA_LOOP_LOCKED);
}
}
dbCache_.set(newDbCache);
@@ -3573,10 +3574,8 @@ public class CatalogServiceCatalog extends Catalog {
// changed from active to standby, or from standby to active. Inactive catalogd
// does not receive catalog topic updates from the statestore. To avoid waiting
// indefinitely, throw exception if its service id has been changed.
if (!Strings.isNullOrEmpty(BackendConfig.INSTANCE.debugActions())) {
DebugUtils.executeDebugAction(
BackendConfig.INSTANCE.debugActions(), DebugUtils.WAIT_SYNC_DDL_VER_DELAY);
}
DebugUtils.executeDebugAction(
BackendConfig.INSTANCE.debugActions(), DebugUtils.WAIT_SYNC_DDL_VER_DELAY);
if (!serviceId.equals(JniCatalog.getServiceId())) {
String errorMsg = "Couldn't retrieve the catalog topic update for the " +
"SYNC_DDL operation since HA role of this catalog instance has been " +

@@ -102,7 +102,6 @@ import org.apache.impala.util.NoOpEventSequence;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.TUniqueIdUtil;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.impala.util.TUniqueIdUtil;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
@@ -189,11 +188,7 @@ public class JniCatalog {
catalog_.setCatalogMetastoreServer(catalogMetastoreServer_);
catalogMetastoreServer_.start();

try {
catalog_.reset(NoOpEventSequence.INSTANCE);
} catch (CatalogException e) {
LOG.error("Error initializing Catalog. Please run 'invalidate metadata'", e);
}
// catalog-server.cc is responsible for calling catalog_.reset() for the first time.
}

/**

@@ -104,6 +104,10 @@ public class DebugUtils {
// test failure for IMPALA-13126.
public static final String MOCK_WRITE_LOCK_FAILURE = "mock_write_lock_failure";

// debug action label inside the CatalogServiceCatalog.reset() Db loop section that holds
// the write lock.
public static final String RESET_METADATA_LOOP_LOCKED = "reset_metadata_loop_locked";

/**
* Returns true if the label of action is set in the debugActions
*/

testdata/bin/create-load-data.sh
@@ -168,6 +168,8 @@ function start-impala {
# Disable strict datafile location checks for Iceberg tables
DATAFILE_LOCATION_CHECK="-iceberg_allow_datafiles_in_table_location_only=false"
START_CLUSTER_ARGS_INT+=("--catalogd_args=$DATAFILE_LOCATION_CHECK")
# No need to wait for a minimum number of tables.
START_CLUSTER_ARGS_INT+=("--wait_num_table=-1")
if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
START_CLUSTER_ARGS_INT+=("--impalad_args=--abort_on_config_error=false -s 1")
else

@@ -204,7 +204,8 @@ class ImpalaCluster(object):
client.close()
return n

def wait_until_ready(self, expected_num_impalads=1, expected_num_ready_impalads=None):
def wait_until_ready(self, expected_num_impalads=1, expected_num_ready_impalads=None,
wait_num_table=-1):
"""Waits for this 'cluster' to be ready to submit queries.

A cluster is deemed "ready" if:
@@ -219,6 +220,7 @@ class ImpalaCluster(object):
This information is retrieved by querying the statestore debug webpage
and each individual impalad's metrics webpage.
"""
start_time = time.time()
self.wait_for_num_impalads(expected_num_impalads)

# TODO: fix this for coordinator-only nodes as well.
@@ -242,9 +244,16 @@ class ImpalaCluster(object):

# Wait for coordinators to start.
for impalad in self.impalads:
if impalad._get_arg_value("is_coordinator", default="true") != "true": continue
if impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) != 0: continue
impalad.wait_for_coordinator_services(sleep_interval, check_processes_still_running)
# Look up the /varz page. The webserver should be up already.
flags = impalad.service.get_flag_current_values()
if flags['is_coordinator'] != 'true':
continue
if flags['stress_catalog_init_delay_ms'] != '0':
continue
if flags['use_local_catalog'] != 'false':
wait_num_table = -1
impalad.wait_for_coordinator_services(sleep_interval, check_processes_still_running,
wait_num_table=wait_num_table)
# Decrease sleep_interval after first coordinator ready as the others are also
# likely to be (nearly) ready.
sleep_interval = 0.2
@@ -257,6 +266,7 @@ class ImpalaCluster(object):
impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads,
timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=sleep_interval,
early_abort_fn=check_processes_still_running)
LOG.info("Total wait: {:.2f}s".format(time.time() - start_time))

def wait_for_num_impalads(self, num_impalads, retries=10):
"""Checks that at least 'num_impalads' impalad processes are running, along with
@@ -553,7 +563,7 @@ class BaseImpalaProcess(Process):
"""Return the port for the webserver of this process."""
return int(self._get_port('webserver_port', self._get_default_webserver_port()))

def set_jvm_log_level(self, class_name, level="info"):
def set_jvm_log_level(self, class_name, level="info"):  # noqa: U100
"""Helper method to set JVM log level for certain class name.
Some daemon might not have JVM in it."""
raise NotImplementedError()
@@ -638,7 +648,8 @@ class ImpaladProcess(BaseImpalaProcess):
early_abort_fn()
sleep(sleep_interval)

def wait_for_coordinator_services(self, sleep_interval, early_abort_fn):
def wait_for_coordinator_services(self, sleep_interval, early_abort_fn,
wait_num_table=-1):
"""Waits for client ports to be opened. Assumes that the webservice ports are open."""
start_time = time.time()
LOG.info(
@@ -649,17 +660,20 @@ class ImpaladProcess(BaseImpalaProcess):
beeswax_port_is_open = self.service.beeswax_port_is_open()
hs2_port_is_open = self.service.hs2_port_is_open()
hs2_http_port_is_open = self.service.hs2_http_port_is_open()
if beeswax_port_is_open and hs2_port_is_open and hs2_http_port_is_open:
return
early_abort_fn()
# The coordinator is likely to wait for the catalog update. Fetch the number
# of catalog objects.
# Fetch the number of catalog objects.
num_dbs, num_tbls = self.service.get_metric_values(
["catalog.num-databases", "catalog.num-tables"])
if (beeswax_port_is_open and hs2_port_is_open and hs2_http_port_is_open
and num_tbls >= wait_num_table):
return
early_abort_fn()
# The coordinator is likely to wait for the catalog update.
LOG.info(("Client services not ready. Waiting for catalog cache: "
"({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
"({num_dbs} DBs / {num_tbls} tables / wait_num_table={wait_num_table}). "
"Trying again ...").format(
num_dbs=num_dbs,
num_tbls=num_tbls))
num_tbls=num_tbls,
wait_num_table=wait_num_table))
sleep(sleep_interval)

raise RuntimeError(

@@ -79,9 +79,10 @@ class BaseImpalaService(object):
def read_debug_webpage(self, page_name, timeout=10, interval=1):
return self.open_debug_webpage(page_name, timeout=timeout, interval=interval).text

def get_debug_webpage_json(self, page_name):
def get_debug_webpage_json(self, page_name, timeout=10, interval=1):
"""Returns the json for the given Impala debug webpage, eg. '/queries'"""
return json.loads(self.read_debug_webpage(page_name + "?json"))
return json.loads(self.read_debug_webpage(
page_name + "?json", timeout=timeout, interval=interval))

def dump_debug_webpage_json(self, page_name, filename):
"""Dumps the json for a given Impalad debug webpage to the specified file.
@@ -101,19 +102,27 @@ class BaseImpalaService(object):
if default_values is None:
default_values = [None for m in metric_names]
assert len(metric_names) == len(default_values)
metrics = json.loads(self.read_debug_webpage('jsonmetrics?json'))
metrics = self.get_debug_webpage_json('jsonmetrics')
return [metrics.get(metric_name, default_value)
for metric_name, default_value in zip(metric_names, default_values)]

def get_flag_current_value(self, flag):
"""Returns the value of the the given flag name from the Impala /varz debug webpage.
If the flag does not exist it returns None."""
varz = json.loads(self.read_debug_webpage("varz?json"))
varz = self.get_debug_webpage_json("varz")
for var in varz.get("flags"):
if var["name"] == flag:
return var["current"]
return None

def get_flag_current_values(self):
"""Returns the value of all flags from the Impala /varz debug webpage."""
flags = dict()
varz = self.get_debug_webpage_json("varz")
for var in varz.get("flags"):
flags[var['name']] = var['current']
return flags

def wait_for_metric_value(self, metric_name, expected_value, timeout=10, interval=1,
allow_greater=False):
start_time = time()
@@ -262,14 +271,15 @@ class ImpaladService(BaseImpalaService):

def get_num_known_live_executors(self, timeout=30, interval=1,
include_shutting_down=True):
return self.get_num_known_live_backends(include_shutting_down=include_shutting_down,
return self.get_num_known_live_backends(timeout=timeout, interval=interval,
include_shutting_down=include_shutting_down,
only_executors=True)

def get_num_known_live_backends(self, timeout=30, interval=1,
include_shutting_down=True, only_coordinators=False, only_executors=False):
LOG.debug("Getting num_known_live_backends from %s:%s" %
(self.hostname, self.webserver_port))
result = json.loads(self.read_debug_webpage('backends?json', timeout, interval))
result = self.get_debug_webpage_json('backends', timeout, interval)
count = 0
for backend in result['backends']:
if backend['is_quiescing'] and not include_shutting_down:
@@ -286,15 +296,15 @@ class ImpaladService(BaseImpalaService):
group's executors."""
LOG.debug("Getting executor groups from %s:%s" %
(self.hostname, self.webserver_port))
result = json.loads(self.read_debug_webpage('backends?json', timeout, interval))
result = self.get_debug_webpage_json('backends', timeout, interval)
groups = defaultdict(list)
for backend in result['backends']:
groups[backend['executor_groups']].append(backend['krpc_address'])
return groups

def get_queries_json(self):
def get_queries_json(self, timeout=30, interval=1):
"""Return the full JSON from the /queries page."""
return json.loads(self.read_debug_webpage('queries?json', timeout=30, interval=1))
return self.get_debug_webpage_json('queries', timeout=timeout, interval=interval)

def get_query_locations(self):
# Returns a dictionary of the format <host_address, num_of_queries_running_there>
@@ -305,17 +315,17 @@ class ImpaladService(BaseImpalaService):

def get_in_flight_queries(self, timeout=30, interval=1):
"""Returns the number of in flight queries."""
return self.get_queries_json()['in_flight_queries']
return self.get_queries_json(timeout=timeout, interval=interval)['in_flight_queries']

def get_completed_queries(self, timeout=30, interval=1):
"""Returns the number of completed queries."""
result = json.loads(self.read_debug_webpage('queries?json', timeout, interval))
result = self.get_debug_webpage_json('queries', timeout, interval)
return result['completed_queries']

def _get_pool_counter(self, pool_name, counter_name, timeout=30, interval=1):
"""Returns the value of the field 'counter_name' in pool 'pool_name' or 0 if the pool
doesn't exist."""
result = json.loads(self.read_debug_webpage('admission?json', timeout, interval))
result = self.get_debug_webpage_json('admission', timeout, interval)
pools = result['resource_pools']
for pool in pools:
if pool['pool_name'] == pool_name:
@@ -341,7 +351,8 @@ class ImpaladService(BaseImpalaService):
LOG.info("Getting num_in_flight_queries from %s:%s" %
(self.hostname, self.webserver_port))
result = self.read_debug_webpage('inflight_query_ids?raw', timeout, interval)
return None if result is None else len([l for l in result.split('\n') if l])
return None if result is None else len(
[line for line in result.split('\n') if line])

def wait_for_num_in_flight_queries(self, expected_val, timeout=10):
"""Waits for the number of in-flight queries to reach a certain value"""
@@ -381,7 +392,8 @@ class ImpaladService(BaseImpalaService):
"""Fetches the raw contents of the query's runtime profile webpage.
Fails an assertion if Impala's webserver is unavailable or the query's
profile page doesn't exist."""
return self.read_debug_webpage("query_profile?query_id=%s&raw" % (query_id))
return self.read_debug_webpage(
"query_profile?query_id=%s&raw" % (query_id), timeout=timeout, interval=interval)

def get_query_status(self, query_id):
"""Gets the 'Query Status' section of the query's runtime profile."""
@@ -409,8 +421,7 @@ class ImpaladService(BaseImpalaService):
client.handle_id(query_handle), target_state, query_state)
return

def wait_for_query_status(self, client, query_id, expected_content,
timeout=30, interval=1):
def wait_for_query_status(self, query_id, expected_content, timeout=30, interval=1):
"""Polls for the query's status in the query profile web page to contain the
specified content. Returns False if the timeout was reached before a successful
match, True otherwise."""

@@ -26,6 +26,8 @@ from builtins import round
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout
from tests.common.impala_connection import ERROR
from tests.common.parametrize import UniqueDatabase
from tests.common.test_vector import HS2
from tests.util.filesystem_utils import IS_S3, get_fs_path
from time import sleep

@@ -49,9 +51,14 @@ class TestCatalogdHA(CustomClusterTestSuite):
VARZ_URL = "http://localhost:{0}/varz"
CATALOG_HA_INFO_URL = "http://localhost:{0}/catalog_ha_info"
JSON_METRICS_URL = "http://localhost:{0}/jsonmetrics"
HEALTHZ_URL = "http://localhost:{0}/healthz"

SS_TEST_PORT = ["25010"]

@classmethod
def default_test_protocol(cls):
return HS2

# Verify port of the active catalogd of statestore is matching with the catalog
# service port of the given catalogd service.
def __verify_statestore_active_catalogd_port(self, catalogd_service):
@@ -59,7 +66,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
active_catalogd_address = \
statestore_service.get_metric_value("statestore.active-catalogd-address")
_, catalog_service_port = active_catalogd_address.split(":")
assert(int(catalog_service_port) == catalogd_service.get_catalog_service_port())
assert int(catalog_service_port) == catalogd_service.get_catalog_service_port()

# Verify port of the active catalogd of impalad is matching with the catalog
# service port of the given catalogd service.
@@ -68,36 +75,57 @@ class TestCatalogdHA(CustomClusterTestSuite):
active_catalogd_address = \
impalad_service.get_metric_value("catalog.active-catalogd-address")
_, catalog_service_port = active_catalogd_address.split(":")
assert(int(catalog_service_port) == catalogd_service.get_catalog_service_port())
assert int(catalog_service_port) == catalogd_service.get_catalog_service_port()

def __run_simple_queries(self, sync_ddl=False):
try:
if sync_ddl:
self.execute_query_expect_success(self.client, "set SYNC_DDL=1")
self.execute_query_expect_success(
self.client, "drop table if exists test_catalogd_ha")
self.execute_query_expect_success(
self.client, "create table if not exists test_catalogd_ha (id int)")
self.execute_query_expect_success(
self.client, "insert into table test_catalogd_ha values(1), (2), (3)")
self.execute_query_expect_success(
self.client, "select count(*) from test_catalogd_ha")
finally:
self.execute_query_expect_success(
self.client, "drop table if exists test_catalogd_ha")
def __run_simple_queries(self, unique_database, sync_ddl=False):
opts = {'sync_ddl': sync_ddl}
self.execute_query_expect_success(self.client, "USE " + unique_database, opts)
self.execute_query_expect_success(
self.client, "drop table if exists test_catalogd_ha", opts)
self.execute_query_expect_success(
self.client, "create table if not exists test_catalogd_ha (id int)", opts)
self.execute_query_expect_success(
self.client, "insert into table test_catalogd_ha values(1), (2), (3)", opts)
self.execute_query_expect_success(
self.client, "select count(*) from test_catalogd_ha", opts)
self.execute_query_expect_success(
self.client, "drop table if exists test_catalogd_ha", opts)
self.execute_query_expect_success(self.client, "USE default", opts)

def __get_catalogds(self):
"""Return tuple of (active_catalogd, standby_catalogd)."""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert len(catalogds) == 2
# Assert that /healthz page is OK.
for catalogd in catalogds:
port = catalogd.get_webserver_port()
page = requests.get(self.HEALTHZ_URL.format(port))
assert page.status_code == requests.codes.ok
page = requests.head(self.HEALTHZ_URL.format(port))
assert page.status_code == requests.codes.ok
first_impalad = self.cluster.get_first_impalad()
page = requests.head(self.HEALTHZ_URL.format(first_impalad.get_webserver_port()))
assert page.status_code == requests.codes.ok

active_catalogd = catalogds[0]
standby_catalogd = catalogds[1]
if not active_catalogd.service.get_metric_value("catalog-server.active-status"):
active_catalogd, standby_catalogd = standby_catalogd, active_catalogd
assert active_catalogd.service.get_metric_value("catalog-server.active-status")
assert not standby_catalogd.service.get_metric_value("catalog-server.active-status")
return (active_catalogd, standby_catalogd)

@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
start_args="--enable_catalogd_ha")
def test_catalogd_ha_with_two_catalogd(self):
def test_catalogd_ha_with_two_catalogd(self, unique_database):
self.__test_catalogd_ha_with_two_catalogd(unique_database)

def __test_catalogd_ha_with_two_catalogd(self, unique_database):
"""The test case for cluster started with catalogd HA enabled."""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -106,28 +134,28 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
# Verify simple queries are ran successfully.
self.__run_simple_queries()
self.__run_simple_queries(unique_database)
# Verify simple queries with sync_ddl as 1.
self.__run_simple_queries(sync_ddl=True)
self.__run_simple_queries(unique_database, sync_ddl=True)

# Restart one coordinator. Verify it get active catalogd address from statestore.
self.cluster.impalads[0].restart()
self.cluster.impalads[0].service.wait_for_metric_value('impala-server.ready',
self.cluster.impalads[1].restart()
self.cluster.impalads[1].service.wait_for_metric_value('impala-server.ready',
expected_value=1, timeout=30)
self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)

@CustomClusterTestSuite.with_args(
statestored_args="--enable_catalogd_ha=true "
"--use_subscriber_id_as_catalogd_priority=true "
"--catalogd_ha_preemption_wait_period_ms=200",
catalogd_args="--enable_catalogd_ha=true")
def test_catalogd_ha_with_one_catalogd(self):
def test_catalogd_ha_with_one_catalogd(self, unique_database):
"""The test case for cluster with only one catalogd when catalogd HA is enabled."""
# Verify the catalogd instances is created as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 1)
assert len(catalogds) == 1
catalogd_service_1 = catalogds[0].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert catalogd_service_1.get_metric_value("catalog-server.active-status")

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -136,34 +164,44 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
# Verify simple queries are ran successfully.
self.__run_simple_queries()
self.__run_simple_queries(unique_database)

def __test_catalogd_auto_failover(self):
def __test_catalogd_auto_failover(self, unique_database):
"""Stop active catalogd and verify standby catalogd becomes active.
Restart original active catalogd. Verify that statestore does not resume its
active role."""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))

active role. If test_query_fail_during_failover is True, run a query during failover
and confirm that it fails."""
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service
catalogd_service_2 = standby_catalogd.service
statestore_service = self.cluster.statestored.service

# Assert that cluster is set up with configs needed to run this test.
assert 1000 >= int(statestore_service.get_flag_current_value(
'active_catalogd_designation_monitoring_interval_ms'))
assert 1000 >= int(statestore_service.get_flag_current_value(
'statestore_heartbeat_frequency_ms'))

start_count_clear_topic_entries = statestore_service.get_metric_value(
"statestore.num-clear-topic-entries-requests")

# Kill active catalogd
catalogds[0].kill()
active_catalogd.kill()

# Test: run a DDL query after active_catalogd is killed.
# This query should fail if the coordinator has not received the update from StatestoreD
# about the standby_catalogd becoming active.
self.execute_query_expect_failure(
self.client, "create table {}.table_creation_during_failover (id int)".format(
unique_database))

# Wait for long enough for the statestore to detect the failure of active catalogd
# and assign active role to standby catalogd.
catalogd_service_2.wait_for_metric_value(
"catalog-server.active-status", expected_value=True, timeout=30)
assert(catalogd_service_2.get_metric_value(
"catalog-server.ha-number-active-status-change") > 0)
assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
assert catalogd_service_2.get_metric_value(
"catalog-server.ha-number-active-status-change") > 0
assert catalogd_service_2.get_metric_value("catalog-server.active-status")

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -172,9 +210,9 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
# Verify simple queries are ran successfully.
self.__run_simple_queries()
self.__run_simple_queries(unique_database)
# Verify simple queries with sync_ddl as 1.
self.__run_simple_queries(sync_ddl=True)
self.__run_simple_queries(unique_database, sync_ddl=True)

end_count_clear_topic_entries = statestore_service.get_metric_value(
"statestore.num-clear-topic-entries-requests")
@@ -182,11 +220,11 @@ class TestCatalogdHA(CustomClusterTestSuite):

# Restart original active catalogd. Verify that statestore does not resume it as
# active to avoid flip-flop.
catalogds[0].start(wait_until_ready=True)
active_catalogd.start(wait_until_ready=True)
sleep(1)
catalogd_service_1 = catalogds[0].service
assert(not catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
catalogd_service_1 = active_catalogd.service
assert not catalogd_service_1.get_metric_value("catalog-server.active-status")
assert catalogd_service_2.get_metric_value("catalog-server.active-status")

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -197,66 +235,79 @@ class TestCatalogdHA(CustomClusterTestSuite):

@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000",
"--statestore_heartbeat_frequency_ms=1000 "
"--active_catalogd_designation_monitoring_interval_ms=1000",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
start_args="--enable_catalogd_ha")
def test_catalogd_auto_failover(self):
def test_catalogd_auto_failover(self, unique_database):
"""Tests for Catalog Service auto fail over without failed RPCs."""
self.__test_catalogd_auto_failover()
self.__test_catalogd_auto_failover(unique_database)

statestore_service = self.cluster.statestored.service
successful_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-successful-update-catalogd-rpc")
failed_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-failed-update-catalogd-rpc")
assert(successful_update_catalogd_rpc_num >= 6)
assert(failed_update_catalogd_rpc_num == 0)
assert successful_update_catalogd_rpc_num >= 6
assert failed_update_catalogd_rpc_num == 0

@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000 "
"--active_catalogd_designation_monitoring_interval_ms=1000 "
"--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:FAIL@1.0",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
start_args="--enable_catalogd_ha")
def test_catalogd_auto_failover_with_failed_rpc(self):
def test_catalogd_auto_failover_with_failed_rpc(self, unique_database):
"""Tests for Catalog Service auto fail over with failed RPCs."""
self.__test_catalogd_auto_failover()
self.__test_catalogd_auto_failover(unique_database)

statestore_service = self.cluster.statestored.service
successful_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-successful-update-catalogd-rpc")
failed_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-failed-update-catalogd-rpc")
assert(successful_update_catalogd_rpc_num >= 6)
assert(failed_update_catalogd_rpc_num == successful_update_catalogd_rpc_num)
assert successful_update_catalogd_rpc_num >= 6
assert failed_update_catalogd_rpc_num == successful_update_catalogd_rpc_num

def __test_catalogd_manual_failover(self):
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000 "
"--active_catalogd_designation_monitoring_interval_ms=1000 "
"--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:SLEEP@3000",
# The minicluster has 68 DBs when this test was written, so the total sleep is ~3.4s.
catalogd_args="--debug_actions=reset_metadata_loop_locked:SLEEP@50",
start_args="--enable_catalogd_ha")
@UniqueDatabase.parametrize(name_prefix='aa_test_catalogd_auto_failover_slow')
def test_catalogd_auto_failover_slow(self, unique_database):
"""Tests for Catalog Service auto fail over with both slow metadata reset and slow
statestore update. Set 'aa_' as unique_database prefix to make the database among
the earliest in reset metadata order."""
self.__test_catalogd_auto_failover(unique_database)

def __test_catalogd_manual_failover(self, unique_database):
"""Stop active catalogd and verify standby catalogd becomes active.
Restart original active catalogd with force_catalogd_active as true. Verify that
statestore resume it as active.
"""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service
catalogd_service_2 = standby_catalogd.service

statestore_service = self.cluster.statestored.service
start_count_clear_topic_entries = statestore_service.get_metric_value(
"statestore.num-clear-topic-entries-requests")

# Kill active catalogd
catalogds[0].kill()
active_catalogd.kill()

# Wait for long enough for the statestore to detect the failure of active catalogd
# and assign active role to standby catalogd.
catalogd_service_2.wait_for_metric_value(
"catalog-server.active-status", expected_value=True, timeout=30)
assert(catalogd_service_2.get_metric_value(
"catalog-server.ha-number-active-status-change") > 0)
assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
assert catalogd_service_2.get_metric_value(
"catalog-server.ha-number-active-status-change") > 0
assert catalogd_service_2.get_metric_value("catalog-server.active-status")

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -266,7 +317,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)

# Verify simple queries are ran successfully.
self.__run_simple_queries()
self.__run_simple_queries(unique_database)

end_count_clear_topic_entries = statestore_service.get_metric_value(
"statestore.num-clear-topic-entries-requests")
@@ -275,15 +326,15 @@ class TestCatalogdHA(CustomClusterTestSuite):

# Restart original active catalogd with force_catalogd_active as true.
# Verify that statestore resume it as active.
catalogds[0].start(wait_until_ready=True,
additional_args="--force_catalogd_active=true")
catalogd_service_1 = catalogds[0].service
active_catalogd.start(wait_until_ready=True,
additional_args="--force_catalogd_active=true")
catalogd_service_1 = active_catalogd.service
catalogd_service_1.wait_for_metric_value(
"catalog-server.active-status", expected_value=True, timeout=15)
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert catalogd_service_1.get_metric_value("catalog-server.active-status")
sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
sleep(sleep_time_s)
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
assert not catalogd_service_2.get_metric_value("catalog-server.active-status")

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -301,17 +352,17 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--statestore_heartbeat_frequency_ms=1000",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
start_args="--enable_catalogd_ha")
def test_catalogd_manual_failover(self):
def test_catalogd_manual_failover(self, unique_database):
"""Tests for Catalog Service manual fail over without failed RPCs."""
self.__test_catalogd_manual_failover()
self.__test_catalogd_manual_failover(unique_database)

statestore_service = self.cluster.statestored.service
successful_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-successful-update-catalogd-rpc")
failed_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-failed-update-catalogd-rpc")
assert(successful_update_catalogd_rpc_num >= 10)
assert(failed_update_catalogd_rpc_num == 0)
assert successful_update_catalogd_rpc_num >= 10
assert failed_update_catalogd_rpc_num == 0

@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
@@ -319,17 +370,17 @@ class TestCatalogdHA(CustomClusterTestSuite):
"--debug_actions=SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT:FAIL@1.0",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false",
start_args="--enable_catalogd_ha")
def test_catalogd_manual_failover_with_failed_rpc(self):
def test_catalogd_manual_failover_with_failed_rpc(self, unique_database):
"""Tests for Catalog Service manual fail over with failed RPCs."""
self.__test_catalogd_manual_failover()
self.__test_catalogd_manual_failover(unique_database)

statestore_service = self.cluster.statestored.service
successful_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-successful-update-catalogd-rpc")
failed_update_catalogd_rpc_num = statestore_service.get_metric_value(
"statestore.num-failed-update-catalogd-rpc")
assert(successful_update_catalogd_rpc_num >= 10)
assert(failed_update_catalogd_rpc_num == successful_update_catalogd_rpc_num)
assert successful_update_catalogd_rpc_num >= 10
assert failed_update_catalogd_rpc_num == successful_update_catalogd_rpc_num

@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
@@ -339,23 +390,17 @@ class TestCatalogdHA(CustomClusterTestSuite):
def test_manual_failover_with_coord_ignore_notification(self):
"""Tests for Catalog Service manual failover with coordinators to ignore failover
notification."""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service

# Restart standby catalogd with force_catalogd_active as true.
catalogds[1].kill()
catalogds[1].start(wait_until_ready=True,
standby_catalogd.kill()
standby_catalogd.start(wait_until_ready=True,
additional_args="--force_catalogd_active=true")
# Wait until original active catalogd becomes in-active.
catalogd_service_1 = catalogds[0].service
catalogd_service_1.wait_for_metric_value(
"catalog-server.active-status", expected_value=False, timeout=15)
assert(not catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert not catalogd_service_1.get_metric_value("catalog-server.active-status")

# Run query to create a table. Coordinator still send request to catalogd_service_1
# so that the request will be rejected.
@@ -367,16 +412,11 @@ class TestCatalogdHA(CustomClusterTestSuite):
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
start_args="--enable_catalogd_ha",
disable_log_buffering=True)
def test_restart_statestore(self):
def test_restart_statestore(self, unique_database):
"""The test case for restarting statestore after the cluster is created with
catalogd HA enabled."""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -393,8 +433,11 @@ class TestCatalogdHA(CustomClusterTestSuite):
expected_value=5, timeout=wait_time_s)
sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
sleep(sleep_time_s)
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
if not active_catalogd.service.get_metric_value("catalog-server.active-status"):
active_catalogd, standby_catalogd = standby_catalogd, active_catalogd
assert active_catalogd.service.get_metric_value("catalog-server.active-status")
assert not standby_catalogd.service.get_metric_value("catalog-server.active-status")
catalogd_service_1 = active_catalogd.service

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
@@ -403,7 +446,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
# Verify simple queries are ran successfully.
self.__run_simple_queries()
self.__run_simple_queries(unique_database)

unexpected_msg = re.compile("Ignore the update of active catalogd since more recent "
"update has been processed ([0-9]+ vs [0-9]+)")
@@ -413,34 +456,13 @@ class TestCatalogdHA(CustomClusterTestSuite):
@CustomClusterTestSuite.with_args(
catalogd_args="--force_catalogd_active=true",
start_args="--enable_catalogd_ha")
def test_two_catalogd_with_force_active(self):
def test_two_catalogd_with_force_active(self, unique_database):
"""The test case for cluster started with catalogd HA enabled and
both catalogds started with 'force_catalogd_active' as true.
Verify that one and only one catalogd is active."""
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
sleep(sleep_time_s)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status")
!= catalogd_service_2.get_metric_value("catalog-server.active-status"))

# Verify ports of the active catalogd of statestore and impalad are matching with
# the catalog service port of the current active catalogd.
if catalogd_service_1.get_metric_value("catalog-server.active-status"):
self.__verify_statestore_active_catalogd_port(catalogd_service_1)
self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
else:
self.__verify_statestore_active_catalogd_port(catalogd_service_2)
self.__verify_impalad_active_catalogd_port(0, catalogd_service_2)
self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)

# Verify simple queries are ran successfully.
self.__run_simple_queries()
self.__test_catalogd_ha_with_two_catalogd(unique_database)

@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -450,32 +472,26 @@ class TestCatalogdHA(CustomClusterTestSuite):
def test_catalogd_failover_with_sync_ddl(self, unique_database):
"""Tests for Catalog Service force fail-over when running DDL with SYNC_DDL
enabled."""
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_1 = active_catalogd.service

# Run DDL with SYNC_DDL enabled.
client = self.cluster.impalads[0].service.create_hs2_client()
assert client is not None
try:
client.set_configuration_option('sync_ddl', 1)
client.set_configuration_option('sync_ddl', '1')
ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
handle = client.execute_async(ddl_query.format(database=unique_database))

# Restart standby catalogd with force_catalogd_active as true.
start_s = time.time()
catalogds[1].kill()
catalogds[1].start(wait_until_ready=True,
standby_catalogd.kill()
standby_catalogd.start(wait_until_ready=True,
additional_args="--force_catalogd_active=true")
# Wait until original active catalogd becomes in-active.
catalogd_service_1 = catalogds[0].service
catalogd_service_1.wait_for_metric_value(
"catalog-server.active-status", expected_value=False, timeout=15)
assert(not catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert not catalogd_service_1.get_metric_value("catalog-server.active-status")
elapsed_s = time.time() - start_s
assert elapsed_s < SYNC_DDL_DELAY_S, \
"Catalogd failover took %s seconds to complete" % (elapsed_s)
@@ -492,12 +508,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
start_args="--enable_catalogd_ha")
def test_metadata_after_failover(self, unique_database):
"""Verify that the metadata is correct after failover."""
catalogds = self.cluster.catalogds()
assert(len(catalogds) == 2)
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
(active_catalogd, standby_catalogd) = self.__get_catalogds()
catalogd_service_2 = standby_catalogd.service

create_func_impala = ("create function {database}.identity_tmp(bigint) "
"returns bigint location '{location}' symbol='Identity'")
@@ -508,15 +520,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
self.client, "select %s.identity_tmp(10)" % unique_database)

# Kill active catalogd
catalogds[0].kill()
active_catalogd.kill()

# Wait for long enough for the statestore to detect the failure of active catalogd
# and assign active role to standby catalogd.
catalogd_service_2.wait_for_metric_value(
"catalog-server.active-status", expected_value=True, timeout=30)
assert(catalogd_service_2.get_metric_value(
"catalog-server.ha-number-active-status-change") > 0)
assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
assert catalogd_service_2.get_metric_value(
"catalog-server.ha-number-active-status-change") > 0
assert catalogd_service_2.get_metric_value("catalog-server.active-status")

self.execute_query_expect_success(
self.client, "select %s.identity_tmp(10)" % unique_database)

@@ -146,12 +146,25 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
# catalog pushes a new topic update.
self.cluster.catalogd.start()
NUM_ATTEMPTS = 30
database_found = False
view_not_found = False
for attempt in range(NUM_ATTEMPTS):
try:
self.assert_impalad_log_contains('WARNING', 'Detected catalog service restart')
err = self.execute_query_expect_failure(client, "select * from %s" % view)
assert "Could not resolve table reference" in str(err)
break
if not view_not_found:
self.assert_impalad_log_contains(
'WARNING', 'Detected catalog service restart')
err = self.execute_query_expect_failure(
client, "select * from {}".format(view))
assert "Could not resolve table reference" in str(err)
view_not_found = True
if not database_found:
# This part is only needed to ensure unique_database cleanup is successful.
self.execute_query_unchecked(
client, "show tables in {}".format(unique_database))
database_found = True
if view_not_found and database_found:
# All assertions passed.
break
except Exception as e:
assert attempt < NUM_ATTEMPTS - 1, str(e)
time.sleep(1)

@@ -153,7 +153,7 @@ class TestProcessFailures(CustomClusterTestSuite):
# case the query status should include the failed (or unreachable) worker.
query_id = client.handle_id(handle)
error_state = "Failed due to unreachable impalad"
assert impalad.service.wait_for_query_status(client, query_id, error_state)
assert impalad.service.wait_for_query_status(query_id, error_state)
assert error_msg_startswith(client.get_log(handle), error_state, query_id)

# Assert that the query status on the query profile web page contains the expected