IMPALA-13201: System Table Queries Execute When Admission Queues are Full

Queries that run only against in-memory system tables are currently
subject to the same admission control process as all other queries.
Since these queries do not use any resources on executors, admission
control does not need to consider the state of executors when
deciding to admit these queries.

This change adds a boolean configuration option 'onlyCoordinators'
to the fair-scheduler.xml file for specifying a request pool only
applies to the coordinators. When a query is submitted to a
coordinator only request pool, then no executors are required to be
running. Instead, all fragment instances are executed exclusively on
the coordinators.

A new member was added to the ClusterMembershipMgr::Snapshot struct
to hold the ExecutorGroup of all coordinators. This object is kept up
to date by processing statestore messages and is used when executing
queries that either require the coordinators (such as queries against
sys.impala_query_live) or that use an only coordinators request pool.

Testing was accomplished by:
1. Adding cluster membership manager ctests to assert cluster
   membership manager correctly builds the list of non-quiescing
   coordinators.
2. RequestPoolService JUnit tests to assert the new optional
   <onlyCoords> config in the fair scheduler xml file is correctly
   parsed.
3. ExecutorGroup ctests modified to assert the new function.
4. Custom cluster admission controller tests to assert queries with a
   coordinator only request pool only run on the active coordinators.

Change-Id: I5e0e64db92bdbf80f8b5bd85d001ffe4c8c9ffda
Reviewed-on: http://gerrit.cloudera.org:8080/22249
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
jasonmfehr
2025-01-14 15:24:48 -08:00
committed by Impala Public Jenkins
parent e83a8e312a
commit aac67a077e
26 changed files with 716 additions and 100 deletions

View File

@@ -2250,8 +2250,14 @@ Status AdmissionController::ComputeGroupScheduleStates(
}
const BackendDescriptorPB& coord_desc = it->second;
vector<const ExecutorGroup*> executor_groups =
GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
vector<const ExecutorGroup*> executor_groups;
if (UNLIKELY(queue_node->pool_cfg.only_coordinators)) {
executor_groups = {&membership_snapshot->all_coordinators};
} else {
executor_groups =
GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
}
if (executor_groups.empty()) {
queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS;
@@ -2261,7 +2267,7 @@ Status AdmissionController::ComputeGroupScheduleStates(
// Collect all coordinators if needed for the request.
ExecutorGroup coords = request.request.include_all_coordinators ?
membership_snapshot->GetCoordinators() : ExecutorGroup("all-coordinators");
membership_snapshot->all_coordinators : ExecutorGroup("all-coordinators");
// We loop over the executor groups in a deterministic order. If
// --balance_queries_across_executor_groups set to true, executor groups with more

View File

@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include <deque>
#include <memory>
#include <sstream>
#include <vector>
#include "common/logging.h"
#include "common/names.h"
@@ -272,9 +274,38 @@ class ClusterMembershipMgrTest : public testing::Test {
}
};
/// Asserts a provided list of expected hostnames are in the all_coordinators
/// ExecutorGroup of the provided ClusterMembershipMgr.
void _assertCoords(
ClusterMembershipMgr& cmm, const std::vector<const char*> expected_hostnames) {
ExecutorGroup::Executors actual_coords =
cmm.GetSnapshot()->all_coordinators.GetAllExecutorDescriptors();
std::ostringstream actual_hostnames;
for (const auto& actual : actual_coords) {
actual_hostnames << " " << actual.address().hostname();
}
ASSERT_EQ(expected_hostnames.size(), actual_coords.size())
<< "Actual hostnames:" << actual_hostnames.str();
for (auto expected : expected_hostnames) {
bool found = false;
for (const auto& actual : actual_coords) {
if (actual.address().hostname() == expected) {
found = true;
break;
}
}
EXPECT_TRUE(found) << "did not find expected coordinator '" << expected
<< "' in actual coordinators:" << actual_hostnames.str();
}
}
/// This test takes two instances of the ClusterMembershipMgr through a common lifecycle.
/// It also serves as an example for how to craft statestore messages and pass them to
/// UpdaUpdateMembership().
/// UpdateMembership().
TEST_F(ClusterMembershipMgrTest, TwoInstances) {
auto b1 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(1));
auto b2 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(2));
@@ -306,6 +337,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
// First manager now has one BE
ASSERT_EQ(1, cmm1.GetSnapshot()->current_backends.size());
_assertCoords(cmm1, {"host_1"});
_assertCoords(cmm2, {});
// Hook up second callback and iterate with the result of the first manager
cmm2.SetLocalBeDescFn([b2]() { return b2; });
@@ -314,6 +347,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(1, returned_topic_deltas.size());
ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
_assertCoords(cmm1, {"host_1"});
_assertCoords(cmm2, {"host_1", "host_2"});
// Send the returned update to the first manager, this time no deltas will be returned
*ss_topic_delta = returned_topic_deltas[0];
@@ -321,6 +356,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
_assertCoords(cmm1, {"host_1", "host_2"});
_assertCoords(cmm2, {"host_1", "host_2"});
// Both managers now have the same state. Shutdown one of them and step through
// propagating the update.
@@ -334,6 +371,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
// It will also remove itself from the executor group (but not the current backends).
ASSERT_EQ(1, GetDefaultGroupSize(cmm1));
ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
_assertCoords(cmm1, {"host_1", "host_2"});
_assertCoords(cmm2, {"host_1", "host_2"});
// Propagate the quiescing to the 2nd mgr
*ss_topic_delta = returned_topic_deltas[0];
@@ -343,6 +382,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
ASSERT_EQ(0, returned_topic_deltas.size());
ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
_assertCoords(cmm1, {"host_1", "host_2"});
_assertCoords(cmm2, {"host_2"});
// Delete the 1st backend from the 2nd one
ASSERT_EQ(1, ss_topic_delta->topic_entries.size());
@@ -351,6 +392,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
ASSERT_EQ(0, returned_topic_deltas.size());
ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
_assertCoords(cmm1, {"host_1", "host_2"});
_assertCoords(cmm2, {"host_2"});
}
TEST_F(ClusterMembershipMgrTest, IsBlacklisted) {
@@ -447,6 +490,11 @@ TEST_F(ClusterMembershipMgrTest, ExecutorBlacklist) {
DeleteBackend(backends_[2].get());
EXPECT_EQ(NUM_BACKENDS - 1, backends_[0]->cmm->GetSnapshot()->current_backends.size());
EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm));
// Assert blacklisting executors does not impact the all_coordinators ExecutorGroup.
// host_1 was quiesced and host_2 was deleted, both actions do impact all_coordinators.
_assertCoords(*backends_[0]->cmm, {"host_0"});
_assertCoords(*backends_[1]->cmm, {"host_0", "host_1"});
}
// This test runs a group of 20 backends through their full lifecycle, validating that

View File

@@ -170,21 +170,11 @@ ClusterMembershipMgr::SnapshotPtr ClusterMembershipMgr::GetSnapshot() const {
return state;
}
static bool is_active_coordinator(const BackendDescriptorPB& be) {
static inline bool is_active_coordinator(const BackendDescriptorPB& be) {
return be.has_is_coordinator() && be.is_coordinator() &&
!(be.has_is_quiescing() && be.is_quiescing());
}
ExecutorGroup ClusterMembershipMgr::Snapshot::GetCoordinators() const {
ExecutorGroup coordinators("all-coordinators");
for (const auto& it : current_backends) {
if (is_active_coordinator(it.second)) {
coordinators.AddExecutor(it.second);
}
}
return coordinators;
}
vector<TNetworkAddress> ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses() const {
vector<TNetworkAddress> coordinators;
for (const auto& it : current_backends) {
@@ -197,6 +187,19 @@ vector<TNetworkAddress> ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses(
return coordinators;
}
static inline void _removeCoordIfExists(
const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state,
const BackendDescriptorPB& be) {
// The BackendDescriptorPB may be incomplete. Use the backend id to retrieve the actual
// backend descriptor so the backend can be removed.
const BackendDescriptorPB* actual_be =
state->all_coordinators.LookUpBackendDesc(be.backend_id());
if (actual_be != nullptr) {
state->all_coordinators.RemoveExecutor(*actual_be);
}
}
void ClusterMembershipMgr::UpdateMembership(
const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
vector<TTopicDelta>* subscriber_topic_updates) {
@@ -298,6 +301,11 @@ void ClusterMembershipMgr::UpdateMembership(
}
}
new_backend_map->erase(item.key);
// If a coordinator is not shutdown gracefully, then it will be deleted here.
if (be_desc.is_coordinator()) {
_removeCoordIfExists(new_state, be_desc);
}
}
continue;
}
@@ -346,32 +354,45 @@ void ClusterMembershipMgr::UpdateMembership(
if (existing.is_quiescing()) DCHECK(be_desc.is_quiescing());
// If the node starts quiescing
if (be_desc.is_quiescing() && !existing.is_quiescing() && existing.is_executor()) {
// If the backend starts quiescing and it is present in the blacklist, remove it
// from the blacklist. If the backend is present in the blacklist, there is no
// need to remove it from the executor group because it has already been removed
bool blacklisted = new_blacklist->FindAndRemove(be_desc)
== ExecutorBlacklist::State::BLACKLISTED;
if (blacklisted) {
VLOG(1) << "Removing backend " << item.key << " from blacklist (quiescing)";
DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
} else {
// Executor needs to be removed from its groups
for (const auto& group : be_desc.executor_groups()) {
VLOG(1) << "Removing backend " << item.key << " from group "
<< group.DebugString() << " (quiescing)";
RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
if (be_desc.is_quiescing() && !existing.is_quiescing()) {
if (existing.is_executor()) {
// If the backend starts quiescing and it is present in the blacklist, remove it
// from the blacklist. If the backend is present in the blacklist, there is no
// need to remove it from the executor group because it has already been removed
bool blacklisted = new_blacklist->FindAndRemove(be_desc)
== ExecutorBlacklist::State::BLACKLISTED;
if (blacklisted) {
VLOG(1) << "Removing backend " << item.key << " from blacklist (quiescing)";
DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
} else {
// Executor needs to be removed from its groups
for (const auto& group : be_desc.executor_groups()) {
VLOG(1) << "Removing backend " << item.key << " from group "
<< group.DebugString() << " (quiescing)";
RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
}
}
}
if (existing.is_coordinator()) {
_removeCoordIfExists(new_state, be_desc);
}
}
existing = be_desc;
} else {
// Create
new_backend_map->insert(make_pair(item.key, be_desc));
if (!be_desc.is_quiescing() && be_desc.is_executor()) {
for (const auto& group : be_desc.executor_groups()) {
VLOG(1) << "Adding backend " << item.key << " to group " << group.DebugString();
FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
if (!be_desc.is_quiescing()) {
if (be_desc.is_executor()) {
for (const auto& group : be_desc.executor_groups()) {
VLOG(1) << "Adding backend " << item.key << " to group " <<
group.DebugString();
FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
}
}
if (is_active_coordinator(be_desc)) {
new_state->all_coordinators.AddExecutor(be_desc);
}
}
// Since this backend is new, it cannot already be on the blacklist or probation.
@@ -415,6 +436,13 @@ void ClusterMembershipMgr::UpdateMembership(
}
}
}
// Add ourself to the list of all coordinators.
if (is_active_coordinator(*local_be_desc.get())) {
_removeCoordIfExists(new_state, *local_be_desc);
new_state->all_coordinators.AddExecutor(*local_be_desc);
}
AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates);
DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
}

View File

@@ -86,10 +86,8 @@ class ClusterMembershipMgr {
// Clients can obtain an immutable copy. Class instances can be created through the
// implicitly-defined default and copy constructors.
struct Snapshot {
Snapshot() = default;
Snapshot() : all_coordinators("all-coordinators") {};
Snapshot(const Snapshot&) = default;
/// Returns an executor group of all non-quiescing coordinators in the cluster.
ExecutorGroup GetCoordinators() const;
/// Returns the addresses of all non-quiescing coordinators in the cluster.
std::vector<TNetworkAddress> GetCoordinatorAddresses() const;
/// The current backend descriptor of the local backend.
@@ -111,6 +109,10 @@ class ClusterMembershipMgr {
/// The version of this Snapshot. It is incremented every time the cluster membership
/// changes.
int64_t version = 0;
// Executor group of all non-quiescing coordinators in the cluster. Set during the
// SetState() function.
ExecutorGroup all_coordinators;
};
/// An immutable shared membership snapshot.

View File

@@ -81,5 +81,18 @@ BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset,
return MakeBackendDescriptor(idx, group_desc, port_offset, admit_mem_limit);
}
void AssertLookupById(const BackendDescriptorPB& exec1, const BackendDescriptorPB& exec2,
const ExecutorGroup& group) {
const BackendDescriptorPB* actual_exec1 = group.LookUpBackendDesc(exec1.backend_id());
ASSERT_NE(nullptr, actual_exec1);
ASSERT_EQ(exec1.address().hostname(), actual_exec1->address().hostname());
ASSERT_EQ(exec1.address().port(), actual_exec1->address().port());
const BackendDescriptorPB* actual_exec2 = group.LookUpBackendDesc(exec2.backend_id());
ASSERT_NE(nullptr, actual_exec2);
ASSERT_EQ(exec2.address().hostname(), actual_exec2->address().hostname());
ASSERT_EQ(exec2.address().port(), actual_exec2->address().port());
}
} // end namespace test
} // end namespace impala

View File

@@ -55,5 +55,10 @@ BackendDescriptorPB MakeBackendDescriptor(
BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset = 0,
int64_t admit_mem_limit = 4L * MEGABYTE);
/// Assert the LookupBackendDesc() function returns correct results when passed a
/// backend id.
void AssertLookupById(const BackendDescriptorPB& exec1, const BackendDescriptorPB& exec2,
const ExecutorGroup& group);
} // end namespace test
} // end namespace impala

View File

@@ -31,12 +31,17 @@ using namespace impala::test;
TEST(ExecutorGroupTest, AddExecutors) {
ExecutorGroup group1("group1");
ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission());
int64_t mem_limit_admission1 = 100L * MEGABYTE;
group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0,
mem_limit_admission1));
BackendDescriptorPB exec1 = MakeBackendDescriptor(1, group1, /* port_offset=*/0,
mem_limit_admission1);
group1.AddExecutor(exec1);
int64_t mem_limit_admission2 = 120L * MEGABYTE;
group1.AddExecutor(MakeBackendDescriptor(2, group1, /* port_offset=*/0,
mem_limit_admission2));
BackendDescriptorPB exec2 = MakeBackendDescriptor(2, group1, /* port_offset=*/0,
mem_limit_admission2);
group1.AddExecutor(exec2);
ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission());
ASSERT_EQ(2, group1.NumExecutors());
IpAddr backend_ip;
@@ -44,23 +49,32 @@ TEST(ExecutorGroupTest, AddExecutors) {
EXPECT_EQ("10.0.0.1", backend_ip);
ASSERT_TRUE(group1.LookUpExecutorIp("host_2", &backend_ip));
EXPECT_EQ("10.0.0.2", backend_ip);
AssertLookupById(exec1, exec2, group1);
}
/// Test adding multiple backends on the same host.
TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) {
ExecutorGroup group1("group1");
int64_t mem_limit_admission1 = 120L * MEGABYTE;
group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0,
mem_limit_admission1));
const BackendDescriptorPB exec1 = MakeBackendDescriptor(1, group1, /* port_offset=*/0,
mem_limit_admission1);
group1.AddExecutor(exec1);
int64_t mem_limit_admission2 = 100L * MEGABYTE;
group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1,
mem_limit_admission2));
const BackendDescriptorPB exec2 = MakeBackendDescriptor(1, group1, /* port_offset=*/1,
mem_limit_admission2);
group1.AddExecutor(exec2);
ASSERT_EQ(mem_limit_admission2, group1.GetPerExecutorMemLimitForAdmission());
IpAddr backend_ip;
ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
EXPECT_EQ("10.0.0.1", backend_ip);
const ExecutorGroup::Executors& backend_list = group1.GetExecutorsForHost("10.0.0.1");
EXPECT_EQ(2, backend_list.size());
AssertLookupById(exec1, exec2, group1);
}
/// Test removing a backend.
@@ -112,6 +126,8 @@ TEST(ExecutorGroupTest, RemoveExecutorOnSameHost) {
EXPECT_EQ(1, backend_list.size());
group1.RemoveExecutor(executor1);
ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission());
ASSERT_EQ(nullptr, group1.LookUpBackendDesc(executor2.backend_id()));
}
/// Test that exercises the size-based group health check.

View File

@@ -196,6 +196,19 @@ const BackendDescriptorPB* ExecutorGroup::LookUpBackendDesc(
return nullptr;
}
const BackendDescriptorPB* ExecutorGroup::LookUpBackendDesc(
const UniqueIdPB& be_id) const {
for (const auto& executor_list : executor_map_) {
for (const auto& backend : executor_list.second){
if (backend.backend_id().hi() == be_id.hi()
&& backend.backend_id().lo() == be_id.lo()) {
return &backend;
}
}
}
return nullptr;
}
int ExecutorGroup::NumExecutors() const {
int count = 0;
for (const auto& executor_list : executor_map_) count += executor_list.second.size();

View File

@@ -96,6 +96,12 @@ class ExecutorGroup {
/// change while it holds the pointer.
const BackendDescriptorPB* LookUpBackendDesc(const NetworkAddressPB& host) const;
/// Looks up the backend descriptor for the executor with an id matching the provided
/// executor id. Returns nullptr if no executor is found. The returned descriptor should
/// not be retained beyond the lifetime of this ExecutorGroup and the caller must make
/// sure that the group does not change while it holds the pointer.
const BackendDescriptorPB* LookUpBackendDesc(const UniqueIdPB& be_id) const;
/// Returns the hash ring associated with this executor group. It's owned by the group
/// and the caller must not hold a reference beyond the groups lifetime.
const HashRing* GetHashRing() const { return &executor_ip_hash_ring_; }

View File

@@ -2504,7 +2504,7 @@ Status ClientRequestState::TryKillQueryRemotely(
// coordinator in the cluster, it will be the status to return.
Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
ExecutorGroup all_coordinators =
ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->GetCoordinators();
ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->all_coordinators;
// Skipping the current impalad.
unique_ptr<ExecutorGroup> other_coordinators{ExecutorGroup::GetFilteredExecutorGroup(
&all_coordinators, {ExecEnv::GetInstance()->krpc_address()})};

View File

@@ -714,11 +714,11 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
}
if (!authenticated) {
if (use_jwt_) {
LOG(INFO) << "Invalid JWT token provided: " << bearer_token;
LOG(INFO) << "Invalid JWT token provided";
total_jwt_token_auth_failure_->Increment(1);
}
if (use_oauth_) {
LOG(INFO) << "Invalid OAuth token provided: " << bearer_token;
LOG(INFO) << "Invalid OAuth token provided";
total_oauth_token_auth_failure_->Increment(1);
}
}

View File

@@ -1147,7 +1147,7 @@ if __name__ == "__main__":
use_exclusive_coordinators, existing_cluster_size)
expected_cluster_size += existing_cluster_size
elif options.add_impalads:
cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
cluster_ops.start_impalads(options.num_coordinators, options.num_coordinators,
options.use_exclusive_coordinators,
existing_cluster_size)
expected_cluster_size += existing_cluster_size
@@ -1171,7 +1171,9 @@ if __name__ == "__main__":
if options.add_impalads:
# TODO: This is a hack to make the waiting logic work. We'd better add a dedicated
# option for adding a new cluster using the existing catalogd and statestore.
# https://issues.apache.org/jira/browse/IMPALA-13755
expected_num_ready_impalads = options.cluster_size
expected_cluster_size = options.cluster_size
impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads)
except Exception as e:
LOG.exception("Error starting cluster")

View File

@@ -232,6 +232,12 @@ struct TPoolConfig {
// If a rule for the user is not present in user_query_limits, then these rules
// are evaluated, if the user is a member of a group.
13: required map<string, i32> group_query_limits
// Specifies the state of the onlyCoordinators configuration element for a request pool.
// When request pools are configured, they can be specified as being only coordinators
// which means that pool only considers resources from the coordinator nodes and queries
// for that pool schedule fragment instances on coordinator nodes only.
14: optional bool only_coordinators
}
struct TParseDateStringResult {

View File

@@ -144,10 +144,10 @@ impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph
<p>
Here are sample <filepath>fair-scheduler.xml</filepath> and
<filepath>llama-site.xml</filepath> files that define resource pools
<codeph>root.default</codeph>, <codeph>root.development</codeph>, and
<codeph>root.production</codeph>. These files define resource pools for Impala
admission control and are separate from the similar
<codeph>fair-scheduler.xml</codeph>that defines resource pools for YARN.
<codeph>root.default</codeph>, <codeph>root.development</codeph>,
<codeph>root.production</codeph>, and <codeph>root.coords</codeph>.
These files define resource pools for Impala admission control and are separate
from the similar <codeph>fair-scheduler.xml</codeph>that defines resource pools for YARN.
</p>
<p>
@@ -173,6 +173,36 @@ impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph
to those pools.
</p>
<p>
The <codeph>&lt;onlyCoordinators&gt;</codeph> element is a boolean that defaults
to <codeph>false</codeph>. If this value is set to <codeph>true</codeph>, the
named request pool will contain only the coordinators and none of the executors.
The main purpose of this setting is to enable running queries against the
<codeph>sys.impala_query_live</codeph> table from workload management. Since the
data for this table is stored in the memory of the coordinators, the executors
do not need to be involved if the query only selects from this table.
</p>
<p>
To use an <codeph>&lt;onlyCoordinators&gt;</codeph> request pool, set the
<codeph>REQUEST_POOL</codeph> query option to the name of the
<codeph>&lt;onlyCoordinators&gt;</codeph> request pool. <b>Caution</b> even though
these request pools do not contain executors, they can still run any query.
Thus, while the <codeph>REQUEST_POOL</codeph> query option is set to an only
coordinators request pool, queries have the potential to run the coordinators
out of resources.
</p>
<p>
Caution: care must be taken when naming the <codeph>&lt;onlyCoordinators&gt;</codeph>
request pool. If the name has the same prefix as a named executor group set, then
queries may be automatically routed to the request pool. For example, if the
coordinator is configured with
<codeph>--expected_executor_group_sets=prefix1:10</codeph>, then an only coordinators
request pool named <codeph>prefix1-onlycoords<codeph> will potentially have
queries routed to it.
</p>
<codeblock>&lt;allocations>
&lt;queue name="root">
@@ -189,6 +219,11 @@ impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph
&lt;maxResources>1000000 mb, 0 vcores&lt;/maxResources>
&lt;aclSubmitApps> ops,admin&lt;/aclSubmitApps>
&lt;/queue>
&lt;queue name="coords">
&lt;maxResources>1000000 mb, 0 vcores&lt;/maxResources>
&lt;aclSubmitApps>ops,admin&lt;/aclSubmitApps>
&lt;onlyCoordinators>true&lt;/onlyCoordinators>
&lt;/queue>
&lt;/queue>
&lt;queuePlacementPolicy>
&lt;rule name="specified" create="false"/>

View File

@@ -230,6 +230,8 @@ import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.RequestPoolService;
import org.apache.impala.util.TResultRowBuilder;
import org.apache.impala.util.TSessionStateUtil;
import static org.apache.impala.yarn.server.resourcemanager.scheduler.fair.
AllocationFileLoaderService.ROOT_POOL_NAME;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTransaction;
@@ -2196,8 +2198,28 @@ public class Frontend {
Preconditions.checkState(
!clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty());
List<TExecutorGroupSet> originalExecutorGroupSets =
ExecutorMembershipSnapshot.getAllExecutorGroupSets();
boolean coordOnlyRequestPool = false;
if (clientSetRequestPool && RequestPoolService.getInstance() != null) {
final String pool_name = StringUtils.prependIfMissing(
queryOptions.getRequest_pool(), ROOT_POOL_NAME + ".");
coordOnlyRequestPool =
RequestPoolService.getInstance().getPoolConfig(pool_name).only_coordinators;
}
List<TExecutorGroupSet> originalExecutorGroupSets;
if (coordOnlyRequestPool) {
// The query is set to use an only coordinators request pool which means that no
// executors will be involved in query execution. Thus, the planner must ignore
// all executor groups and select the default group instead. The backend will ensure
// the query is scheduled on the coordinators.
originalExecutorGroupSets = new ArrayList<>(1);
TExecutorGroupSet all_coords = new TExecutorGroupSet();
originalExecutorGroupSets.add(all_coords);
} else {
originalExecutorGroupSets = ExecutorMembershipSnapshot.getAllExecutorGroupSets();
}
LOG.info("The original executor group sets from executor membership snapshot: "
+ originalExecutorGroupSets);
@@ -2314,6 +2336,9 @@ public class Frontend {
} else if (!Frontend.canStmtBeAutoScaled(req)) {
reason = "query is not auto-scalable";
notScalable = true;
} else if (coordOnlyRequestPool) {
reason = "only coordinators request pool specified";
notScalable = true;
}
if (notScalable) {

View File

@@ -378,6 +378,7 @@ public class RequestPoolService {
result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT);
result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT);
result.setDefault_query_options("");
result.setOnly_coordinators(false);
} else {
// Capture the current conf_ in case it changes while we're using it.
Configuration currentConf = conf_;
@@ -403,6 +404,7 @@ public class RequestPoolService {
getPoolConfigValue(currentConf, pool, MAX_QUERY_CPU_CORE_PER_NODE_LIMIT, 0L));
result.setMax_query_cpu_core_coordinator_limit(getPoolConfigValue(
currentConf, pool, MAX_QUERY_CPU_CORE_COORDINATOR_LIMIT, 0L));
result.setOnly_coordinators(allocationConf_.get().isOnlyCoordinators(pool));
}
if (LOG.isTraceEnabled()) {
LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, max_requests={},"

View File

@@ -356,7 +356,7 @@ public class TestRequestPoolService {
10000L, "mem_limit=1024m,query_timeout_s=10");
checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L,
"mem_limit=1024m", 1000, 10, false, 8, 8, null, null);
"mem_limit=1024m", 1000, 10, false, 8, 8, null, null, true);
Map<String, Integer> queueDUserQueryLimits = new HashMap<>();
queueDUserQueryLimits.put("userA", 2);
@@ -377,7 +377,8 @@ public class TestRequestPoolService {
createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY);
Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA"));
checkPoolAcls("root.userA", asList("userA", "userB", "userZ"), EMPTY_LIST);
checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, null, null);
checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, null, null,
false);
}
@Ignore("IMPALA-4868") @Test
@@ -684,7 +685,7 @@ public class TestRequestPoolService {
String expectedQueryOptions, long max_query_mem_limit, long min_query_mem_limit,
boolean clamp_mem_limit_query_option, long max_query_cpu_core_per_node_limit,
long max_query_cpu_core_coordinator_limit, Map<String, Integer> userQueryLimits,
Map<String, Integer> groupQueryLimits) {
Map<String, Integer> groupQueryLimits, boolean onlyCoordinators) {
TPoolConfig expectedResult = new TPoolConfig();
expectedResult.setMax_requests(expectedMaxRequests);
expectedResult.setMax_queued(expectedMaxQueued);
@@ -706,6 +707,9 @@ public class TestRequestPoolService {
userQueryLimits != null ? userQueryLimits : Collections.emptyMap());
expectedResult.setGroup_query_limits(
groupQueryLimits != null ? groupQueryLimits : Collections.emptyMap());
expectedResult.setOnly_coordinators(onlyCoordinators);
TPoolConfig poolConfig = poolService_.getPoolConfig(pool);
Assert.assertEquals(
"Unexpected config values for pool " + pool, expectedResult, poolConfig);
@@ -725,7 +729,7 @@ public class TestRequestPoolService {
Map<String, Integer> groupQueryLimits) {
checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued, expectedMaxMem,
expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true, 0, 0,
userQueryLimits, groupQueryLimits);
userQueryLimits, groupQueryLimits, false);
}
private void checkPoolConfigResult(String pool, long expectedMaxRequests,

View File

@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<allocations>
<queue name="group-set-small">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<!-- Set a huge amount of memory to enable memory based admission control -->
<maxResources>50000000 mb, 0 vcores</maxResources>
</queue>
<queue name="group-set-large">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<!-- Set a huge amount of memory to enable memory based admission control -->
<maxResources>50000000 mb, 0 vcores</maxResources>
</queue>
<queue name="onlycoords">
<maxResources>3000 mb, 0 vcores</maxResources>
<aclSubmitApps>* </aclSubmitApps>
<onlyCoordinators>true</onlyCoordinators>
</queue>
<queuePlacementPolicy>
<rule name="specified" create="false"/>
<rule name="reject" />
</queuePlacementPolicy>
</allocations>

View File

@@ -23,9 +23,11 @@
<queue name="queueC">
<aclSubmitApps>* </aclSubmitApps>
<maxResources>1024 mb, 0 vcores</maxResources>
<onlyCoordinators>true</onlyCoordinators>
</queue>
<queue name="queueD">
<aclSubmitApps>userA,userB </aclSubmitApps>
<onlyCoordinators>false</onlyCoordinators>
<userQueryLimit>
<user>*</user>
<totalCount>3</totalCount>

View File

@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>impala.admission-control.max-query-cpu-core-per-node-limit.root.group-set-small</name>
<value>12</value>
</property>
<property>
<name>impala.admission-control.max-query-mem-limit.root.group-set-small</name>
<value>53687091200</value>
</property>
<property>
<name>impala.admission-control.max-mt-dop.root.group-set-small</name>
<value>16</value>
</property>
<property>
<name>impala.admission-control.min-query-mem-limit.root.group-set-small</name>
<value>2147483648</value>
</property>
<property>
<name>impala.admission-control.pool-queue-timeout-ms.root.group-set-small</name>
<value>600000</value>
</property>
<property>
<name>impala.admission-control.max-query-cpu-core-per-node-limit.root.group-set-large</name>
<value>12</value>
</property>
<property>
<name>impala.admission-control.max-query-mem-limit.root.group-set-large</name>
<value>53687091200</value>
</property>
<property>
<name>impala.admission-control.max-mt-dop.root.group-set-large</name>
<value>16</value>
</property>
<property>
<name>impala.admission-control.min-query-mem-limit.root.group-set-large</name>
<value>2147483648</value>
</property>
<property>
<name>impala.admission-control.pool-queue-timeout-ms.root.group-set-large</name>
<value>600000</value>
</property>
<property>
<name>llama.am.throttling.maximum.placed.reservations.onlycoords</name>
<value>1</value>
</property>
<property>
<name>llama.am.throttling.maximum.queued.reservations.onlycoords</name>
<value>5</value>
</property>
<property>
<name>impala.admission-control.pool-queue-timeout-ms.onlycoords</name>
<value>30000</value>
</property>
<property>
<name>impala.admission-control.max-query-mem-limit.onlycoords</name>
<value>1610612736</value><!--1.5GB-->
</property>
<property>
<name>impala.admission-control.min-query-mem-limit.onlycoords</name>
<value>52428800</value><!--50MB-->
</property>
</configuration>

View File

@@ -56,6 +56,9 @@ public class AllocationConfiguration {
private final Map<String, Map<String, Integer>> userQueryLimits;
private final Map<String, Map<String, Integer>> groupQueryLimits;
// Specifies if each queue contains all nodes or only coordinators.
private final Map<String, Boolean> onlyCoordinators;
// Policy for mapping apps to queues
@VisibleForTesting
QueuePlacementPolicy placementPolicy;
@@ -80,6 +83,7 @@ public class AllocationConfiguration {
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<String, Integer>> userQueryLimits,
Map<String, Map<String, Integer>> groupQueryLimits,
Map<String, Boolean> onlyCoordinators,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues,
Set<String> nonPreemptableQueues) {
@@ -89,6 +93,7 @@ public class AllocationConfiguration {
this.queueAcls = queueAcls;
this.userQueryLimits = userQueryLimits;
this.groupQueryLimits = groupQueryLimits;
this.onlyCoordinators = onlyCoordinators;
this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues;
}
@@ -100,6 +105,7 @@ public class AllocationConfiguration {
queueAcls = new HashMap<>();
userQueryLimits = new HashMap<>();
groupQueryLimits = new HashMap<>();
onlyCoordinators = new HashMap<>();
configuredQueues = new HashMap<>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
@@ -202,4 +208,8 @@ public class AllocationConfiguration {
Map<String, Integer> limits = groupQueryLimits.get(queueName);
return limits != null ? limits : Collections.emptyMap();
}
public boolean isOnlyCoordinators(String queueName) {
return onlyCoordinators.getOrDefault(queueName, Boolean.FALSE).booleanValue();
}
}

View File

@@ -59,10 +59,10 @@ import com.google.common.annotations.VisibleForTesting;
@Public
@Unstable
public class AllocationFileLoaderService extends AbstractService {
public static final Log LOG = LogFactory.getLog(
AllocationFileLoaderService.class.getName());
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
@@ -74,26 +74,28 @@ public class AllocationFileLoaderService extends AbstractService {
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
public static final String ROOT_POOL_NAME = "root";
private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false;
// Path to XML file containing allocations.
// Path to XML file containing allocations.
private File allocFile;
private Listener reloadListener;
@VisibleForTesting
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
private Thread reloadThread;
private volatile boolean running = true;
public AllocationFileLoaderService() {
this(new SystemClock());
}
public AllocationFileLoaderService(Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
@@ -288,6 +290,7 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
Map<String, Map<String, Integer>> userQueryLimits = new HashMap<>();
Map<String, Map<String, Integer>> groupQueryLimits = new HashMap<>();
Map<String, Boolean> onlyCoordinators = new HashMap<>();
Set<String> nonPreemptableQueues = new HashSet<>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
@@ -404,8 +407,8 @@ public class AllocationFileLoaderService extends AbstractService {
// Load queue elements. A root queue can either be included or omitted. If
// it's included, all other queues must be inside it.
for (Element element : queueElements) {
String parent = "root";
if (element.getAttribute("name").equalsIgnoreCase("root")) {
String parent = ROOT_POOL_NAME;
if (element.getAttribute("name").equalsIgnoreCase(ROOT_POOL_NAME)) {
if (queueElements.size() > 1) {
throw new AllocationConfigurationException("If configuring root queue,"
+ " no other queues can be placed alongside it.");
@@ -416,7 +419,8 @@ public class AllocationFileLoaderService extends AbstractService {
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
queueWeights, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
userQueryLimits, groupQueryLimits, configuredQueues, nonPreemptableQueues);
userQueryLimits, groupQueryLimits, configuredQueues, onlyCoordinators,
nonPreemptableQueues);
}
// Load placement policy and pass it configured queues
@@ -430,18 +434,18 @@ public class AllocationFileLoaderService extends AbstractService {
}
// Set the min/fair share preemption timeout for the root queue
if (!minSharePreemptionTimeouts.containsKey("root")){
minSharePreemptionTimeouts.put("root",
if (!minSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)){
minSharePreemptionTimeouts.put(ROOT_POOL_NAME,
defaultMinSharePreemptionTimeout);
}
if (!fairSharePreemptionTimeouts.containsKey("root")) {
fairSharePreemptionTimeouts.put("root",
if (!fairSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)) {
fairSharePreemptionTimeouts.put(ROOT_POOL_NAME,
defaultFairSharePreemptionTimeout);
}
// Set the fair share preemption threshold for the root queue
if (!fairSharePreemptionThresholds.containsKey("root")) {
fairSharePreemptionThresholds.put("root",
if (!fairSharePreemptionThresholds.containsKey(ROOT_POOL_NAME)) {
fairSharePreemptionThresholds.put(ROOT_POOL_NAME,
defaultFairSharePreemptionThreshold);
}
@@ -451,7 +455,7 @@ public class AllocationFileLoaderService extends AbstractService {
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, userQueryLimits, groupQueryLimits,
newPlacementPolicy, configuredQueues, nonPreemptableQueues);
onlyCoordinators, newPlacementPolicy, configuredQueues, nonPreemptableQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@@ -481,6 +485,7 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Map<String, Integer>> userQueryLimits,
Map<String, Map<String, Integer>> groupQueryLimits,
Map<FSQueueType, Set<String>> configuredQueues,
Map<String, Boolean> onlyCoordinators,
Set<String> nonPreemptableQueues)
throws AllocationConfigurationException {
String queueName = CharMatcher.whitespace().trimFrom(element.getAttribute("name"));
@@ -576,9 +581,13 @@ public class AllocationFileLoaderService extends AbstractService {
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
queueWeights, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
userQueryLimits, groupQueryLimits, configuredQueues, nonPreemptableQueues);
userQueryLimits, groupQueryLimits, configuredQueues, onlyCoordinators,
nonPreemptableQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
} else if ("onlyCoordinators".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
onlyCoordinators.put(queueName, Boolean.parseBoolean(text));
}
}
if (isLeaf) {
@@ -611,14 +620,13 @@ public class AllocationFileLoaderService extends AbstractService {
allocationConfiguration.getConfiguredQueues();
Set<String> parentQueues = configuredQueues.get(FSQueueType.PARENT);
Set<String> leafQueues = configuredQueues.get(FSQueueType.LEAF);
String root = "root";
if (parentQueues.size() == 1 && parentQueues.contains(root)) {
if (parentQueues.size() == 1 && parentQueues.contains(ROOT_POOL_NAME)) {
Map<String, Integer> rootUserQueryLimits =
allocationConfiguration.getUserQueryLimits(root);
allocationConfiguration.getUserQueryLimits(ROOT_POOL_NAME);
Map<String, Integer> rootGroupQueryLimits =
allocationConfiguration.getGroupQueryLimits(root);
allocationConfiguration.getGroupQueryLimits(ROOT_POOL_NAME);
for (String leafQueue : leafQueues) {
if (leafQueue.startsWith(root)) {
if (leafQueue.startsWith(ROOT_POOL_NAME)) {
Map<String, Integer> groupQueryLimits =
allocationConfiguration.getGroupQueryLimits(leafQueue);
Map<String, Integer> userQueryLimits =

View File

@@ -84,6 +84,7 @@ DISABLE_LOG_BUFFERING = 'disable_log_buffering'
# If True, resolves the actual files for all the log symlinks and outputs the resolved
# paths to stderr.
LOG_SYMLINKS = 'log_symlinks'
WORKLOAD_MGMT = 'workload_mgmt'
# Args that accept additional formatting to supply temporary dir path.
ACCEPT_FORMATTING = set([IMPALAD_ARGS, CATALOGD_ARGS, IMPALA_LOG_DIR])
@@ -154,7 +155,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
num_exclusive_coordinators=None, kudu_args=None, statestored_timeout_s=None,
impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
impalad_graceful_shutdown=False, tmp_dir_placeholders=[],
expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False):
expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False,
workload_mgmt=False):
"""Records arguments to be passed to a cluster by adding them to the decorated
method's func_dict"""
args = dict()
@@ -198,6 +200,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
args[DISABLE_LOG_BUFFERING] = True
if log_symlinks:
args[LOG_SYMLINKS] = True
if workload_mgmt:
args[WORKLOAD_MGMT] = True
def merge_args(args_first, args_last):
result = args_first.copy()
@@ -328,6 +332,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if IMPALAD_TIMEOUT_S in args:
kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S]
if args.get(WORKLOAD_MGMT, False):
if IMPALAD_ARGS or CATALOGD_ARGS in args:
kwargs[WORKLOAD_MGMT] = True
if args.get(EXPECT_CORES, False):
# Make a note of any core files that already exist
possible_cores = find_all_files('*core*')
@@ -399,25 +407,33 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if not self.SHARED_CLUSTER_ARGS:
self.cluster_teardown(method.__name__, method.__dict__)
def wait_for_wm_init_complete(self, timeout_s=120):
def wait_for_wm_init_complete(self, timeout_s=60):
"""
Waits for the catalog to report the workload management initialization process has
completed and for the catalog updates to be received by the coordinators.
completed and for the catalog updates to be received by the coordinators. The input
timeout_s is used as the timeout for three separate function calls. Thus, the
theoretical max amount of time this function could wait is (timeout_s * 3).
"""
self.assert_catalogd_log_contains("INFO", r'Completed workload management '
r'initialization', timeout_s=timeout_s)
catalog_log = self.assert_log_contains_multiline("catalogd", "INFO", r'Completed '
r'workload management initialization.*?A catalog update with \d+ entries is '
r'assembled\. Catalog version: (\d+)', timeout_s)
catalog_log = self.assert_catalogd_log_contains("INFO", r'A catalog update with \d+ '
r'entries is assembled. Catalog version: (\d+)', timeout_s=10, expected_count=-1)
# Assert each coordinator has received a catalog update that was assembled after
# workload management completed.
for idx, _ in enumerate(self.cluster.get_all_coordinators()):
node_name = "impalad"
if idx > 0:
node_name += "_node" + str(idx)
def assert_func():
coord_log = self.assert_impalad_log_contains("INFO", r'Catalog topic update '
r'applied with version: (\d+)', timeout_s=5, expected_count=-1)
return int(coord_log.group(1)) >= int(catalog_log.group(1))
def assert_func():
coord_log = self.assert_log_contains(node_name, "INFO", r'Catalog topic update '
r'applied with version: (\d+)', timeout_s=timeout_s, expected_count=-1)
return int(coord_log.group(1)) >= int(catalog_log.group(1))
assert retry(func=assert_func, max_attempts=10, sleep_time_s=3, backoff=1), \
"Expected a catalog topic update with version '{}' or later, but no such " \
"update was found.".format(catalog_log.group(1))
max_attempts = timeout_s / 3
assert retry(func=assert_func, max_attempts=max_attempts, sleep_time_s=3,
backoff=1), "Expected a catalog topic update with version '{}' or later, but " \
"no such update was found.".format(catalog_log.group(1))
@classmethod
def _stop_impala_cluster(cls):
@@ -514,7 +530,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
impalad_timeout_s=60,
ignore_pid_on_log_rotation=False,
wait_for_backends=True,
log_symlinks=False):
log_symlinks=False,
workload_mgmt=False):
cls.impala_log_dir = impala_log_dir
# We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that
# certain custom startup arguments work and we want to keep them independent of dev
@@ -544,6 +561,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
cmd.append("--impalad_args=--use_local_catalog=1")
cmd.append("--catalogd_args=--catalog_topic_mode=minimal")
if workload_mgmt:
cmd.append("--impalad_args=--enable_workload_mgmt=true")
cmd.append("--catalogd_args=--enable_workload_mgmt=true")
default_query_option_kvs = []
# Put any defaults first, then any arguments after that so they can override defaults.
if default_query_options is not None:

View File

@@ -180,6 +180,14 @@ class ImpalaCluster(object):
LOG.info("Cluster: " + str(self.impalads))
return choice([impalad for impalad in self.impalads if impalad != other_impalad])
def get_all_coordinators(self):
"""Returns a list of all impalads where is_coordinator returns True. If no
coordinators are found, returns an empty list. The returned list is sorted by krpc
port ascending."""
return sorted([imp for imp in self.impalads if imp.is_coordinator()],
key=lambda x: x.service.krpc_port)
def num_responsive_coordinators(self):
"""Find the number of impalad coordinators that can evaluate a test query."""
n = 0

View File

@@ -1529,6 +1529,37 @@ class ImpalaTestSuite(BaseTestSuite):
return self.assert_log_contains(
daemon, level, line_regex, expected_count, timeout_s, dry_run)
def assert_log_contains_multiline(self, daemon, level, line_regex, timeout_s=6):
"""
Asserts the the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains
at least one match of the provided regular expression. The difference with this
function is the regular expression is compiled with the DOTALL flag which causes the
dot operator to also match newlines. Thus, the provided line_regex can match over
multiple lines.
Returns the result of the regular expression search() function or fails an assertion
if the regular expression is not matched in the given timeframe.
"""
if (self._warn_assert_log):
LOG.warning(
"{} calls assert_log_contains() with timeout_s={}. Make sure that glog "
"buffering has been disabled (--logbuflevel=-1), or "
"CustomClusterTestSuite.with_args is set with disable_log_buffering=True, "
"or timeout_s is sufficient.".format(self.__class__.__name__, timeout_s))
pattern = re.compile(line_regex, re.DOTALL)
for i in range(0, timeout_s):
log_file_path = self.__build_log_path(daemon, level)
with open(log_file_path) as log:
ret = pattern.search(log.read())
if ret is not None:
return ret
time.sleep(1)
assert False, "did not find any logfile " \
"contents matching the regex '{}'".format(line_regex)
def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6,
dry_run=False):
"""

View File

@@ -143,9 +143,17 @@ INITIAL_QUEUE_REASON_REGEX = \
# The path to resources directory which contains the admission control config files.
RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources")
# SQL statement that selects all records for the active queries table.
ACTIVE_SQL = "select * from sys.impala_query_live"
def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
additional_args="", make_copy=False):
"""Generates impalad startup flags configuring the fair scheduler and llama site path
options and setting logging for admission control to VLOG_ROW.
The specified fair scheduler and llama site files are copied first, and the copies
are used as the value for the relevant startup flags."""
fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file)
if make_copy:
@@ -1774,6 +1782,232 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
for executor_mem_admitted in mem_admitted['executor']:
assert executor_mem_admitted == 0
def __assert_systables_query(self, profile, expected_coords=None,
expected_frag_counts=None):
"""Asserts the per-host fragment instances are correct in the provided profile."""
if expected_coords is None:
expected_coords = self.cluster.get_all_coordinators()
populate_frag_count = False
if expected_frag_counts is None:
populate_frag_count = True
expected_frag_counts = []
expected = []
for i, val in enumerate(expected_coords):
if populate_frag_count:
if i == 0:
expected_frag_counts.append(2)
else:
expected_frag_counts.append(1)
expected.append("{0}:{1}({2})".format(val.service.hostname, val.service.krpc_port,
expected_frag_counts[i]))
# Assert the correct request pool was used.
req_pool = re.search(r'\n\s+Request Pool:\s+(.*?)\n', profile)
assert req_pool, "Did not find request pool in query profile"
assert req_pool.group(1) == "root.onlycoords"
# Assert the fragment instances only ran on the coordinators.
perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n',
profile)
assert perhost_frags
sorted_hosts = " ".join(sorted(perhost_frags.group(1).split(" ")))
assert sorted_hosts
assert sorted_hosts == " ".join(expected)
# Assert the frontend selected the first executor group.
expected_verdict = "Assign to first group because only coordinators request pool " \
"specified"
fe_verdict = re.search(r'\n\s+Executor group 1:\n\s+Verdict: (.*?)\n', profile)
assert fe_verdict, "No frontend executor group verdict found."
assert fe_verdict.group(1) == expected_verdict, "Incorrect verdict found"
def __run_assert_systables_query(self, vector, expected_coords=None,
expected_frag_counts=None, query=ACTIVE_SQL):
"""Runs a query using an only coordinators request pool and asserts the per-host
fragment instances are correct. This function can only be called from tests that
configured the cluster to use 'fair-scheduler-onlycoords.xml' and
'llama-site-onlycoords.xml'."""
vector.set_exec_option('request_pool', 'onlycoords')
result = self.execute_query_using_vector(query, vector)
assert result.success
self.__assert_systables_query(result.runtime_profile, expected_coords,
expected_frag_counts)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml"),
statestored_args=_STATESTORED_ARGS)
def test_coord_only_pool_happy_path(self, vector):
"""Asserts queries set to use an only coordinators request pool run all the fragment
instances on all coordinators and no executors even if the query includes
non-system tables."""
self.wait_for_wm_init_complete()
# Execute a query that only selects from a system table using a request pool that is
# only coordinators.
self.__run_assert_systables_query(vector)
# Execute a query that joins a non-system table with a system table using a request
# pool that is only coordinators. All fragment instances will run on the coordinators
# without running any on the executors.
self.__run_assert_systables_query(
vector=vector,
expected_frag_counts=[4, 2, 2,],
query="select a.test_name, b.db_user from functional.jointbl a inner join "
"sys.impala_query_live b on a.test_name = b.db_name"),
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=3,
workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml"),
statestored_args=_STATESTORED_ARGS)
def test_coord_only_pool_no_executors(self, vector):
"""Asserts queries that only select from the active queries table run even if no
executors are running."""
self.wait_for_wm_init_complete()
self.__run_assert_systables_query(vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml"),
statestored_args=_STATESTORED_ARGS)
def test_coord_only_pool_one_quiescing_coord(self, vector):
"""Asserts quiescing coordinators do not run fragment instances for queries that only
select from the active queries table."""
self.wait_for_wm_init_complete()
# Quiesce the second coordinator.
all_coords = self.cluster.get_all_coordinators()
coord_to_quiesce = all_coords[1]
self.execute_query_expect_success(self.client, ": shutdown('{}:{}')".format(
coord_to_quiesce.service.hostname, coord_to_quiesce.service.krpc_port))
# Ensure only two coordinators process a system tables query.
self.__run_assert_systables_query(
vector=vector,
expected_coords=[all_coords[0], all_coords[2]])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml"),
statestored_args=_STATESTORED_ARGS)
def test_coord_only_pool_one_coord_terminate(self, vector):
"""Asserts a force terminated coordinator is eventually removed from the list of
active coordinators."""
self.wait_for_wm_init_complete()
# Abruptly end the third coordinator.
all_coords = self.cluster.get_all_coordinators()
coord_to_term = all_coords[2]
coord_to_term.kill()
vector.set_exec_option('request_pool', 'onlycoords')
client = self.default_impala_client(vector.get_value('protocol'))
done_waiting = False
iterations = 0
while not done_waiting and iterations < 20:
try:
result = self.execute_query_using_client(client, ACTIVE_SQL, vector)
assert result.success
done_waiting = True
except Exception as e:
# Since the coordinator was not gracefully shut down, it never had a change to
# send a quiescing message. Thus, the statestore will take some time to detect
# that coordinator is gone. During that time, queries again system tables will
# fail as the now terminated coordinator will still be sent rpcs.
if re.search(r"Exec\(\) rpc failed: Network error: "
r"Client connection negotiation failed: client connection to .*?:{}: "
r"connect: Connection refused".format(coord_to_term.service.krpc_port),
str(e)):
# Expected error, coordinator down not yet detected.
iterations += 1
sleep(3)
else:
raise e
assert done_waiting
self.__assert_systables_query(result.runtime_profile, [all_coords[0], all_coords[1]])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml"),
statestored_args=_STATESTORED_ARGS)
def test_coord_only_pool_add_coord(self, vector):
self.wait_for_wm_init_complete()
# Add a coordinator to the cluster.
cluster_size = len(self.cluster.impalads)
self._start_impala_cluster(
options=[
"--impalad_args=s{}".format(impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml"))],
add_impalads=True,
cluster_size=6,
num_coordinators=1,
use_exclusive_coordinators=True,
wait_for_backends=False,
workload_mgmt=True)
self.assert_log_contains("impalad_node" + str(cluster_size), "INFO",
"join Impala Service pool")
# Assert the new coordinator ran a fragment instance.
self.__run_assert_systables_query(vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=1,
workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-onlycoords.xml",
llama_site_file="llama-site-onlycoords.xml",
additional_args="--expected_executor_group_sets=root.group-set-small:1,"
"root.group-set-large:2 "
"--num_expected_executors=2 --executor_groups=coordinator"),
statestored_args=_STATESTORED_ARGS)
def test_coord_only_pool_exec_groups(self, vector):
"""Asserts queries using only coordinators request pools can run successfully when
executor groups are configured."""
self.wait_for_wm_init_complete()
# Assert queries can be run when no executors are started.
self.__run_assert_systables_query(vector)
# Add a single executor for the small executor group set.
self._start_impala_cluster(
options=[
"--impalad_args=--executor_groups=root.group-set-small-group-000:1"],
add_executors=True,
cluster_size=1,
wait_for_backends=False)
self.cluster.statestored.service.wait_for_live_subscribers(3, timeout=30)
self.__run_assert_systables_query(vector)
# Add two executors for the large executor group set.
self._start_impala_cluster(
options=[
"--impalad_args=--executor_groups=root.group-set-small-group-000:2"],
add_executors=True,
cluster_size=2,
wait_for_backends=False)
self.cluster.statestored.service.wait_for_live_subscribers(5, timeout=30)
self.__run_assert_systables_query(vector)
class TestAdmissionControllerWithACService(TestAdmissionController):
"""Runs all of the tests from TestAdmissionController but with the second impalad in the