IMPALA-8806: Add metrics to improve observability of executor groups

This patch adds three metrics under a new metric group called
"cluster-membership". They track the number of executor groups that
have at least one live executor, the number of executor groups that
are in a healthy state, and the number of backends registered with
the statestore.

Testing:
Modified tests to use these metrics for verification.
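
For illustration, here is a minimal pytest-style sketch (not part of this
change) of how such a verification can look. It uses the impalad test-service
helpers that the modified tests below rely on (wait_for_metric_value,
get_metric_value); the helper name and the assumed cluster shape (one
coordinator plus a two-executor group that has reached its minimum size) are
hypothetical.

def _check_membership_metrics(self):
  # The coordinator's debug web service exposes the daemon's metrics.
  coordinator_service = self.cluster.impalads[0].service
  # One coordinator and two executors are registered with the statestore.
  coordinator_service.wait_for_metric_value("cluster-membership.backends.total", 3)
  # A single executor group exists, and it has reached its configured
  # minimum size, so it also counts as healthy.
  assert coordinator_service.get_metric_value(
      "cluster-membership.executor-groups.total") == 1
  assert coordinator_service.get_metric_value(
      "cluster-membership.executor-groups.total-healthy") == 1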

Change-Id: I7745ea1c7c6778d3fb5e59adbc873697beb0f3b9
Reviewed-on: http://gerrit.cloudera.org:8080/13979
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Bikramjeet Vig
Date: 2019-07-29 16:58:11 -07:00
Committed by: Impala Public Jenkins
Parent: 6a31be8dd7
Commit: b5193f36de
9 changed files with 177 additions and 48 deletions


@@ -217,7 +217,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
}
cluster_membership_mgr_.reset(new ClusterMembershipMgr(
- statestore_subscriber_->id(), statestore_subscriber_.get()));
+ statestore_subscriber_->id(), statestore_subscriber_.get(), metrics_.get()));
admission_controller_.reset(
new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),


@@ -26,6 +26,7 @@
#include "service/impala-server.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
#include "util/metrics.h"
using std::mt19937;
using std::uniform_int_distribution;
@@ -69,6 +70,7 @@ class ClusterMembershipMgrTest : public testing::Test {
/// A struct to hold information related to a simulated backend during the test.
struct Backend {
string backend_id;
+ std::unique_ptr<MetricGroup> metric_group;
std::unique_ptr<ClusterMembershipMgr> cmm;
std::shared_ptr<TBackendDescriptor> desc;
};
@@ -175,7 +177,9 @@ class ClusterMembershipMgrTest : public testing::Test {
/// this method.
void CreateCMM(Backend* be) {
ASSERT_TRUE(IsInVector(be, offline_));
- be->cmm = make_unique<ClusterMembershipMgr>(be->backend_id, nullptr);
+ be->metric_group = make_unique<MetricGroup>("test");
+ be->cmm = make_unique<ClusterMembershipMgr>(
+ be->backend_id, nullptr, be->metric_group.get());
RemoveFromVector(be, &offline_);
starting_.push_back(be);
}
@@ -268,8 +272,10 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
auto b1 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(1));
auto b2 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(2));
- ClusterMembershipMgr cmm1(b1->address.hostname, nullptr);
- ClusterMembershipMgr cmm2(b2->address.hostname, nullptr);
+ MetricGroup tmp_metrics1("test-metrics1");
+ MetricGroup tmp_metrics2("test-metrics2");
+ ClusterMembershipMgr cmm1(b1->address.hostname, nullptr, &tmp_metrics1);
+ ClusterMembershipMgr cmm2(b2->address.hostname, nullptr, &tmp_metrics2);
const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};


@@ -19,6 +19,7 @@
#include "common/logging.h"
#include "common/names.h"
#include "util/metrics.h"
#include "util/test-info.h"
namespace {
@@ -44,12 +45,22 @@ ExecutorGroup* FindOrInsertExecutorGroup(const TExecutorGroupDesc& group,
namespace impala {
- ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
- StatestoreSubscriber* subscriber) :
- current_membership_(std::make_shared<const Snapshot>()),
+ static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total");
+ static const string HEALTHY_EXEC_GROUP_KEY(
+ "cluster-membership.executor-groups.total-healthy");
+ static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
+ ClusterMembershipMgr::ClusterMembershipMgr(
+ string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
+ : current_membership_(std::make_shared<const Snapshot>()),
statestore_subscriber_(subscriber),
thrift_serializer_(/* compact= */ false),
local_backend_id_(move(local_backend_id)) {
+ DCHECK(metrics != nullptr);
+ MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
+ total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
+ total_healthy_executor_groups_ = metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
+ total_backends_ = metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
}
Status ClusterMembershipMgr::Init() {
@@ -311,6 +322,8 @@ void ClusterMembershipMgr::UpdateMembership(
DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
}
+ UpdateMetrics(*new_backend_map, *new_executor_groups);
// Don't send updates or update the current membership if the statestore is in its
// post-recovery grace period.
if (ss_is_recovering) {
@@ -527,4 +540,23 @@ bool ClusterMembershipMgr::CheckConsistency(const BackendIdMap& current_backends
return true;
}
+ void ClusterMembershipMgr::UpdateMetrics(
+ const BackendIdMap& current_backends, const ExecutorGroups& executor_groups) {
+ int total_live_executor_groups = 0;
+ int total_healthy_executor_groups = 0;
+ for (const auto& group_it : executor_groups) {
+ const ExecutorGroup& group = group_it.second;
+ if (group.IsHealthy()) {
+ total_live_executor_groups++;
+ total_healthy_executor_groups++;
+ } else if (group.NumHosts() > 0) {
+ total_live_executor_groups++;
+ }
+ }
+ DCHECK_GE(total_live_executor_groups, total_healthy_executor_groups);
+ total_live_executor_groups_->SetValue(total_live_executor_groups);
+ total_healthy_executor_groups_->SetValue(total_healthy_executor_groups);
+ total_backends_->SetValue(current_backends.size());
+ }
} // end namespace impala


@@ -32,6 +32,7 @@
#include "scheduling/executor-group.h"
#include "statestore/statestore-subscriber.h"
#include "util/container-util.h"
#include "util/metrics-fwd.h"
namespace impala {
@@ -127,7 +128,8 @@ class ClusterMembershipMgr {
/// locks are held when calling this callback.
typedef std::function<Status(const TUpdateExecutorMembershipRequest&)> UpdateFrontendFn;
- ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber);
+ ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber,
+ MetricGroup* metrics);
/// Initializes instances of this class. This only sets up the statestore subscription.
/// Callbacks to the local ImpalaServer and Frontend must be registered in separate
@@ -205,11 +207,20 @@ class ClusterMembershipMgr {
bool CheckConsistency(const BackendIdMap& current_backends,
const ExecutorGroups& executor_groups, const ExecutorBlacklist& executor_blacklist);
+ /// Updates the membership metrics.
+ void UpdateMetrics(const BackendIdMap& current_backends,
+ const ExecutorGroups& executor_groups);
/// Ensures that only one thread is processing a membership update at a time, either
/// from a statestore update or a blacklisting decision. Must be taken before any other
/// locks in this class.
boost::mutex update_membership_lock_;
+ /// Membership metrics
+ IntCounter* total_live_executor_groups_ = nullptr;
+ IntCounter* total_healthy_executor_groups_ = nullptr;
+ IntCounter* total_backends_ = nullptr;
/// The snapshot of the current cluster membership. When receiving changes to the
/// executors configuration from the statestore we will make a copy of the stored
/// object, apply the updates to the copy and atomically swap the contents of this


@@ -694,10 +694,10 @@ void SchedulerWrapper::InitializeScheduler() {
<< "hosts.";
const Host& scheduler_host = plan_.cluster().hosts()[0];
string scheduler_backend_id = scheduler_host.ip;
- cluster_membership_mgr_.reset(new ClusterMembershipMgr(scheduler_backend_id, nullptr));
- cluster_membership_mgr_->SetLocalBeDescFn([scheduler_host]() {
- return BuildBackendDescriptor(scheduler_host);
- });
+ cluster_membership_mgr_.reset(
+ new ClusterMembershipMgr(scheduler_backend_id, nullptr, &metrics_));
+ cluster_membership_mgr_->SetLocalBeDescFn(
+ [scheduler_host]() { return BuildBackendDescriptor(scheduler_host); });
Status status = cluster_membership_mgr_->Init();
DCHECK(status.ok()) << "Cluster membership manager init failed in test";
scheduler_.reset(new Scheduler(&metrics_, nullptr));


@@ -242,8 +242,12 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
"queries from clients. If false, it will refuse client connections.");
DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
"fragments.");
DEFINE_string(executor_groups, "", "List of executor groups, separated by comma. "
"Currently only a single group may be specified.");
DEFINE_string(executor_groups, "",
"List of executor groups, separated by comma. Each executor group specification can "
"optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
"default-pool-1:3. Default minimum size is 1. Only when the cluster membership "
"contains at least that number of executors for the group will it be considered "
"healthy for admission. Currently only a single group may be specified.");
// TODO: can we automatically choose a startup grace period based on the max admission
// control queue timeout + some margin for error?


@@ -2441,5 +2441,34 @@
"units": "NONE",
"kind": "GAUGE",
"key": "events-processor.events-received-15min-rate"
- }
+ },
+ {
+ "description": "Total number of executor groups that have at least one executor",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Total number of executor groups that have at least one executor",
+ "units": "NONE",
+ "kind": "COUNTER",
+ "key": "cluster-membership.executor-groups.total"
+ },
+ {
+ "description": "Total number of executor groups that are in a healthy state, that is, have at least the configured minimum number of executors to be considered for admission",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Total number of executor groups that are in a healthy state",
+ "units": "NONE",
+ "kind": "COUNTER",
+ "key": "cluster-membership.executor-groups.total-healthy"
+ },
+ {
+ "description": "Total number of backends registered with the statestore",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Total number of backends registered with the statestore",
+ "units": "NONE",
+ "kind": "COUNTER",
+ "key": "cluster-membership.backends.total"
+ }
]


@@ -42,8 +42,8 @@ class TestAutoScaling(CustomClusterTestSuite):
def _get_total_admitted_queries(self):
return self.impalad_test_service.get_total_admitted_queries("default-pool")
- def _get_num_executors(self):
- return self.impalad_test_service.get_num_known_live_backends(only_executors=True)
+ def _get_num_backends(self):
+ return self.impalad_test_service.get_metric_value("cluster-membership.backends.total")
def _get_num_running_queries(self):
return self.impalad_test_service.get_num_running_queries("default-pool")
@@ -67,9 +67,12 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.start()
# Wait for workers to spin up
- assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
+ cluster_size = GROUP_SIZE + 1 # +1 to include coordinator.
+ assert any(self._get_num_backends() >= cluster_size or sleep(1)
for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
"Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") >= 1
# Wait until we admitted at least 10 queries
assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -77,26 +80,30 @@ class TestAutoScaling(CustomClusterTestSuite):
"Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S
# Wait for second executor group to start
- num_expected = 2 * GROUP_SIZE
- assert any(self._get_num_executors() == num_expected or sleep(1)
+ cluster_size = (2 * GROUP_SIZE) + 1
+ assert any(self._get_num_backends() >= cluster_size or sleep(1)
for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
"Number of backends did not reach %s within %s s" % (
- num_expected, self.STATE_CHANGE_TIMEOUT_S)
+ cluster_size, self.STATE_CHANGE_TIMEOUT_S)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") >= 2
# Wait for query rate to surpass the maximum for a single executor group plus 20%
min_query_rate = 1.2 * EXECUTOR_SLOTS
assert any(workload.get_query_rate() > min_query_rate or sleep(1)
for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
"Query rate did not surpass %s within %s s" % (
- num_expected, self.STATE_CHANGE_TIMEOUT_S)
+ cluster_size, self.STATE_CHANGE_TIMEOUT_S)
LOG.info("Stopping workload")
workload.stop()
# Wait for workers to spin down
- assert any(self._get_num_executors() == 0 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 0
finally:
if workload:
@@ -122,9 +129,10 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.start()
# Wait for workers to spin up
- assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ cluster_size = GROUP_SIZE + 1 # +1 to include coordinator.
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", cluster_size,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
# Wait until we admitted at least 10 queries
assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -144,15 +152,18 @@ class TestAutoScaling(CustomClusterTestSuite):
"Unexpected number of running queries: %s" % num_running
# Check that only a single group started
- assert self._get_num_executors() == GROUP_SIZE
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
LOG.info("Stopping workload")
workload.stop()
# Wait for workers to spin down
- assert any(self._get_num_executors() == 0 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 0
finally:
if workload:
@@ -179,22 +190,23 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.start()
# Wait for first executor to start up
- assert any(self._get_num_executors() >= 1 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
# Wait for remaining executors to start up and make sure that no queries are
# admitted during startup
end_time = time() + self.STATE_CHANGE_TIMEOUT_S
startup_complete = False
+ cluster_size = GROUP_SIZE + 1 # +1 to include coordinator.
while time() < end_time:
num_admitted = self._get_total_admitted_queries()
- num_backends = self._get_num_executors()
- if num_backends < GROUP_SIZE:
+ num_backends = self._get_num_backends()
+ if num_backends < cluster_size:
assert num_admitted == 0, "%s/%s backends started but %s queries have " \
"already been admitted." % (num_backends, GROUP_SIZE, num_admitted)
"already been admitted." % (num_backends, cluster_size, num_admitted)
if num_admitted > 0:
- assert num_backends == GROUP_SIZE
+ assert num_backends == cluster_size
startup_complete = True
break
sleep(1)
@@ -205,9 +217,11 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.stop()
# Wait for workers to spin down
- assert any(self._get_num_executors() == 0 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 0
finally:
if workload:


@@ -42,6 +42,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
self.num_groups = 1
self.num_executors = 1
super(TestExecutorGroups, self).setup_method(method)
+ self.coordinator = self.cluster.impalads[0]
def _group_name(self, name):
# By convention, group names must start with their associated resource pool name
@@ -88,6 +89,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
"""Tests that a query submitted to a coordinator with no executor group times out."""
result = self.execute_query_expect_failure(self.client, "select sleep(2)")
assert "Admission for query exceeded timeout" in str(result)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
@pytest.mark.execute_serially
def test_single_group(self):
@@ -95,6 +98,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
QUERY = "select count(*) from functional.alltypestiny"
self._add_executor_group("group1", 2)
self.execute_query_expect_success(self.client, QUERY)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
@pytest.mark.execute_serially
def test_executor_group_starts_while_qeueud(self):
@@ -105,8 +110,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
handle = client.execute_async(QUERY)
profile = client.get_runtime_profile(handle)
assert "No healthy executor groups found for pool" in profile
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
self._add_executor_group("group1", 2)
client.wait_for_finished_timeout(handle, 20)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
@pytest.mark.execute_serially
def test_executor_group_health(self):
@@ -114,14 +123,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
QUERY = "select count(*) from functional.alltypestiny"
# Start cluster and group
self._add_executor_group("group1", 2)
+ self.coordinator.service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total-healthy", 1)
client = self.client
# Run query to validate
self.execute_query_expect_success(client, QUERY)
# Kill an executor
- coordinator = self.cluster.impalads[0]
executor = self.cluster.impalads[1]
executor.kill()
- coordinator.service.wait_for_num_known_live_backends(2)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
+ timeout=20)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
# Run query and observe timeout
handle = client.execute_async(QUERY)
profile = client.get_runtime_profile(handle)
@@ -132,6 +145,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
client.wait_for_finished_timeout(handle, 20)
# Run query and observe success
self.execute_query_expect_success(client, QUERY)
+ assert self.coordinator.service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total-healthy", 1)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests=1")
@@ -146,19 +161,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
profile = client.get_runtime_profile(q2)
assert "Initial admission queue reason: number of running queries" in profile, profile
# Kill an executor
- coordinator = self.cluster.impalads[0]
executor = self.cluster.impalads[1]
executor.kill()
- coordinator.service.wait_for_num_known_live_backends(2)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
# Wait for q1 to finish (sleep runs on the coordinator)
client.wait_for_finished_timeout(q1, 20)
# Check that q2 still hasn't run
profile = client.get_runtime_profile(q2)
assert "Admission result: Queued" in profile, profile
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
# Restore executor group health
executor.start()
# Query should now finish
client.wait_for_finished_timeout(q2, 20)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
@pytest.mark.execute_serially
def test_max_concurrent_queries(self):
@@ -184,6 +202,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
where month < 3 and id + random() < sleep(500);"
self._add_executor_group("group1", 2, max_concurrent_queries=1)
self._add_executor_group("group2", 2, max_concurrent_queries=1)
+ self.coordinator.service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total-healthy", 2)
client = self.client
q1 = client.execute_async(QUERY)
client.wait_for_admission_control(q1)
@@ -257,6 +277,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
QUERY = "select sleep(4)"
# Start first executor
self._add_executor_group("group1", 3, num_executors=1)
self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
assert self.coordinator.service.get_metric_value(
"cluster-membership.executor-groups.total") == 1
assert self.coordinator.service.get_metric_value(
"cluster-membership.executor-groups.total-healthy") == 0
# Run query and observe that it gets queued
client = self.client
handle = client.execute_async(QUERY)
@@ -266,10 +291,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
initial_state = client.get_state(handle)
# Start another executor and observe that the query stays queued
self._add_executor_group("group1", 3, num_executors=1)
self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 3)
assert self.coordinator.service.get_metric_value(
"cluster-membership.executor-groups.total-healthy") == 0
profile = client.get_runtime_profile(handle)
assert client.get_state(handle) == initial_state
# Start the remaining executor and observe that the query finishes
self._add_executor_group("group1", 3, num_executors=1)
self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 4)
assert self.coordinator.service.get_metric_value(
"cluster-membership.executor-groups.total-healthy") == 1
client.wait_for_finished_timeout(handle, 20)
@pytest.mark.execute_serially
@@ -283,16 +314,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
# Run query to make sure things work
QUERY = "select count(*) from functional.alltypestiny"
self.execute_query_expect_success(self.client, QUERY)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
# Kill executors to make group empty
impalads = self.cluster.impalads
impalads[1].kill()
impalads[2].kill()
- impalads[0].service.wait_for_num_known_live_backends(1)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
# Run query to make sure it times out
result = self.execute_query_expect_failure(self.client, QUERY)
print str(result)
expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
"pool default-pool. Queued reason: No healthy executor groups " \
"found for pool default-pool."
assert expected_error in str(result)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0