IMPALA-12345: Add user quotas to Admission Control

Allow administrators to configure per user limits on queries that can
run in the Impala system.

In order to do this, there are two parts. Firstly we must track the
total counts of queries in the system on a per-user basis. Secondly
there must be a user model that allows rules that control per-user
limits on the number of queries that can be run.

In a Kerberos environment the user names that are used for both the user
model and at runtime are short user names, e.g. testuser when the
Kerberos principal is testuser/scm@EXAMPLE.COM

TPoolStats (the data that is shared between Admission Control instances)
is extended to include a map from user name to a count of queries
running. This (along with some derived data structures) is updated when
queries are queued and when they are released from Admission Control.
This lifecycle is slightly different from other TPoolStats data which
usually tracks data about queries that are running. Queries can be
rejected because of user quotas at submission time. This is done for
two reasons: (1) queries can only be admitted from the front of the
queue and we do not want to block other queries due to quotas, and
(2) it is easy for users to understand what is going on when queries
are rejected at submission time.

Note that when running in configurations without an Admission Daemon
then Admission Control does not have perfect information about the
system and over-admission is possible for User-Level Admission Quotas
in the same way that it is for other Admission Control controls.

The User Model is implemented by extending the format of the
fair-scheduler.xml file. The rules controlling the per-user limits are
specified in terms of user or group names.

Two new elements ‘userQueryLimit’ and ‘groupQueryLimit’ can be added to
the fair-scheduler.xml file. These elements can be placed on the root
configuration, which applies to all pools, or the pool configuration.
The ‘userQueryLimit’ element has 2 child elements: "user"
and "totalCount". The 'user' element contains the short names of users,
and can be repeated, or have the value "*" for a wildcard name which
matches all users. The ‘groupQueryLimit’ element has 2 child
elements: "group" and "totalCount". The 'group' element contains group
names.

The root level rules and pool level rules must both be passed for a new
query to be queued. The rules dictate a maximum number of queries that
can run by a user. When evaluating rules at either the root level, or
at the pool level, when a rule matches a user then there is no more
evaluation done.

To support reading the ‘userQueryLimit’ and ‘groupQueryLimit’ fields the
RequestPoolService is enhanced.

If user quotas are enabled for a pool then a list of the users with
running or queued queries in that pool is visible on the coordinator
webui admission control page.

More comprehensive documentation of the user model will be provided in
IMPALA-12943

TESTING

New end-to-end tests are added to test_admission_controller.py, and
admission-controller-test is extended to provide unit tests for the
user model.

Change-Id: I4c33f3f2427db57fb9b6c593a4b22d5029549b41
Reviewed-on: http://gerrit.cloudera.org:8080/21616
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Andrew Sherman
2024-03-26 18:29:57 -07:00
committed by Impala Public Jenkins
parent 68c42a5d66
commit de6b902581
33 changed files with 2401 additions and 496 deletions

View File

@@ -310,7 +310,7 @@ DEFINE_string(blacklisted_dbs, "sys,information_schema",
" create, or drop databases which are blacklisted.");
DEFINE_string(blacklisted_tables, "",
"Comma separated full names (in format: <db>.<table>) of blacklisted tables. "
"Configure which tables to be skipped for loading (in startup and reseting metadata "
"Configure which tables to be skipped for loading (in startup and resetting metadata "
"of the table). Users can't access, create, or drop tables which are blacklisted");
DEFINE_double_hidden(invalidate_tables_gc_old_gen_full_threshold, 0.6, "The threshold "
@@ -352,9 +352,9 @@ DEFINE_int32(num_check_authorization_threads, 1,
"The number of threads used to check authorization for the user when executing show "
"tables/databases. This configuration is applicable only when authorization is "
"enabled. A value of 1 disables multi-threaded execution for checking authorization."
"However, a small value of larger than 1 may limit the parallism of FE requests when "
"checking authorization with a high concurrency. The value must be in the range of "
"1 to 128.");
"However, a small value of larger than 1 may limit the parallelism of FE requests "
"when checking authorization with a high concurrency. The value must be in the range "
"of 1 to 128.");
DEFINE_bool_hidden(use_customized_user_groups_mapper_for_ranger, false,
"If true, use the customized user-to-groups mapper when performing authorization via"

View File

@@ -16,8 +16,11 @@
// under the License.
#include "testutil/gtest-util.h"
#include "testutil/scoped-flag-setter.h"
#include "common/init.h"
#include "common/logging.h"
#include "kudu/rpc/sasl_common.h"
#include "kudu/security/init.h"
#include "kudu/security/test/mini_kdc.h"
#include "rpc/authentication.h"
#include "rpc/authentication-util.h"
@@ -26,7 +29,6 @@
#include "util/kudu-status-util.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/thread.h"
#include <ldap.h>
@@ -316,6 +318,76 @@ TEST(Auth, GetXFFOriginClientAddress) {
ASSERT_TRUE(status.ok());
}
// Checks that GetEffectiveUser() returns the expected values.
void assertEffectiveUser(
const string& connected_user, const string& delegated_user, const string& expected) {
TSessionState session;
if (!connected_user.empty()) session.__set_connected_user(connected_user);
if (!delegated_user.empty()) session.__set_delegated_user(delegated_user);
ASSERT_EQ(GetEffectiveUser(session), expected);
}
// Checks that GetEffectiveShortUser() returns the expected values.
void assertEffectiveShortUser(
const string& connected_user, const string& delegated_user, const string& expected) {
TSessionState session;
if (!connected_user.empty()) session.__set_connected_user(connected_user);
if (!delegated_user.empty()) session.__set_delegated_user(delegated_user);
string returned_user;
ASSERT_OK(GetEffectiveShortUser(session, &returned_user));
ASSERT_EQ(returned_user, expected)
<< "connected=" << connected_user << " delegated=" << delegated_user;
}
// Unit test for GetEffectiveShortUser().
TEST(Auth, UserUtilities) {
// Enable Kerberos so we can test kerberos names in GetEffectiveShortUser().
auto enable_kerberos =
ScopedFlagSetter<string>::Make(&FLAGS_principal, "service_name/_HOST@some.realm");
// Usernames that are not mutated by GetEffectiveShortUser().
const char* unchanged_usernames[] = {
"andrew", "andrew_sherman", "andrew-sherman", "Andrew"};
for (auto& name : unchanged_usernames) {
assertEffectiveUser(name, "", name);
assertEffectiveShortUser(name, "", name);
}
// Test GetEffectiveShortUser() maps Kerberos usernames to the expected short name.
std::pair<const char*, const char*> kerberos_name_mappings[] = {
{"impala@ROOT.COMOPS.SITE", "impala"},
{"changepw/kdc1.example.com@example.com", "changepw"},
{"krbtgt/EAST.EXAMPLE.COM@WEST.EXAMPLE.COM", "krbtgt"},
{"User1/admin/STAFF/employees@WEST.EXAMPLE.COM", "User1"},
{"employees@", "employees"}
};
for (const auto& pair : kerberos_name_mappings) {
assertEffectiveShortUser(pair.first, "", pair.second);
}
// Test GetEffectiveUser() logic.
assertEffectiveUser("connected1", "delegated1", "delegated1");
assertEffectiveUser("connected1", "", "connected1");
assertEffectiveUser("impala@ROOT.COMOPS.SITE", "", "impala@ROOT.COMOPS.SITE");
// Illegal kerberos user names.
// These all fail kudu::security::MapPrincipalToLocalName().
const char* illegal_kerberos_usernames[] = {
"buggy@EAST.EXAMPLE.COM/WEST.EXAMPLE.COM",
"two_ats@kdc1.example.com@EXAMPLE.COM",
"two_slash/kdc1.example.com@/EXAMPLE.COM",
"/User1/admin/STAFF/employees@WEST.EXAMPLE.COM",
};
for (auto& bad_name : illegal_kerberos_usernames) {
TSessionState session;
session.__set_connected_user(bad_name);
string unused;
stringstream ss;
ss << "Could not parse Kerberos name " << bad_name;
ASSERT_ERROR_MSG(GetEffectiveShortUser(session, &unused), ss.str());
}
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -97,7 +97,7 @@ enum class AdmissionOutcome {
/// A pool may be configured to allow a maximum amount of memory resources to be
/// 'reserved' by requests admitted to that pool. While Impala does not yet truly
/// 'reserve' the memory at admission (i.e. Impala does not yet guarantee the memory for
/// a request, it is still possible to overadmit such that multiple queries think they
/// a request, it is still possible to over-admit such that multiple queries think they
/// have reserved the same memory), the admission controller uses several metrics to
/// estimate the available memory and admit only when it thinks the necessary memory is
/// available. Future work will enable real reservations, but this is a much larger
@@ -151,10 +151,7 @@ enum class AdmissionOutcome {
/// there is no latency, but this does not account for memory from requests admitted
/// by other impalads).
/// c) Num Admitted: the number of queries that have been admitted and are therefore
/// considered to be currently running. Note that there is currently no equivalent to
/// the reserved memory reporting, i.e. hosts do not report the actual number of
/// queries that are currently executing (IMPALA-8762). This prevents using multiple
/// coordinators with executor groups.
/// considered to be currently running.
///
/// As described, both the 'reserved' and 'admitted' mem accounting mechanisms have
/// different advantages and disadvantages. The 'reserved' mem accounting works well in
@@ -165,6 +162,27 @@ enum class AdmissionOutcome {
/// are used or, if there is a wide distribution of requests across impalads, the rate of
/// submission is low enough that new state is able to be updated by the statestore.
///
/// User Quotas:
/// In addition to the checks described before, User Quotas can be configured to limit
/// the number of concurrent queries that can be run by a user.
///
/// The User Model for User Quotas is implemented by rules in the fair-scheduler.xml
/// configuration file. The rules can be set at the root level, or at the pool level.
/// The root level rules and pool level rules must both be passed for a new query to be
/// queued. The rules dictate a maximum number of queries that can run.
///
/// At the root level and at the pool level there are 3 types of rules. These rules have
/// a precedence. The rules are evaluated in this order:
/// 1) Rules that specify a user name
/// 2) Rules that specify a group name
/// 3) Wildcard rules that match any user
/// When evaluating rules at either the root level, or at the pool level, when a rule
/// matches a user then there is no more evaluation done. So, for example, if there is a
/// rule at the pool level about the specific user sunil, and a rule about the user
/// group workers (which group includes sunil), and if a query is submitted by
/// sunil, then once the user-specific rule has been evaluated, the rule about the user
/// group workers is skipped.
///
/// Releasing Queries:
/// When queries complete they must be explicitly released from the admission controller
/// using the methods 'ReleaseQuery' and 'ReleaseQueryBackends'. These methods release
@@ -184,13 +202,13 @@ enum class AdmissionOutcome {
/// resources released, regardless of any failures.
/// There are a few failure cases to consider:
/// - ReleaseQuery rpc fails: coordinators periodically send a list of registered query
/// ids via a heartbeat rpc, allowing the admission contoller to clean up any queries
/// ids via a heartbeat rpc, allowing the admission controller to clean up any queries
/// that are not in that list.
/// - Coordinator fails: the admission control service uses the statestore to detect when
/// a coordinator has been removed from the cluster membership and releases all queries
/// that were running at that coordinator.
/// - RelaseQueryBackends rpc fails: when ReleaseQuery is eventually called (as guaranteed
/// by the above), it will automatically release any remaining backends.
/// - ReleaseQueryBackends rpc fails: when ReleaseQuery is eventually called (as
/// guaranteed by the above), it will automatically release any remaining backends.
///
/// Releasing Backends releases the admitted memory used by that Backend and decrements
/// the number of running queries on the host running that Backend. Releasing a query does
@@ -298,7 +316,7 @@ enum class AdmissionOutcome {
/// and admit requests. Instead, we use a simple heuristic to try to dequeue a number of
/// requests proportional to the number of requests that are waiting in each individual
/// admission controller to the total number of requests queued across all admission
/// controllers (i.e. impalads). This limits the amount of overadmission that may result
/// controllers (i.e. impalads). This limits the amount of over-admission that may result
/// from a large amount of resources becoming available at the same time. When there are
/// requests queued in multiple pools on the same host, the admission controller simply
/// iterates over the pools in pool_stats_ and attempts to dequeue from each. This is fine
@@ -410,7 +428,7 @@ class AdmissionController {
std::vector<UniqueIdPB> CleanupQueriesForHost(
const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB>& query_ids);
/// Relases the resources for any queries currently running on coordinators that do not
/// Releases the resources for any queries currently running on coordinators that do not
/// appear in 'current_backends'. Called in response to statestore updates. Returns a
/// map from the backend id of any coordinator detected to have failed to a list of
/// queries that were released for that coordinator.
@@ -446,7 +464,7 @@ class AdmissionController {
// currently registered backends.
typedef std::unordered_map<std::string, THostStats> PerHostStats;
// Populates the input map with the per host memory reserved and admitted in the
// Populates the input map with the per-host memory reserved and admitted in the
// following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
// Only used for populating the 'backends' debug page.
void PopulatePerHostMemReservedAndAdmitted(PerHostStats* host_stats);
@@ -458,6 +476,14 @@ class AdmissionController {
std::string GetStalenessDetail(const std::string& prefix,
int64_t* ms_since_last_update = nullptr);
/// Holder class for parameters used with user quotas.
/// This is only valid if 'track_per_user' is true.
struct PerUserTracking {
const std::string& user;
bool was_queued;
bool track_per_user;
};
private:
class PoolStats;
friend class PoolStats;
@@ -511,6 +537,66 @@ class AdmissionController {
/// executor groups).
IntCounter* total_dequeue_failed_coordinator_limited_ = nullptr;
/// A typedef for a holder of per-user loads.
/// This matches the thrift-generated type of user_loads in TPoolStats.
/// There are a few helper functions for this type.
/// Key is user name, value is a count of queries.
typedef std::map<std::string, int64_t> UserLoads;
/// Helper function on UserLoads that increments the value associated with the given
/// key by 1, and returns the new value.
static int64_t IncrementCount(UserLoads& loads, const std::string& key);
/// Helper function on UserLoads that returns a dump of the contents of the object.
static std::string DebugString(const UserLoads& loads);
/// A Holder for aggregated per-user loads.
/// This is a wrapper around a UserLoads object.
class AggregatedUserLoads {
public:
AggregatedUserLoads() = default;
/// Insert a new key-value pair into the map.
void insert(const std::string& key, int64_t value);
/// Return the integer value corresponding to the given key, or 0 if the key does not
/// exist.
int64_t get(const std::string& key) const;
/// Increment the value associated with the given key by 1.
int64_t increment(const std::string& key);
/// Decrement the value associated with the given key by 1, and return the new value.
int64_t decrement(const std::string& key);
/// Return the number of keys. For testing only.
int64_t size() const;
/// Clear all values.
void clear();
/// Clear the value for a key.
void clear_key(const std::string& key);
/// Merge in loads from a UserLoads object.
void add_loads(const UserLoads& loads);
/// Export user names to a metrics set.
void export_users(SetMetric<std::string>* metrics) const;
/// Return a dump of the contents of the object
[[nodiscard]] std::string DebugString() const;
/// Return the underlying UserLoads object
[[nodiscard]] const UserLoads& get_user_loads() const { return loads_; }
private:
UserLoads loads_;
FRIEND_TEST(AdmissionControllerTest, AggregatedUserLoads);
friend class AdmissionControllerTest;
};
/// Contains all per-pool statistics and metrics. Accessed via GetPoolStats().
class PoolStats {
public:
@@ -530,6 +616,7 @@ class AdmissionController {
IntGauge* agg_num_running;
IntGauge* agg_num_queued;
IntGauge* agg_mem_reserved;
SetMetric<string>* agg_current_users;
IntGauge* local_mem_admitted;
/// The following mirror the current values of local_stats_.
@@ -538,6 +625,7 @@ class AdmissionController {
IntGauge* local_num_queued;
IntGauge* local_backend_mem_reserved;
IntGauge* local_backend_mem_usage;
SetMetric<string>* local_current_users;
/// Metrics exposing the pool settings.
IntGauge* pool_max_mem_resources;
@@ -573,13 +661,20 @@ class AdmissionController {
// ADMISSION LIFECYCLE METHODS
/// Updates the pool stats when the request represented by 'state' is admitted.
void AdmitQueryAndMemory(const ScheduleState& state, bool is_trivial);
void AdmitQueryAndMemory(
const ScheduleState& state, bool is_trivial, PerUserTracking& per_user_tracking);
/// Updates the pool stats except the memory admitted stat.
void ReleaseQuery(int64_t peak_mem_consumption, bool is_trivial);
/// The 'user' parameter is empty unless user quotas are configured.
void ReleaseQuery(
int64_t peak_mem_consumption, bool is_trivial, const std::string& user);
/// Releases the specified memory from the pool stats.
void ReleaseMem(int64_t mem_to_release);
/// Updates the pool stats when the request represented by 'state' is queued.
void Queue();
/// Increment per-user stats for a user.
void IncrementPerUser(const std::string& user);
/// Decrement per-user stats for a user.
void DecrementPerUser(const std::string& user);
/// Updates the pool stats when the request represented by 'state is dequeued.
void Dequeue(bool timed_out);
@@ -602,18 +697,19 @@ class AdmissionController {
typedef boost::unordered_map<std::string, int64_t> HostMemMap;
/// Called after updating local_stats_ and remote_stats_ to update the aggregate
/// values of agg_num_running_, agg_num_queued_, and agg_mem_reserved_. The in/out
/// values of agg_num_running_, agg_num_queued_, agg_mem_reserved_
/// and agg_user_loads_. The in/out
/// parameter host_mem_reserved is a map from host id to memory reserved used to
/// aggregate the mem reserved values across all pools for each host. Used by
/// UpdateClusterAggregates() to update host_mem_reserved_; it provides the host
/// aggregates when called over all pools.
/// aggregates when called over all pools. Must hold admission_ctrl_lock_.
void UpdateAggregates(HostMemMap* host_mem_reserved);
const TPoolStats& local_stats() const { return local_stats_; }
// A map from the id of a host to the TPoolStats about that host.
typedef boost::unordered_map<std::string, TPoolStats> RemoteStatsMap;
const RemoteStatsMap& remote_stats() const { return remote_stats_; }
const RemoteStatsMap& remote_stats() const { return remote_stats_; }
/// Return the TPoolStats for a remote host in remote_stats_ if it can be found.
/// Return nullptr otherwise.
@@ -641,6 +737,14 @@ class AdmissionController {
/// average of wait time.
void ResetInformationalStats();
/// Return the count of queries for the given user in the PoolStats object.
int64_t GetUserLoad(const string& user);
/// Return the embedded AggregatedUserLoads object that tracks per-user loads.
[[nodiscard]] AggregatedUserLoads& get_aggregated_user_loads() {
return agg_user_loads_;
}
const std::string& name() const { return name_; }
/// The max number of running trivial queries that can be allowed at the same time.
@@ -664,6 +768,11 @@ class AdmissionController {
/// other hosts. Updated only by UpdateAggregates().
int64_t agg_mem_reserved_;
// Aggregate (across all coordinators) per-user loads in this pool.
// Updated by UpdateAggregates(), and kept up to date by IncrementPerUser and
// DecrementPerUser().
AdmissionController::AggregatedUserLoads agg_user_loads_;
/// Number of running trivial queries in this pool that have been admitted by this
/// local coordinator. The purpose of it is to control the concurrency of running
/// trivial queries in case they may consume too many resources because trivial
@@ -712,16 +821,20 @@ class AdmissionController {
// Append a string about the memory consumption part of a TPoolStats object to 'ss'.
static void AppendStatsForConsumedMemory(
std::stringstream& ss, const TPoolStats& stats);
std::stringstream& ss, const TPoolStats& stats);
FRIEND_TEST(AdmissionControllerTest, Simple);
FRIEND_TEST(AdmissionControllerTest, PoolStats);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
FRIEND_TEST(AdmissionControllerTest, AggregatedUserLoads);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
FRIEND_TEST(AdmissionControllerTest, DequeueLoop);
FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
FRIEND_TEST(AdmissionControllerTest, PoolStats);
FRIEND_TEST(AdmissionControllerTest, QueryRejection);
FRIEND_TEST(AdmissionControllerTest, QuotaExamples);
FRIEND_TEST(AdmissionControllerTest, Simple);
FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
FRIEND_TEST(AdmissionControllerTest, EraseHostStats);
FRIEND_TEST(AdmissionControllerTest, UserAndGroupQuotas);
friend class AdmissionControllerTest;
};
@@ -730,14 +843,14 @@ class AdmissionController {
// pools in a host. The string is composed of up to 5 sections, where
// each section is about one pool describing the following: the top queries
// and stats about all queries in the pool. The sum of all top queries
// in these secions is at most 5.
// in these sections is at most 5.
std::string GetLogStringForTopNQueriesOnHost(const std::string& host_id);
// Return a string reporting top 5 queries with most memory consumed among all
// hosts in the pool. The string is composed of up to 5 sections, where
// each is about one host describing the following: the top queries and
// and stats about them in the host. The sum of all top queries
// in these secions is at most 5.
// stats about them in the host. The sum of all top queries
// in these sections is at most 5.
std::string GetLogStringForTopNQueriesInPool(const std::string& pool_name);
/// Map of pool names to pool stats. Accessed via GetPoolStats().
@@ -745,6 +858,10 @@ class AdmissionController {
typedef boost::unordered_map<std::string, PoolStats> PoolStatsMap;
PoolStatsMap pool_stats_;
/// User loads aggregated across all pools. Updated by UpdateClusterAggregates().
/// Protected by admission_ctrl_lock_.
AggregatedUserLoads root_agg_user_loads_;
/// This struct groups together a schedule and the executor group that it was scheduled
/// on. It is used to attempt admission without rescheduling the query in case the
/// cluster membership has not changed. Users of the struct must make sure that
@@ -794,6 +911,9 @@ class AdmissionController {
string pool_name;
TPoolConfig pool_cfg;
/// Config of the root pool this query will be scheduled on.
TPoolConfig root_cfg;
/// END: Members that are valid for new objects after initialization
/////////////////////////////////////////
@@ -876,12 +996,15 @@ class AdmissionController {
/// The executor group this query was scheduled on.
std::string executor_group;
/// Map from backend addresses to the resouces this query was allocated on them. When
/// Map from backend addresses to the resources this query was allocated on them. When
/// backends are released, they are removed from this map.
std::unordered_map<NetworkAddressPB, BackendAllocation> per_backend_resources;
/// Indicate whether the query is admitted as a trivial query.
bool is_trivial;
/// The effective user running the query. Set only if user quotas are configured.
std::string user;
};
/// Map from host id to a map from query id of currently running queries to information
@@ -917,21 +1040,21 @@ class AdmissionController {
std::string request_queue_topic_name_;
/// Resolves the resource pool name in 'query_ctx.request_pool' and stores the resulting
/// name in 'pool_name' and the resulting config in 'pool_config'.
Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, std::string* pool_name,
TPoolConfig* pool_config);
/// name in 'pool_name' the resulting config in 'pool_config', and the
/// root config in 'root_config'.
Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, string* pool_name,
TPoolConfig* pool_config, TPoolConfig* root_config);
/// Statestore subscriber callback that sends outgoing topic deltas (see
/// AddPoolUpdates()) and processes incoming topic deltas, updating the PoolStats
/// state.
void UpdatePoolStats(
const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
/// AddPoolAndPerHostStatsUpdates()) and processes incoming topic deltas, updating the
/// PoolStats state.
void UpdatePoolStats(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
std::vector<TTopicDelta>* subscriber_topic_updates);
/// Adds outgoing topic updates to subscriber_topic_updates for pools that have changed
/// since the last call to AddPoolUpdates(). Also adds the complete local view of
/// per-host statistics. Called by UpdatePoolStats() before UpdateClusterAggregates().
/// Must hold admission_ctrl_lock_.
/// since the last call to AddPoolAndPerHostStatsUpdates(). Also adds the complete local
/// view of per-host statistics. Called by UpdatePoolStats() before
/// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
void AddPoolAndPerHostStatsUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
/// Updates the remote stats with per-host topic_updates coming from the statestore.
@@ -972,15 +1095,16 @@ class AdmissionController {
/// method returns false and sets queue_node->not_admitted_reason.
/// The is_trivial is set to true when is_trivial is not null if the query is admitted
/// as a trivial query.
bool FindGroupToAdmitOrReject(
const ClusterMembershipMgr::SnapshotPtr& membership_snapshot,
const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats,
QueueNode* queue_node, bool& coordinator_resource_limited,
bool FindGroupToAdmitOrReject(ClusterMembershipMgr::SnapshotPtr& membership_snapshot,
const TPoolConfig& pool_config, const TPoolConfig& root_cfg, bool admit_from_queue,
PoolStats* pool_stats, QueueNode* queue_node, bool& coordinator_resource_limited,
bool* is_trivial = nullptr);
/// Dequeues the queued queries when notified by dequeue_cv_ and admits them if they
/// have not been cancelled yet.
void DequeueLoop();
/// Attempt to Dequeue a single query.
void TryDequeue();
/// Returns true if schedule can be admitted to the pool with pool_cfg.
/// admit_from_queue is true if attempting to admit from the queue. Otherwise, returns
@@ -990,8 +1114,16 @@ class AdmissionController {
/// enough memory resources available for the query. Caller owns not_admitted_reason and
/// not_admitted_details. Must hold admission_ctrl_lock_.
bool CanAdmitRequest(const ScheduleState& state, const TPoolConfig& pool_cfg,
bool admit_from_queue, string* not_admitted_reason, string* not_admitted_details,
bool& coordinator_resource_limited);
const TPoolConfig& root_cfg, bool admit_from_queue, string* not_admitted_reason,
string* not_admitted_details, bool& coordinator_resource_limited);
/// Returns true if User Quotas allow the schedule to be admitted to the pool with
/// config 'pool_cfg' and root config 'root_cfg'.
/// If no quotas are configured then True is returned.
/// User and group Quotas are checked on the pool and the root pool.
/// Caller owns not_admitted_reason
bool CanAdmitQuota(const ScheduleState& state, const TPoolConfig& pool_cfg,
const TPoolConfig& root_cfg, string* not_admitted_reason);
/// Returns true if the query can be admitted as a trivial query, therefore it can
/// bypass the admission control immediately.
@@ -1033,7 +1165,8 @@ class AdmissionController {
/// Updates the memory admitted and the num of queries running for each backend in
/// 'state'. Also updates the stats of its associated resource pool. Used only when
/// the 'state' is admitted.
void UpdateStatsOnAdmission(const ScheduleState& state, bool is_trivial);
void UpdateStatsOnAdmission(
const ScheduleState& state, bool is_trivial, PerUserTracking& per_user_tracking);
/// Updates the memory admitted and the num of queries running for each backend in
/// 'state' which have been release/completed. The list of completed backends is
@@ -1102,7 +1235,7 @@ class AdmissionController {
/// Sets the per host mem limit and mem admitted in the schedule and does the necessary
/// accounting and logging on successful submission.
/// Caller must hold 'admission_ctrl_lock_'.
void AdmitQuery(QueueNode* node, bool was_queued, bool is_trivial);
void AdmitQuery(QueueNode* node, string& user, bool was_queued, bool is_trivial);
/// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by the caller.
/// Is a helper method used by both PoolToJson() and AllPoolsToJson()
@@ -1145,6 +1278,30 @@ class AdmissionController {
/// Returns the maximum number of requests that can be queued in the pool.
static int64_t GetMaxQueuedForPool(const TPoolConfig& pool_config);
/// Return True if the pool configuration contains a User Quota.
static bool HasQuotaConfig(const TPoolConfig& pool_cfg);
/// Returns true if this query has sufficient user and group quotas in the specified
/// pool with config 'pool_cfg'.
/// Returns true if quotas are not configured.
/// Must hold admission_ctrl_lock_.
bool HasSufficientPoolQuotas(const string& user, const TPoolConfig& pool_cfg,
const string& pool_level, int64_t user_load, string* quota_exceeded_reason) const;
/// Check that the query will not exceed a per-user limit for the delegated user.
/// Returns True if there is sufficient quota or if no per-user quota is configured.
/// When a rule is evaluated, and passed, then *key_matched is set to True.
static bool HasSufficientUserQuota(const string& user, const TPoolConfig& pool_cfg,
const string& pool_name, int64_t user_load, string* quota_exceeded_reason,
bool use_wildcard, bool* key_matched);
/// Check that the query will not exceed a per-group limit for the delegated user.
/// Returns True if there is sufficient quota or if no per-group quota is configured.
/// When a rule is evaluated, and passed, then *key_matched is set to True.
bool HasSufficientGroupQuota(const string& user, const TPoolConfig& pool_cfg,
const string& pool_name, int64_t user_load, string* quota_exceeded_reason,
bool* key_matched) const;
/// Returns available memory and slots of the executor group.
const std::pair<int64_t, int64_t> GetAvailableMemAndSlots(
const ExecutorGroup& group) const;
@@ -1160,10 +1317,6 @@ class AdmissionController {
/// Returns the current size of the cluster.
int64_t GetClusterSize(const ClusterMembershipMgr::Snapshot& membership_snapshot);
/// Returns the size of executor group 'group_name' in 'membership_snapshot'.
int64_t GetExecutorGroupSize(const ClusterMembershipMgr::Snapshot& membership_snapshot,
const std::string& group_name);
/// Get the amount of memory to admit for the Backend with the given
/// BackendScheduleState. This method may return different values depending on if the
/// Backend is an Executor or a Coordinator.
@@ -1211,8 +1364,8 @@ class AdmissionController {
std::vector<Item>& listOfTopNs, std::vector<int>& indices, int indent) const;
// Report the topN queries in a string and append it to 'ss'. These queries are
// a subset of items in listOfTopNs whose indices are in 'indices'. One query Id
// together its mem consumed is reported per line. 'indent' provides the initial
// a subset of items in listOfTopNs whose indices are in 'indices'. One query id
// together with its mem consumed is reported per line. 'indent' provides the initial
// value of indentation. 'total_mem_consumed' contains the total memory consumed,
// from which a fraction of mem consumed by the topN queries can be reported.
void ReportTopNQueriesAtIndices(std::stringstream& ss, std::vector<Item>& listOfTopNs,
@@ -1222,18 +1375,22 @@ class AdmissionController {
void ReleaseQueryBackendsLocked(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
const vector<NetworkAddressPB>& host_addr);
FRIEND_TEST(AdmissionControllerTest, Simple);
FRIEND_TEST(AdmissionControllerTest, PoolStats);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
FRIEND_TEST(AdmissionControllerTest, AggregatedUserLoads);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlots);
FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlotsDefault);
FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
FRIEND_TEST(AdmissionControllerTest, QueryRejection);
FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState);
FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState);
FRIEND_TEST(AdmissionControllerTest, DequeueLoop);
FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
FRIEND_TEST(AdmissionControllerTest, PoolStats);
FRIEND_TEST(AdmissionControllerTest, QueryRejection);
FRIEND_TEST(AdmissionControllerTest, QuotaExamples);
FRIEND_TEST(AdmissionControllerTest, Simple);
FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
FRIEND_TEST(AdmissionControllerTest, EraseHostStats);
FRIEND_TEST(AdmissionControllerTest, UserAndGroupQuotas);
friend class AdmissionControllerTest;
};

View File

@@ -290,6 +290,7 @@ class ClusterMembershipMgr {
friend class impala::test::SchedulerWrapper;
friend class ClusterMembershipMgrUnitTest_TestPopulateExpectedExecGroupSets_Test;
friend class AdmissionControllerTest;
};
/// Helper method to populate a thrift request object 'update_req' for cluster membership

View File

@@ -17,25 +17,24 @@
#include "scheduling/request-pool-service.h"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <list>
#include <string>
#include <boost/algorithm/string/join.hpp>
#include <gutil/strings/substitute.h>
#include "common/constant-strings.h"
#include "common/logging.h"
#include "common/names.h"
#include "rpc/jni-thrift-util.h"
#include "service/query-options.h"
#include "util/auth-util.h"
#include "util/backend-gflag-util.h"
#include "util/collection-metrics.h"
#include "util/mem-info.h"
#include "util/parse-util.h"
#include "util/test-info.h"
#include "util/time.h"
#include "common/names.h"
using namespace impala;
DEFINE_bool(require_username, false, "Requires that a user be provided in order to "
@@ -115,10 +114,12 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
jmethodID start_id; // JniRequestPoolService.start(), only called in this method.
JniMethodDescriptor methods[] = {
{"<init>", "(Ljava/lang/String;Ljava/lang/String;Z)V", &ctor_},
{"<init>", "([BLjava/lang/String;Ljava/lang/String;Z)V", &ctor_},
{"start", "()V", &start_id},
{"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
{"getPoolConfig", "([B)[B", &get_pool_config_id_}};
{"getPoolConfig", "([B)[B", &get_pool_config_id_},
{"getHadoopGroups", "([B)[B", &get_hadoop_groups_id_}
};
JNIEnv* jni_env = JniUtil::GetJNIEnv();
jni_request_pool_service_class_ =
@@ -130,6 +131,9 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
JniUtil::LoadJniMethod(jni_env, jni_request_pool_service_class_, &(methods[i])));
}
jbyteArray cfg_bytes;
ABORT_IF_ERROR(GetThriftBackendGFlagsForJNI(jni_env, &cfg_bytes));
jstring fair_scheduler_config_path =
jni_env->NewStringUTF(FLAGS_fair_scheduler_allocation_path.c_str());
ABORT_IF_EXC(jni_env);
@@ -139,7 +143,7 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
jboolean is_be_test = TestInfo::is_be_test();
jobject jni_request_pool_service = jni_env->NewObject(jni_request_pool_service_class_,
ctor_, fair_scheduler_config_path, llama_site_path, is_be_test);
ctor_, cfg_bytes, fair_scheduler_config_path, llama_site_path, is_be_test);
ABORT_IF_EXC(jni_env);
ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(
jni_env, jni_request_pool_service, &jni_request_pool_service_));
@@ -205,3 +209,9 @@ Status RequestPoolService::GetPoolConfig(const string& pool_name,
if (FLAGS_disable_pool_mem_limits) pool_config->__set_max_mem_resources(-1);
return Status::OK();
}
Status RequestPoolService::GetHadoopGroups(
const TGetHadoopGroupsRequest& request, TGetHadoopGroupsResponse* response) {
return JniUtil::CallJniMethod(
jni_request_pool_service_, get_hadoop_groups_id_, request, response);
}

View File

@@ -54,6 +54,10 @@ class RequestPoolService {
/// is ignored.
Status GetPoolConfig(const std::string& pool_name, TPoolConfig* pool_config);
/// Returns (in the output parameter) the list of groups for the given user.
Status GetHadoopGroups(const TGetHadoopGroupsRequest& request,
TGetHadoopGroupsResponse* response);
private:
/// Metric measuring the time ResolveRequestPool() takes, in milliseconds.
StatsMetric<double>* resolve_pool_ms_metric_;
@@ -77,6 +81,7 @@ class RequestPoolService {
jobject jni_request_pool_service_;
jmethodID resolve_request_pool_id_; // RequestPoolService.resolveRequestPool()
jmethodID get_pool_config_id_; // RequestPoolService.getPoolConfig()
jmethodID get_hadoop_groups_id_; // RequestPoolService.getHadoopGroups()
jmethodID ctor_;
};

View File

@@ -478,29 +478,29 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
<< "<" << TNetworkAddressToString(state->network_address) << ">.";
}
void ImpalaServer::DecrementCount(
std::map<std::string, int64>& loads, const std::string& key) {
int64_t ImpalaServer::DecrementCount(
std::map<std::string, int64_t>& loads, const std::string& key) {
// Check if key is present as dereferencing the map will insert it.
// FIXME C++20: use contains().
if (!loads.count(key)) {
string msg = Substitute("Missing key $0 when decrementing count", key);
LOG(WARNING) << msg;
DCHECK(false) << msg;
return;
return 0;
}
int64& current_value = loads[key];
int64_t& current_value = loads[key];
if (current_value == 1) {
// Remove the entry from the map if the current_value will go to zero.
loads.erase(key);
return;
return 0;
}
if (current_value < 1) {
// Don't allow decrement below zero.
string msg = Substitute("Attempt to decrement below zero with key $0 ", key);
LOG(WARNING) << msg;
return;
return 0;
}
loads[key]--;
return --loads[key];
}
void ImpalaServer::DecrementSessionCount(const string& user_name) {
@@ -515,7 +515,7 @@ Status ImpalaServer::IncrementAndCheckSessionCount(const string& user_name) {
lock_guard<mutex> l(per_user_session_count_lock_);
// Only check user limit if there is already a session for the user.
if (per_user_session_count_map_.count(user_name)) {
int64 load = per_user_session_count_map_[user_name];
int64_t load = per_user_session_count_map_[user_name];
if (load >= FLAGS_max_hs2_sessions_per_user) {
const string& err_msg =
Substitute("Number of sessions for user $0 exceeds coordinator limit $1",

View File

@@ -716,9 +716,11 @@ class ImpalaServer : public ImpalaServiceIf,
TQueryOptions QueryOptions();
};
/// Helper function that decrements the value associated with the given key.
/// Removes the entry from the map if the value becomes zero.
static void DecrementCount(std::map<std::string, int64>& loads, const std::string& key);
/// Helper function that decrements the value associated with the given key, and
/// returns the new value. If the value becomes zero, then the entry is removed from
/// the map
static int64_t DecrementCount(
std::map<std::string, int64_t>& loads, const std::string& key);
private:
struct ExpirationEvent;
@@ -1438,7 +1440,7 @@ class ImpalaServer : public ImpalaServiceIf,
std::mutex connection_to_sessions_map_lock_;
/// A map from user to a count of sessions created by the user.
typedef std::map<std::string, int64> SessionCounts;
typedef std::map<std::string, int64_t> SessionCounts;
SessionCounts per_user_session_count_map_;
/// Protects per_user_session_count_map_. See "Locking" in the class comment for lock

View File

@@ -18,6 +18,7 @@
#include "util/auth-util.h"
#include <ostream>
#include <regex>
#include <boost/algorithm/string/classification.hpp>
@@ -47,6 +48,29 @@ const string& GetEffectiveUser(const TSessionState& session) {
return session.connected_user;
}
Status GetEffectiveShortUser(const TSessionState& session, std::string* short_name) {
const string& effective_user = GetEffectiveUser(session);
if (IsKerberosEnabled() && (effective_user.find('@') != std::string::npos)) {
// Regex to match kerberos names, based on org.apache.hadoop.security.KerberosName.
static const std::regex kerberos_name("([^/@]*)(/([^/@]*))*@([^/@]*)");
std::smatch groups;
if (std::regex_match(effective_user, groups, kerberos_name)) {
DCHECK_GE(groups.size(), 2);
// The first group contains the name.
if (!groups[1].str().empty()) {
*short_name = groups[1].str();
return Status::OK();
}
}
stringstream ss;
ss << "Could not parse Kerberos name " << effective_user;
return Status::Expected(ss.str());
} else {
*short_name = effective_user;
}
return Status::OK();
}
const string& GetEffectiveUser(const ImpalaServer::SessionState& session) {
return session.do_as_user.empty() ? session.connected_user : session.do_as_user;
}

View File

@@ -41,6 +41,13 @@ const std::string& GetEffectiveUser(const TSessionState& session);
/// Same behavior as the function above with different input parameter type.
const std::string& GetEffectiveUser(const ImpalaServer::SessionState& session);
/// Semantically the same as the above GetEffectiveUser(), but obtains the short form of
/// the name. If Kerberos is enabled and the name is a Kerberos principal then it is
/// mapped to a short username (e.g., the testuser in testuser/scm@EXAMPLE.COM).
/// Fills a reference to the "effective user" from the specified session
/// in 'short_name'.
Status GetEffectiveShortUser(const TSessionState& session, std::string* short_name);
/// Checks if 'user' can access the runtime profile or execution summary of a
/// statement by comparing 'user' with the user that run the statement, 'effective_user',
/// and checking if 'effective_user' is authorized to access the profile, as indicated by

View File

@@ -131,6 +131,7 @@ DECLARE_int32(dbcp_max_conn_pool_size);
DECLARE_int32(dbcp_max_wait_millis_for_conn);
DECLARE_int32(dbcp_data_source_idle_timeout_s);
DECLARE_bool(enable_catalogd_ha);
DECLARE_string(injected_group_members_debug_only);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -500,6 +501,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_dbcp_data_source_idle_timeout(FLAGS_dbcp_data_source_idle_timeout_s);
cfg.__set_data_stream_sender_buffer_size_used_by_planner(
FLAGS_data_stream_sender_buffer_size_used_by_planner);
cfg.__set_injected_group_members_debug_only(FLAGS_injected_group_members_debug_only);
#ifdef NDEBUG
cfg.__set_is_release_build(true);
#else

View File

@@ -48,8 +48,9 @@ template <typename T>
class SetMetric : public Metric {
public:
static SetMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::set<T>& value) {
return metrics->RegisterMetric(new SetMetric(MetricDefs::Get(key), value));
const std::set<T>& value, const std::string& metric_def_arg = "") {
return metrics->RegisterMetric(
new SetMetric(MetricDefs::Get(key, metric_def_arg), value));
}
SetMetric(const TMetricDef& def, const std::set<T>& value)

View File

@@ -762,6 +762,7 @@ class RuntimeProfile : public RuntimeProfileBase {
private:
friend class AggregatedRuntimeProfile;
friend class AdmissionControllerTest;
/// A set of bucket counters registered in this runtime profile.
std::set<std::vector<Counter*>*> bucketing_counters_;

View File

@@ -314,4 +314,6 @@ struct TBackendGflags {
141: required i64 data_stream_sender_buffer_size_used_by_planner
142: required bool disable_reading_puffin_stats
143: required string injected_group_members_debug_only
}

View File

@@ -221,6 +221,17 @@ struct TPoolConfig {
// of the coordinators.
// 0 indicates no limit. Default value is set as 0.
11: required i64 max_query_cpu_core_coordinator_limit = 0;
// Map from user name to a per user limit on the number of queries.
// A user name of "*" can be used as a wildcard to limit the number of concurrent
// queries that can be queued by any user.
// If a user name is present this overrides any wildcard limit.
12: required map<string, i32> user_query_limits
// A map is from group name to a per user limit on the number of queries.
// If a rule for the user is not present in user_query_limits, then these rules
// are evaluated, if the user is a member of a group.
13: required map<string, i32> group_query_limits
}
struct TParseDateStringResult {

View File

@@ -77,6 +77,9 @@ struct TPoolStats {
// a pool. These queries must be tracked by some query mem trackers. In comparison,
// num_admitted_running tracks the number of queries admitted in a host.
8: required i64 num_running;
// Per-user count of queries that are queued or running.
9: required map<string, i64> user_loads
}
struct THostStats {

View File

@@ -289,6 +289,16 @@
"kind": "GAUGE",
"key": "admission-controller.agg-num-running.$0"
},
{
"description": "Resource Pool $0 Aggregate Users",
"contexts": [
"RESOURCE_POOL"
],
"label": "Resource Pool $0 Aggregate Users",
"units": "NONE",
"kind": "SET",
"key": "admission-controller.agg-current-users.$0"
},
{
"description": "Total number of requests admitted to pool $0",
"contexts": [
@@ -369,6 +379,16 @@
"kind": "GAUGE",
"key": "admission-controller.local-num-admitted-running.$0"
},
{
"description": "Resource Pool $0 Local Users",
"contexts": [
"RESOURCE_POOL"
],
"label": "Resource Pool $0 Local Users",
"units": "NONE",
"kind": "SET",
"key": "admission-controller.local-current-users.$0"
},
{
"description": "Total number of requests queued in pool $0",
"contexts": [

View File

@@ -30,6 +30,7 @@ import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
@@ -381,4 +382,32 @@ public class JniUtil {
}
return sb.toString();
}
/**
* Evaluate the groups that the user is in when injected groups are being used.
* @param flags input string which is the format of the backend
* 'injected_group_members_debug_only' flag
* @param username the username
* @return a list of group names
*/
public static List<String> decodeInjectedGroups(String flags, String username) {
List<String> groups = new ArrayList<>();
if (flags == null || username == null) { return groups; }
for (String group : flags.split(";")) {
String[] parts = group.split(":");
if (parts.length != 2) {
throw new IllegalStateException(
"group " + group + " is malformed in injected groups string '" + flags + "'");
}
String groupName = parts[0];
for (String member : parts[1].split(",")) {
if (member.equals(username)) {
groups.add(groupName);
break; // Skip to the next group after finding the user.
}
}
}
return groups;
}
}

View File

@@ -545,4 +545,8 @@ public class BackendConfig {
public boolean isCatalogdHAEnabled() {
return backendCfg_.enable_catalogd_ha;
}
public String getInjectedGroupMembersDebugOnly() {
return backendCfg_.injected_group_members_debug_only;
}
}

View File

@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -39,13 +40,10 @@ import org.apache.impala.authentication.saml.WrappedWebContext;
import org.apache.impala.authorization.AuthorizationFactory;
import org.apache.impala.authorization.ImpalaInternalAdminUser;
import org.apache.impala.authorization.User;
import org.apache.impala.catalog.DatabaseNotFoundException;
import org.apache.impala.catalog.FeDataSource;
import org.apache.impala.catalog.FeDb;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
@@ -56,10 +54,8 @@ import org.apache.impala.service.Frontend.PlanCtx;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TBuildTestDescriptorTableParams;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDescribeDbParams;
import org.apache.impala.thrift.TDescribeOutputStyle;
import org.apache.impala.thrift.TDescribeResult;
import org.apache.impala.thrift.TDescribeTableParams;
import org.apache.impala.thrift.TDescriptorTable;
@@ -85,7 +81,6 @@ import org.apache.impala.thrift.TLoadDataReq;
import org.apache.impala.thrift.TLoadDataResp;
import org.apache.impala.thrift.TLogLevel;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TConvertTableRequest;
import org.apache.impala.thrift.TQueryCompleteContext;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TResultSet;
@@ -106,6 +101,7 @@ import org.apache.impala.thrift.TWrappedHttpResponse;
import org.apache.impala.util.AuthorizationUtil;
import org.apache.impala.util.ExecutorMembershipSnapshot;
import org.apache.impala.util.GlogAppender;
import org.apache.impala.util.JniRequestPoolService;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.TSessionStateUtil;
import org.apache.log4j.Appender;
@@ -119,11 +115,11 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.IllegalArgumentException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* JNI-callable interface onto a wrapped Frontend instance. The main point is to serialise
@@ -705,28 +701,7 @@ public class JniFrontend {
* Returns the list of Hadoop groups for the given user name.
*/
public byte[] getHadoopGroups(byte[] serializedRequest) throws ImpalaException {
TGetHadoopGroupsRequest request = new TGetHadoopGroupsRequest();
JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
TGetHadoopGroupsResponse result = new TGetHadoopGroupsResponse();
try {
result.setGroups(GROUPS.getGroups(request.getUser()));
} catch (IOException e) {
// HACK: https://issues.apache.org/jira/browse/HADOOP-15505
// There is no easy way to know if no groups found for a user
// other than reading the exception message.
if (e.getMessage().startsWith("No groups found for user")) {
result.setGroups(Collections.<String>emptyList());
} else {
LOG.error("Error getting Hadoop groups for user: " + request.getUser(), e);
throw new InternalException(e.getMessage());
}
}
try {
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(result);
} catch (TException e) {
throw new InternalException(e.getMessage());
}
return JniRequestPoolService.getHadoopGroupsInternal(serializedRequest);
}
/**

View File

@@ -19,10 +19,17 @@ package org.apache.impala.util;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TErrorCode;
import org.apache.hadoop.security.Groups;
import org.apache.impala.thrift.TGetHadoopGroupsRequest;
import org.apache.impala.thrift.TGetHadoopGroupsResponse;
import org.apache.impala.thrift.TPoolConfigParams;
import org.apache.impala.thrift.TPoolConfig;
import org.apache.impala.thrift.TResolveRequestPoolParams;
@@ -35,6 +42,10 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* JNI interface for RequestPoolService.
*/
@@ -47,6 +58,10 @@ public class JniRequestPoolService {
// A single instance is created by the backend and lasts the duration of the process.
private final RequestPoolService requestPoolService_;
private static final Configuration CONF = new Configuration();
private static final Groups GROUPS = Groups.getUserToGroupsMappingService(CONF);
/**
* Creates a RequestPoolService instance with a configuration containing the specified
* fair-scheduler.xml and llama-site.xml.
@@ -54,9 +69,13 @@ public class JniRequestPoolService {
* @param fsAllocationPath path to the fair scheduler allocation file.
* @param sitePath path to the configuration file.
*/
JniRequestPoolService(
final String fsAllocationPath, final String sitePath, boolean isBackendTest) {
public JniRequestPoolService(byte[] thriftBackendConfig, final String fsAllocationPath,
final String sitePath, boolean isBackendTest) throws ImpalaException {
Preconditions.checkNotNull(fsAllocationPath);
TBackendGflags cfg = new TBackendGflags();
JniUtil.deserializeThrift(protocolFactory_, cfg, thriftBackendConfig);
BackendConfig.create(cfg, false);
requestPoolService_ =
RequestPoolService.getInstance(fsAllocationPath, sitePath, isBackendTest);
}
@@ -109,4 +128,48 @@ public class JniRequestPoolService {
throw new InternalException(e.getMessage());
}
}
/**
* Returns the list of Hadoop groups for the given user name.
*/
public static byte[] getHadoopGroupsInternal(byte[] serializedRequest)
throws ImpalaException {
TGetHadoopGroupsRequest request = new TGetHadoopGroupsRequest();
JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
TGetHadoopGroupsResponse result = new TGetHadoopGroupsResponse();
String user = request.getUser();
String injectedGroups = BackendConfig.INSTANCE.getInjectedGroupMembersDebugOnly();
if (StringUtils.isEmpty(injectedGroups)) {
try {
result.setGroups(GROUPS.getGroups(user));
} catch (IOException e) {
// HACK: https://issues.apache.org/jira/browse/HADOOP-15505
// There is no easy way to know if no groups found for a user
// other than reading the exception message.
if (e.getMessage().startsWith("No groups found for user")) {
result.setGroups(Collections.emptyList());
} else {
LOG.error("Error getting Hadoop groups for user: " + request.getUser(), e);
throw new InternalException(e.getMessage());
}
}
} else {
List<String> groups = JniUtil.decodeInjectedGroups(injectedGroups, user);
LOG.info("getHadoopGroups returns injected groups " + groups + " for user " + user);
result.setGroups(groups);
}
try {
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(result);
} catch (TException e) {
throw new InternalException(e.getMessage());
}
}
/**
* Returns the list of Hadoop groups for the given user name.
*/
public byte[] getHadoopGroups(byte[] serializedRequest) throws ImpalaException {
return getHadoopGroupsInternal(serializedRequest);
}
}

View File

@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,7 +69,7 @@ import com.google.common.collect.Lists;
* Yarn {@link AllocationFileLoaderService} and the Llama configuration uses a subclass of
* the {@link FileWatchService}. There are two different mechanisms because there is
* different parsing/configuration code for the allocation file and the Llama
* configuration (which is a regular Hadoop conf file so it can use the
* configuration (which is a regular Hadoop conf file, so it can use the
* {@link Configuration} class). start() and stop() will start/stop watching and reloading
* both of these files.
*
@@ -138,7 +139,7 @@ public class RequestPoolService {
@VisibleForTesting
final AllocationFileLoaderService allocLoader_;
// Provides access to the fair scheduler allocation file. An AtomicReference becaus it
// Provides access to the fair scheduler allocation file. An AtomicReference because it
// is reset when the allocation configuration file changes and other threads access it.
private final AtomicReference<AllocationConfiguration> allocationConf_;
@@ -369,6 +370,11 @@ public class RequestPoolService {
long maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
result.setMax_mem_resources(
maxMemoryMb == Integer.MAX_VALUE ? -1 : maxMemoryMb * ByteUnits.MEGABYTE);
Map<String, Integer> userQueryLimits = allocationConf_.get().getUserQueryLimits(pool);
result.setUser_query_limits(userQueryLimits);
Map<String, Integer> groupQueryLimits =
allocationConf_.get().getGroupQueryLimits(pool);
result.setGroup_query_limits(groupQueryLimits);
if (conf_ == null) {
result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT);
result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT);
@@ -404,12 +410,15 @@ public class RequestPoolService {
+ " max_queued={}, queue_timeout_ms={}, default_query_options={},"
+ " max_query_mem_limit={}, min_query_mem_limit={},"
+ " clamp_mem_limit_query_option={}, max_query_cpu_core_per_node_limit={},"
+ " max_query_cpu_core_coordinator_limit={}",
+ " max_query_cpu_core_coordinator_limit={}"
+ " user_query_limits={}"
+ " group_query_limits={}",
pool, result.max_mem_resources, result.max_requests, result.max_queued,
result.queue_timeout_ms, result.default_query_options,
result.max_query_mem_limit, result.min_query_mem_limit,
result.clamp_mem_limit_query_option, result.max_query_cpu_core_per_node_limit,
result.max_query_cpu_core_coordinator_limit);
result.max_query_cpu_core_coordinator_limit, result.user_query_limits,
result.group_query_limits);
}
return result;
}
@@ -502,4 +511,4 @@ public class RequestPoolService {
Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv());
return allocationConf_.get();
}
}
}

View File

@@ -17,7 +17,9 @@
package org.apache.impala.util;
import static org.apache.impala.common.JniUtil.decodeInjectedGroups;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.JniUtil;
@@ -25,6 +27,8 @@ import org.apache.impala.thrift.TCacheJarParams;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.Test;
import java.util.List;
/**
* Unit tests for JniUtil functions.
*/
@@ -43,4 +47,44 @@ public class JniUtilTest {
JniUtil.deserializeThrift(protocolFactory_, deserializedTestObj, testObjBytes);
assertEquals(deserializedTestObj.hdfs_location, "test string");
}
static private void assertSingleGroup(String group, List<String> list) {
assertEquals(1, list.size());
assertEquals(group, list.get(0));
}
static private void assertEmpty(List<String> list) { assertTrue(list.isEmpty()); }
/**
* Unit test for {@link JniUtil#decodeInjectedGroups(String, String)}
*/
@Test
public void testDecodeInjectedGroups() {
assertEmpty(decodeInjectedGroups(null, "andrew"));
assertEmpty(decodeInjectedGroups("a_group", null));
String admissionTestFlags = "group0:userA;"
+ "group1:user1,user3;"
+ "dev:alice,deborah;"
+ "it:bob,fiona;"
+ "support:claire,geeta,howard;";
assertEmpty(decodeInjectedGroups(admissionTestFlags, "boris"));
assertSingleGroup("group1", decodeInjectedGroups(admissionTestFlags, "user1"));
assertSingleGroup("group1", decodeInjectedGroups(admissionTestFlags, "user3"));
assertSingleGroup("dev", decodeInjectedGroups(admissionTestFlags, "deborah"));
assertSingleGroup("dev", decodeInjectedGroups(admissionTestFlags, "alice"));
assertSingleGroup("it", decodeInjectedGroups(admissionTestFlags, "fiona"));
assertSingleGroup("it", decodeInjectedGroups(admissionTestFlags, "bob"));
assertSingleGroup("support", decodeInjectedGroups(admissionTestFlags, "claire"));
assertSingleGroup("support", decodeInjectedGroups(admissionTestFlags, "geeta"));
assertSingleGroup("support", decodeInjectedGroups(admissionTestFlags, "howard"));
String multiGroupString = "group1:user1;group2:user1,user2,user3";
List<String> groups = decodeInjectedGroups(multiGroupString, "user1");
assertEquals(2, groups.size());
assertTrue(groups.contains("group1"));
assertTrue(groups.contains("group2"));
}
}

View File

@@ -21,6 +21,16 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.AfterClass;
@@ -31,9 +41,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static java.util.Arrays.asList;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
import static org.apache.impala.yarn.server.resourcemanager.scheduler.fair.
AllocationFileLoaderService.addQueryLimits;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
@@ -45,10 +59,16 @@ import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPoolConfig;
import org.apache.impala.thrift.TResolveRequestPoolParams;
import org.apache.impala.thrift.TResolveRequestPoolResult;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.
AllocationConfigurationException;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.SAXException;
/**
* Unit tests for the user to pool resolution, authorization, and getting configuration
* parameters via {@link RequestPoolService}. Sets a configuration file and ensures the
@@ -57,12 +77,14 @@ import com.google.common.io.Files;
* the updated values are returned.
* TODO: Move tests to C++ to test the API that's actually used.
*/
@SuppressWarnings("ArraysAsListWithZeroOrOneArgument")
public class TestRequestPoolService {
// Pool definitions and includes memory resource limits, copied to a temporary file
private static final String ALLOCATION_FILE = "fair-scheduler-test.xml";
// A second allocation file which overwrites the temporary file to check for changes.
private static final String ALLOCATION_FILE_MODIFIED = "fair-scheduler-test2.xml";
private static final String ALLOCATION_FILE_EXTRA = "fair-scheduler-test3.xml";
private static final String ALLOCATION_FILE_EMPTY = "fair-scheduler-empty.xml";
private static final String ALLOCATION_FILE_GROUP_RULE = "fair-scheduler-group-rule.xml";
@@ -78,6 +100,8 @@ public class TestRequestPoolService {
// changing the file.
private static final long CHECK_INTERVAL_MS = 100L;
public static final List<String> EMPTY_LIST = Collections.emptyList();
// Temp folder where the config files are copied so we can modify them in place.
// The JUnit @Rule creates and removes the temp folder between every test.
@Rule
@@ -191,29 +215,64 @@ public class TestRequestPoolService {
@Test
public void testPoolAcls() throws Exception {
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
Assert.assertTrue(poolService_.hasAccess("root.queueA", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userB"));
Assert.assertFalse(poolService_.hasAccess("root.queueB", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "root"));
checkPoolAcls("root.queueA", asList("userA", "userB", "userZ"), EMPTY_LIST);
checkPoolAcls("root.queueB", asList("userB", "root"), asList("userA", "userZ"));
checkPoolAcls("root.queueD", asList("userB", "userA"), asList("userZ"));
}
/**
* Check that the access to the pool is as expected.
* @param queueName name of queue.
* @param allowedUsers a List of users that should have access
* @param deniedUsers a List of users that should be denied access
*/
private void checkPoolAcls(String queueName, List<String> allowedUsers,
List<String> deniedUsers) throws InternalException {
for (String allowed : allowedUsers) {
Assert.assertTrue(poolService_.hasAccess(queueName, allowed));
}
for (String denied : deniedUsers) {
Assert.assertFalse(poolService_.hasAccess(queueName, denied));
}
}
@Test
public void testPoolLimitConfigs() throws Exception {
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
checkPoolConfigResult("root", 15, 50, -1, 30000L, "mem_limit=1024m");
Map<String, Integer> rootUserQueryLimits = new HashMap<>();
rootUserQueryLimits.put("userB", 6);
rootUserQueryLimits.put("*", 10);
Map<String, Integer> rootGroupQueryLimits = new HashMap<>();
rootGroupQueryLimits.put("group3", 5);
checkPoolConfigResult("root", 15, 50, -1, 30000L, "mem_limit=1024m",
rootUserQueryLimits, rootGroupQueryLimits);
checkPoolConfigResult("root.queueA", 10, 30, 1024 * ByteUnits.MEGABYTE,
10000L, "mem_limit=1024m,query_timeout_s=10");
checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L,
"mem_limit=1024m", 1000, 10, false, 8, 8);
"mem_limit=1024m", 1000, 10, false, 8, 8, null, null);
Map<String, Integer> queueDUserQueryLimits = new HashMap<>();
queueDUserQueryLimits.put("userA", 2);
queueDUserQueryLimits.put("userF", 2);
queueDUserQueryLimits.put("userG", 101);
queueDUserQueryLimits.put("*", 3);
Map<String, Integer> queueDGroupQueryLimits = new HashMap<>();
queueDGroupQueryLimits.put("group1", 1);
queueDGroupQueryLimits.put("group2", 1);
checkPoolConfigResult("root.queueD", 5, 10, -1, 30000L, "mem_limit=1024m",
queueDUserQueryLimits, queueDGroupQueryLimits);
}
@Test
public void testDefaultConfigs() throws Exception {
createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY);
Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.userA", "userA"));
checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0);
checkPoolAcls("root.userA", asList("userA", "userB", "userZ"), EMPTY_LIST);
checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, null, null);
}
@Ignore("IMPALA-4868") @Test
@@ -255,6 +314,61 @@ public class TestRequestPoolService {
checkModifiedConfigResults();
}
/**
* Validate reading user and group quotas
*/
@Test
public void testReadUserGroupQuotas() throws Exception {
createPoolService(ALLOCATION_FILE_EXTRA, null);
TPoolConfig rootConfig = poolService_.getPoolConfig("root");
Map<String, Integer> rootUserExpected = new HashMap<String, Integer>() {
{
put("*", 8);
put("howard", 4);
}
};
Assert.assertEquals(rootUserExpected, rootConfig.user_query_limits);
Map<String, Integer> rootGroupExpected = new HashMap<String, Integer>() {
{ put("support", 6); }
};
Assert.assertEquals(rootGroupExpected, rootConfig.group_query_limits);
TPoolConfig smallConfig = poolService_.getPoolConfig("root.group-set-small");
Map<String, Integer> smallUserExpected = new HashMap<String, Integer>() {
{
put("*", 1);
put("alice", 4);
}
};
Assert.assertEquals(smallUserExpected, smallConfig.user_query_limits);
Map<String, Integer> smallGroupExpected = new HashMap<String, Integer>() {
{
put("support", 5);
put("dev", 5);
put("it", 2);
}
};
Assert.assertEquals(smallGroupExpected, smallConfig.group_query_limits);
TPoolConfig largeConfig = poolService_.getPoolConfig("root.group-set-large");
Map<String, Integer> largeUserExpected = new HashMap<String, Integer>() {
{
put("*", 1);
put("alice", 4);
put("claire", 3);
}
};
Assert.assertEquals(largeUserExpected, largeConfig.user_query_limits);
Map<String, Integer> largeGroupExpected = new HashMap<String, Integer>() {
{
put("support", 1);
put("dev", 2);
}
};
Assert.assertEquals(largeGroupExpected, largeConfig.group_query_limits);
}
// Test pool resolution
@Test
public void testNullLlamaSite() throws Exception {
createPoolService(ALLOCATION_FILE_MODIFIED, null);
@@ -265,17 +379,165 @@ public class TestRequestPoolService {
Assert.assertEquals("root.queueC", poolService_.assignToPool("queueC", "userA"));
// Test pool ACLs
Assert.assertTrue(poolService_.hasAccess("root.queueA", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userB"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userA"));
Assert.assertFalse(poolService_.hasAccess("root.queueC", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueC", "root"));
checkPoolAcls("root.queueA", asList("userA", "userB"), EMPTY_LIST);
checkPoolAcls("root.queueB", asList("userA", "userB"), EMPTY_LIST);
checkPoolAcls("root.queueC", asList("userC", "root"), asList("userA", "userB"));
checkPoolAcls("root.queueD", asList("userA", "userB"), EMPTY_LIST);
// Test pool limits
checkPoolConfigResult("root", -1, 200, -1);
Map<String, Integer> rootQueryLimits = new HashMap<>();
Map<String, Integer> rootGroupLimits = new HashMap<>();
rootQueryLimits.put("userD", 2);
checkPoolConfigResult(
"root", -1, 200, -1, null, "", rootQueryLimits, rootGroupLimits);
checkPoolConfigResult("root.queueA", -1, 200, 100000 * ByteUnits.MEGABYTE);
checkPoolConfigResult("root.queueB", -1, 200, -1);
checkPoolConfigResult("root.queueC", -1, 200, 128 * ByteUnits.MEGABYTE);
Map<String, Integer> queueEUserQueryLimits = new HashMap<>();
queueEUserQueryLimits.put("userA", 3);
queueEUserQueryLimits.put("userG", 3);
queueEUserQueryLimits.put("userH", 0);
queueEUserQueryLimits.put("*", 1);
Map<String, Integer> queueEGroupQueryLimits = new HashMap<>();
queueEGroupQueryLimits.put("group1", 2);
queueEGroupQueryLimits.put("group2", 2);
checkPoolConfigResult("root.queueE", -1, 200, -1, null, "",
queueEUserQueryLimits, queueEGroupQueryLimits);
}
/**
* Parse a snippet of xml, and then call addQueryLimits() on the root element.
*/
private static Map<String, Integer> doQueryLimitParsing(String xmlString, String name)
throws ParserConfigurationException, SAXException, IOException,
AllocationConfigurationException {
// Create a DocumentBuilderFactory instance
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
// Create a DocumentBuilder instance
DocumentBuilder builder = factory.newDocumentBuilder();
// Parse the XML string into a Document object
Document document =
builder.parse(new java.io.ByteArrayInputStream(xmlString.getBytes()));
// Get the root element
Element rootElement = document.getDocumentElement();
Map<String, Map<String, Integer>> userQueryLimits = new HashMap<>();
String queueName = "queue1";
addQueryLimits(queueName, rootElement, "userQueryLimit", userQueryLimits, name);
return userQueryLimits.get(queueName);
}
/**
* Call doQueryLimitParsing() and check that it threw an exception, and that the
* exception message contains the expected text.
*/
private static void assertFailureMessage(String xmlString, String expectedError) {
try {
doQueryLimitParsing(xmlString, "user");
Assert.fail(
"did not get expected exception, with expected message " + expectedError);
} catch (Exception e) {
Assert.assertTrue(e instanceof AllocationConfigurationException);
Assert.assertTrue(e.getMessage().contains(expectedError));
}
}
/**
* Unit test for doQueryLimitParsing().
*/
@Test
public void testLimitsParsing() throws Exception {
String xmlString = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <user>John</user>",
" <totalCount>30</totalCount>",
"</userQueryLimit>"
);
Map<String, Integer> expected = new HashMap<String, Integer>() {{
put("John", 30);
}};
Map<String, Integer> parsed = doQueryLimitParsing(xmlString, "user");
Assert.assertEquals(expected, parsed);
String xmlString2 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <user>John</user>",
" <user>Barry</user>",
" <totalCount>30</totalCount>",
"</userQueryLimit>"
);
Map<String, Integer> expected2 = new HashMap<String, Integer>() {{
put("John", 30);
put("Barry", 30);
}};
Map<String, Integer> parsed2 = doQueryLimitParsing(xmlString2, "user");
Assert.assertEquals(expected2, parsed2);
String xmlString3 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<groupQueryLimit>",
" <group>group1</group>",
" <group>group2</group>",
" <totalCount>1</totalCount>",
"</groupQueryLimit>"
);
Map<String, Integer> expected3 = new HashMap<String, Integer>() {{
put("group1", 1);
put("group2", 1);
}};
Map<String, Integer> parsed3 = doQueryLimitParsing(xmlString3, "group");
Assert.assertEquals(expected3, parsed3);
}
/**
* Unit test for doQueryLimitParsing() error cases.
*/
@Test
public void testLimitsParsingErrors() throws Exception {
String xmlString1 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <totalCount>30</totalCount>",
"</userQueryLimit>"
);
assertFailureMessage(xmlString1, "Empty user names");
String xmlString2 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <user>John</user>",
" <user>Barry</user>",
" <totalCount>30</totalCount>",
" <totalCount>31</totalCount>",
"</userQueryLimit>"
);
assertFailureMessage(xmlString2, "Duplicate totalCount tags");
String xmlString3 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <user>John</user>",
" <user>Barry</user>",
" <totalCount>fish</totalCount>",
"</userQueryLimit>"
);
assertFailureMessage(xmlString3, "Could not parse query totalCount");
String xmlString4 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <user>John</user>",
" <user>Barry</user>",
"</userQueryLimit>"
);
assertFailureMessage(xmlString4, "No totalCount for");
String xmlString5 = String.join("\n", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
"<userQueryLimit>",
" <user>John</user>",
" <user>John</user>",
" <totalCount>30</totalCount>",
"</userQueryLimit>"
);
assertFailureMessage(xmlString5, "Duplicate value given for name");
}
private void checkModifiedConfigResults()
@@ -286,21 +548,24 @@ public class TestRequestPoolService {
Assert.assertEquals("root.queueC", poolService_.assignToPool("queueC", "userA"));
// Test pool ACL changes
Assert.assertTrue(poolService_.hasAccess("root.queueA", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userB"));
Assert.assertTrue(poolService_.hasAccess("root.queueB", "userA"));
Assert.assertFalse(poolService_.hasAccess("root.queueC", "userA"));
Assert.assertTrue(poolService_.hasAccess("root.queueC", "root"));
checkPoolAcls("root.queueA", asList("userA", "userB"), EMPTY_LIST);
checkPoolAcls("root.queueB", asList("userA", "userB"), EMPTY_LIST);
checkPoolAcls("root.queueC", asList("userC", "root"), asList("userA", "userB"));
checkPoolAcls("root.queueD", asList("userA", "userB"), EMPTY_LIST);
// Test pool limit changes
checkPoolConfigResult("root", 15, 100, -1, 30000L, "");
Map<String, Integer> rootQueryLimits = new HashMap<>();
Map<String, Integer> rootGroupLimits = new HashMap<>();
rootQueryLimits.put("userD", 2);
checkPoolConfigResult(
"root", 15, 100, -1, 30000L, "", rootQueryLimits, rootGroupLimits);
// not_a_valid_option=foo.bar gets filtered out when parsing the query options on
// the backend, but it should be observed coming from the test file here.
checkPoolConfigResult("root.queueA", 1, 30, 100000 * ByteUnits.MEGABYTE,
50L, "mem_limit=128m,query_timeout_s=5,not_a_valid_option=foo.bar");
checkPoolConfigResult("root.queueA", 1, 30, 100000 * ByteUnits.MEGABYTE, 50L,
"mem_limit=128m,query_timeout_s=5,not_a_valid_option=foo.bar");
checkPoolConfigResult("root.queueB", 5, 10, -1, 600000L, "");
checkPoolConfigResult("root.queueC", 10, 30, 128 * ByteUnits.MEGABYTE,
30000L, "mem_limit=2048m,query_timeout_s=60");
checkPoolConfigResult("root.queueC", 10, 30, 128 * ByteUnits.MEGABYTE, 30000L,
"mem_limit=2048m,query_timeout_s=60");
}
/**
@@ -310,7 +575,8 @@ public class TestRequestPoolService {
long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
String expectedQueryOptions, long max_query_mem_limit, long min_query_mem_limit,
boolean clamp_mem_limit_query_option, long max_query_cpu_core_per_node_limit,
long max_query_cpu_core_coordinator_limit) {
long max_query_cpu_core_coordinator_limit, Map<String, Integer> userQueryLimits,
Map<String, Integer> groupQueryLimits) {
TPoolConfig expectedResult = new TPoolConfig();
expectedResult.setMax_requests(expectedMaxRequests);
expectedResult.setMax_queued(expectedMaxQueued);
@@ -328,15 +594,30 @@ public class TestRequestPoolService {
if (expectedQueryOptions != null) {
expectedResult.setDefault_query_options(expectedQueryOptions);
}
Assert.assertEquals("Unexpected config values for pool " + pool,
expectedResult, poolService_.getPoolConfig(pool));
expectedResult.setUser_query_limits(
userQueryLimits != null ? userQueryLimits : Collections.emptyMap());
expectedResult.setGroup_query_limits(
groupQueryLimits != null ? groupQueryLimits : Collections.emptyMap());
TPoolConfig poolConfig = poolService_.getPoolConfig(pool);
Assert.assertEquals(
"Unexpected config values for pool " + pool, expectedResult, poolConfig);
}
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
String expectedQueryOptions) {
checkPoolConfigResult( pool, expectedMaxRequests,
expectedMaxQueued, expectedMaxMem, expectedQueueTimeoutMs,
expectedQueryOptions, Collections.emptyMap(), Collections.emptyMap());
}
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
String expectedQueryOptions, Map<String, Integer> userQueryLimits,
Map<String, Integer> groupQueryLimits) {
checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued, expectedMaxMem,
expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true, 0, 0);
expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true, 0, 0,
userQueryLimits, groupQueryLimits);
}
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
@@ -344,4 +625,4 @@ public class TestRequestPoolService {
checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
expectedMaxMemUsage, null, "");
}
}
}

View File

@@ -1,6 +1,18 @@
<?xml version="1.0"?>
<allocations>
<queue name="root">
<userQueryLimit>
<user>userB</user>
<totalCount>6</totalCount>
</userQueryLimit>
<userQueryLimit>
<user>*</user>
<totalCount>10</totalCount>
</userQueryLimit>
<groupQueryLimit>
<group>group3</group>
<totalCount>5</totalCount>
</groupQueryLimit>
<queue name="queueA">
<aclSubmitApps>* </aclSubmitApps>
<maxResources>1024 mb, 2 vcores</maxResources>
@@ -12,10 +24,32 @@
<aclSubmitApps>* </aclSubmitApps>
<maxResources>1024 mb, 0 vcores</maxResources>
</queue>
<queue name="queueD">
<aclSubmitApps>userA,userB </aclSubmitApps>
<userQueryLimit>
<user>*</user>
<totalCount>3</totalCount>
</userQueryLimit>
<userQueryLimit>
<user>userA</user>
<user>userF</user>
<totalCount>2</totalCount>
</userQueryLimit>
<userQueryLimit>
<!-- test that whitespace is trimmed -->
<user> userG </user>
<totalCount> 101 </totalCount>
</userQueryLimit>
<groupQueryLimit>
<group>group1</group>
<group>group2</group>
<totalCount>1</totalCount>
</groupQueryLimit>
</queue>
<aclSubmitApps> </aclSubmitApps>
</queue>
<queuePlacementPolicy>
<rule name="specified" create="false"/>
<rule name="reject" />
</queuePlacementPolicy>
</allocations>
</allocations>

View File

@@ -1,6 +1,10 @@
<?xml version="1.0"?>
<allocations>
<queue name="root">
<userQueryLimit>
<user>userD</user>
<totalCount>2</totalCount>
</userQueryLimit>
<queue name="queueA">
<aclSubmitApps>*</aclSubmitApps>
<maxResources>100000 mb, 2 vcores</maxResources>
@@ -16,6 +20,34 @@
<aclSubmitApps>* </aclSubmitApps>
<maxResources>400 mb, 0 vcores</maxResources>
</queue>
<queue name="queueE">
<aclSubmitApps>* </aclSubmitApps>
<userQueryLimit>
<user>userA</user>
<user>userG</user>
<totalCount>3</totalCount>
</userQueryLimit>
<userQueryLimit>
<user>userH</user>
<totalCount>0</totalCount>
</userQueryLimit>
<groupQueryLimit>
<group>group1</group>
<group>group2</group>
<totalCount>2</totalCount>
</groupQueryLimit>
<userQueryLimit>
<user>*</user>
<totalCount>1</totalCount>
</userQueryLimit>
</queue>
<queue name="queueF">
<aclSubmitApps>* </aclSubmitApps>
<userQueryLimit>
<user>*</user>
<totalCount>30</totalCount>
</userQueryLimit>
</queue>
<aclSubmitApps> </aclSubmitApps>
</queue>
<queuePlacementPolicy>

View File

@@ -0,0 +1,81 @@
<?xml version="1.0"?>
<allocations>
<queue name="root">
<!-- A user not matched by any other rule can run 8 queries across the cluster -->
<userQueryLimit>
<user>*</user>
<totalCount>8</totalCount>
</userQueryLimit>
<!-- Members of group 'support' are limited to run 6 queries across the cluster -->
<groupQueryLimit>
<group>support</group>
<totalCount>6</totalCount>
</groupQueryLimit>
<!-- User 'howard' is limited to running 4 queries across the cluster -->
<userQueryLimit>
<user>howard</user>
<totalCount>4</totalCount>
</userQueryLimit>
<queue name="group-set-small">
<!-- A user not matched by any other rule can run 1 query in the small pool -->
<userQueryLimit>
<user>*</user>
<totalCount>1</totalCount>
</userQueryLimit>
<!-- Members of groups 'dev' and 'support' can run 5 queries in the small pool-->
<groupQueryLimit>
<group>dev</group>
<group>support</group>
<totalCount>5</totalCount>
</groupQueryLimit>
<!-- Members of the group 'it' can run 2 queries in the small pool-->
<groupQueryLimit>
<group>it</group>
<totalCount>2</totalCount>
</groupQueryLimit>
<!-- The user 'alice' can run 4 queries in the small pool -->
<userQueryLimit>
<user>alice</user>
<totalCount>4</totalCount>
</userQueryLimit>
</queue>
<queue name="group-set-large">
<!-- A user not matched by any other rule can run 1 query in the large pool -->
<userQueryLimit>
<user>*</user>
<totalCount>1</totalCount>
</userQueryLimit>
<!-- Members of the group 'dev' can run 2 queries in the large pool-->
<groupQueryLimit>
<group>dev</group>
<totalCount>2</totalCount>
</groupQueryLimit>
<!-- Members of the group 'dev' can run 1 query in the large pool-->
<groupQueryLimit>
<group>support</group>
<totalCount>1</totalCount>
</groupQueryLimit>
<!-- The user 'claire' can run 3 queries in the large pool -->
<userQueryLimit>
<user>claire</user>
<totalCount>3</totalCount>
</userQueryLimit>
<!-- The user 'alice' can run 4 queries in the large pool -->
<userQueryLimit>
<user>alice</user>
<totalCount>4</totalCount>
</userQueryLimit>
</queue>
</queue>
</allocations>

View File

@@ -18,6 +18,7 @@
package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
//YARNUTIL: MODIFIED
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -51,11 +52,15 @@ public class AllocationConfiguration {
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
// AdmissionControl quotas at user and group level.
private final Map<String, Map<String, Integer>> userQueryLimits;
private final Map<String, Map<String, Integer>> groupQueryLimits;
// Policy for mapping apps to queues
@VisibleForTesting
QueuePlacementPolicy placementPolicy;
//Configured queues in the alloc xml
// Configured queues in the alloc xml
@VisibleForTesting
Map<FSQueueType, Set<String>> configuredQueues;
@@ -73,6 +78,8 @@ public class AllocationConfiguration {
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<String, Integer>> userQueryLimits,
Map<String, Map<String, Integer>> groupQueryLimits,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues,
Set<String> nonPreemptableQueues) {
@@ -80,15 +87,19 @@ public class AllocationConfiguration {
this.maxQueueResources = maxQueueResources;
this.queueMaxResourcesDefault = queueMaxResourcesDefault;
this.queueAcls = queueAcls;
this.userQueryLimits = userQueryLimits;
this.groupQueryLimits = groupQueryLimits;
this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues;
}
public AllocationConfiguration(Configuration conf) {
minQueueResources = new HashMap<>();
maxQueueResources = new HashMap<>();
queueMaxResourcesDefault = Resources.unbounded();
queueAcls = new HashMap<>();
userQueryLimits = new HashMap<>();
groupQueryLimits = new HashMap<>();
configuredQueues = new HashMap<>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
@@ -96,7 +107,7 @@ public class AllocationConfiguration {
placementPolicy =
QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
}
/**
* Get the ACLs associated with this queue. If a given ACL is not explicitly
* configured, include the default value for that ACL. The default for the
@@ -170,15 +181,25 @@ public class AllocationConfiguration {
lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1);
}
return false;
}
public Map<FSQueueType, Set<String>> getConfiguredQueues() {
return configuredQueues;
}
public QueuePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}
}
public Map<String, Integer> getUserQueryLimits(String queueName) {
Map<String, Integer> limits = userQueryLimits.get(queueName);
return limits != null ? limits : Collections.emptyMap();
}
public Map<String, Integer> getGroupQueryLimits(String queueName) {
Map<String, Integer> limits = groupQueryLimits.get(queueName);
return limits != null ? limits : Collections.emptyMap();
}
}

View File

@@ -96,9 +96,8 @@ public class AllocationFileLoaderService extends AbstractService {
public AllocationFileLoaderService(Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
this.allocFile = getAllocationFile(conf);
@@ -142,7 +141,7 @@ public class AllocationFileLoaderService extends AbstractService {
}
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
if (reloadThread != null) {
@@ -150,7 +149,7 @@ public class AllocationFileLoaderService extends AbstractService {
}
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
running = false;
@@ -164,7 +163,7 @@ public class AllocationFileLoaderService extends AbstractService {
}
super.serviceStop();
}
/**
* Path to XML file containing allocations. If the
* path is relative, it is searched for in the
@@ -189,11 +188,66 @@ public class AllocationFileLoaderService extends AbstractService {
}
return allocFile;
}
public synchronized void setReloadListener(Listener reloadListener) {
this.reloadListener = reloadListener;
}
/**
* Add a user or group query limit to the limits Map.
* @param queueName the name of the queue name.
* @param element the element containing the definition.
* @param parentName for diagnostics, the enclosing tag name
* @param limitsMap where the Map will be inserted
* @param tagName the name of the tag "user" or "group"
* @throws AllocationConfigurationException if parsing fails.
*/
@VisibleForTesting
public static void addQueryLimits(String queueName, Element element, String parentName,
Map<String, Map<String, Integer>> limitsMap, String tagName)
throws AllocationConfigurationException {
Map<String, Integer> limits =
limitsMap.computeIfAbsent(queueName, k -> new HashMap<>());
int number = -1;
List<String> nameList = new ArrayList<>();
NodeList fields = element.getChildNodes();
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element)) continue;
Element field = (Element) fieldNode;
if (tagName.equals(field.getTagName())) {
String name = ((Text) field.getFirstChild()).getData().trim();
if (nameList.contains(name)) {
throw new AllocationConfigurationException(
"Duplicate value given for name " + name);
}
nameList.add(name);
} else if ("totalCount".equals(field.getTagName())) {
String numberStr = ((Text) field.getFirstChild()).getData().trim();
if (number != -1) {
throw new AllocationConfigurationException(
"Duplicate totalCount tags for " + parentName + "/" + field.getTagName());
}
try {
number = Integer.parseInt(numberStr);
} catch (NumberFormatException e) {
throw new AllocationConfigurationException(
"Could not parse query totalCount for " + parentName + "/"
+ field.getTagName(),
e);
}
}
if (nameList.isEmpty()) {
throw new AllocationConfigurationException("Empty user names for " + parentName);
}
}
if (number == -1) {
throw new AllocationConfigurationException("No totalCount for " + parentName);
}
for (String name : nameList) { limits.put(name, number); }
}
/**
* Updates the allocation list from the allocation config file. This file is
* expected to be in the XML format specified in the design doc.
@@ -224,6 +278,8 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
Map<String, Map<String, Integer>> userQueryLimits = new HashMap<>();
Map<String, Map<String, Integer>> groupQueryLimits = new HashMap<>();
Set<String> nonPreemptableQueues = new HashSet<>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
@@ -351,8 +407,8 @@ public class AllocationFileLoaderService extends AbstractService {
loadQueue(parent, element, minQueueResources, maxQueueResources,
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
queueWeights, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
queueAcls, configuredQueues, nonPreemptableQueues);
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
userQueryLimits, groupQueryLimits, configuredQueues, nonPreemptableQueues);
}
// Load placement policy and pass it configured queues
@@ -381,20 +437,20 @@ public class AllocationFileLoaderService extends AbstractService {
defaultFairSharePreemptionThreshold);
}
AllocationConfiguration info =
new AllocationConfiguration(minQueueResources, maxQueueResources,
maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, maxChildQueueResources, queueMaxApps, userMaxApps,
queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
defaultSchedPolicy, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, userQueryLimits, groupQueryLimits,
newPlacementPolicy, configuredQueues, nonPreemptableQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
reloadListener.onReload(info);
}
/**
* Loads a queue from a queue element in the configuration file
*/
@@ -411,10 +467,11 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<String, Map<String, Integer>> userQueryLimits,
Map<String, Map<String, Integer>> groupQueryLimits,
Map<FSQueueType, Set<String>> configuredQueues,
Set<String> nonPreemptableQueues)
throws AllocationConfigurationException {
String queueName = CharMatcher.whitespace().trimFrom(element.getAttribute("name"));
if (queueName.contains(".")) {
@@ -490,6 +547,10 @@ public class AllocationFileLoaderService extends AbstractService {
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
} else if ("userQueryLimit".equals(field.getTagName())) {
addQueryLimits(queueName, field, "userQueryLimit", userQueryLimits, "user");
} else if ("groupQueryLimit".equals(field.getTagName())) {
addQueryLimits(queueName, field, "groupQueryLimit", groupQueryLimits, "group");
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
@@ -498,13 +559,13 @@ public class AllocationFileLoaderService extends AbstractService {
if (!Boolean.parseBoolean(text)) {
nonPreemptableQueues.add(queueName);
}
} else if ("queue".endsWith(field.getTagName()) ||
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
queueWeights, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
queueAcls, configuredQueues, nonPreemptableQueues);
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
userQueryLimits, groupQueryLimits, configuredQueues, nonPreemptableQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}
@@ -529,7 +590,7 @@ public class AllocationFileLoaderService extends AbstractService {
minQueueResources.get(queueName)));
}
}
public interface Listener {
public void onReload(AllocationConfiguration info);
}

View File

@@ -64,6 +64,8 @@ LOG = logging.getLogger('admission_test')
# that running queries can be correlated with the thread that submitted them.
QUERY = " union all ".join(["select * from functional.alltypesagg where id != {0}"] * 30)
SLOW_QUERY = "select count(*) from functional.alltypes where int_col = sleep(20000)"
# Same query but with additional unpartitioned non-coordinator fragments.
# The unpartitioned fragments are both interior fragments that consume input
# from a scan fragment and non-interior fragments with a constant UNION.
@@ -78,7 +80,7 @@ STATESTORE_RPC_FREQUENCY_MS = 100
# Time to sleep (in milliseconds) between issuing queries. When the delay is at least
# the statestore heartbeat frequency, all state should be visible by every impalad by
# the time the next query is submitted. Otherwise the different impalads will see stale
# the time the next query is submitted. Otherwise, the different impalads will see stale
# state for some admission decisions.
SUBMISSION_DELAY_MS = \
[0, STATESTORE_RPC_FREQUENCY_MS // 2, STATESTORE_RPC_FREQUENCY_MS * 3 // 2]
@@ -551,7 +553,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
cluster_size=2)
def test_dedicated_coordinator_mem_accounting(self, vector):
"""Verify that when using dedicated coordinators, the memory admitted for and the
mem limit applied to the query fragments running on the coordinator is different than
mem limit applied to the query fragments running on the coordinator is different from
the ones on executors."""
self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=True)
@@ -612,7 +614,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.client.close_query(handle)
# Make sure query execution works perfectly for a query that does not have any
# fragments schdeuled on the coordinator, but has runtime-filters that need to be
# fragments scheduled on the coordinator, but has runtime-filters that need to be
# aggregated at the coordinator.
exec_options = vector.get_value('exec_option')
exec_options['RUNTIME_FILTER_WAIT_TIME_MS'] = 30000
@@ -709,7 +711,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2)
def test_mem_limit_executors(self, vector, unique_database):
def test_mem_limit_executors(self, vector):
"""Verify that the query option mem_limit_executors is only enforced on the
executors."""
ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
@@ -812,7 +814,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
exec_options = copy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "2G"
# Since Queuing is synchronous and we can't close the previous query till this
# Since Queuing is synchronous, and we can't close the previous query till this
# returns, we wait for this to timeout instead.
self.execute_query(query, exec_options)
except ImpalaBeeswaxException as e:
@@ -830,7 +832,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
disable_log_buffering=True)
def test_cancellation(self):
""" Test to confirm that all Async cancellation windows are hit and are able to
succesfully cancel the query"""
successfully cancel the query"""
impalad = self.cluster.impalads[0]
client = impalad.service.create_beeswax_client()
try:
@@ -1196,6 +1198,168 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
exec_options['num_nodes'] = 1
self.run_test_case('QueryTest/admission-max-min-mem-limits', vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-test2.xml",
llama_site_file="llama-site-test2.xml"),
statestored_args=_STATESTORED_ARGS)
def test_user_loads_propagate(self):
"""Test that user loads are propagated between impalads by checking
metric values"""
LOG.info("Exploration Strategy {0}".format(self.exploration_strategy()))
self.check_user_loads(user_loads_present=True, pool='root.queueB')
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-3-groups.xml",
llama_site_file="llama-site-3-groups.xml"),
statestored_args=_STATESTORED_ARGS)
def test_user_loads_do_not_propagate(self):
"""Test that user loads are not propagated between impalads if user
quotas are not configured. There are no user quotas configured in
fair-scheduler-3-groups.xml."""
self.check_user_loads(user_loads_present=False, pool="root.tiny")
def check_user_loads(self, user_loads_present, pool):
"""Fetches the metrics for user loads from the webui and checks they are as
expected."""
USER_ROOT = 'root'
USER_C = 'userC'
impalad1 = self.cluster.impalads[0]
impalad2 = self.cluster.impalads[1]
query1 = self.execute_async_and_wait_for_running(impalad1, SLOW_QUERY, USER_C,
pool=pool)
query2 = self.execute_async_and_wait_for_running(impalad2, SLOW_QUERY, USER_ROOT,
pool=pool)
keys = [
"admission-controller.agg-current-users.root.queueB",
"admission-controller.local-current-users.root.queueB",
]
values1 = impalad1.service.get_metric_values(keys)
values2 = impalad2.service.get_metric_values(keys)
if self.get_ac_log_name() == 'impalad':
if user_loads_present:
# The aggregate users are the same on either server.
assert values1[0] == [USER_ROOT, USER_C]
assert values2[0] == [USER_ROOT, USER_C]
# The local users differ.
assert values1[1] == [USER_C]
assert values2[1] == [USER_ROOT]
else:
# No user quotas configured means no metrics.
assert values1[0] is None
assert values2[0] is None
assert values1[1] is None
assert values2[1] is None
else:
# In exhaustive mode, running with AdmissionD.
assert self.get_ac_log_name() == 'admissiond'
admissiond = self.cluster.admissiond
valuesA = admissiond.service.get_metric_values(keys)
if user_loads_present:
# In this case the metrics are the same everywhere
assert values1[0] == [USER_ROOT, USER_C]
assert values2[0] == [USER_ROOT, USER_C]
assert values1[1] == []
assert values2[1] == []
assert valuesA[0] == [USER_ROOT, USER_C]
assert valuesA[1] == [USER_ROOT, USER_C]
else:
# No user quotas configured means no metrics.
assert values1[0] is None
assert values2[0] is None
assert values1[1] is None
assert values2[1] is None
assert valuesA[0] is None
assert valuesA[1] is None
query1.close()
query2.close()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-test2.xml",
llama_site_file="llama-site-test2.xml",
additional_args="--injected_group_members_debug_only=group1:userB,userC"
),
statestored_args=_STATESTORED_ARGS)
def test_user_loads_rules(self):
"""Test that rules for user loads are followed for new queries.
Note that some detailed checking of rule semantics is done at the unit test level in
admission-controller-test.cc"""
# The per-pool limit for userA is 3 in root.queueE.
self.check_user_load_limits('userA', 'root.queueE', 3, "user")
# In queueE the wildcard limit is 1
self.check_user_load_limits('random_user', 'root.queueE', 1, "wildcard")
# userB is in the injected group1, so the limit is 2.
self.check_user_load_limits('userB', 'root.queueE', 2, "group", group_name="group1")
# userD had a limit at the pool level, run it in queueD which has no wildcard limit.
self.check_user_load_limits('userD', 'root.queueD', 2, "user", pool_to_fail="root")
def check_user_load_limits(self, user, pool, limit, err_type, group_name="",
pool_to_fail=None):
query_handles = []
type = "group" if group_name else "user"
group_description = " in group " + group_name if group_name else ""
pool_that_fails = pool_to_fail if pool_to_fail else pool
for i in range(limit):
impalad = self.cluster.impalads[i % 2]
query_handle = self.execute_async_and_wait_for_running(impalad, SLOW_QUERY, user,
pool=pool)
query_handles.append(query_handle)
# Let state sync across impalads.
sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
# Another query should be rejected
impalad = self.cluster.impalads[limit % 2]
client = impalad.service.create_beeswax_client()
client.set_configuration({'request_pool': pool})
try:
client.execute(SLOW_QUERY, user=user)
assert False, "query should fail"
except Exception as e:
# Construct the expected error message.
expected = ("Rejected query from pool {pool}: current per-{type} load {limit} for "
"user {user}{group_description} is at or above the {err_type} limit "
"{limit} in pool {pool_that_fails}".
format(pool=pool, type=type, limit=limit, user=user,
group_description=group_description, err_type=err_type,
pool_that_fails=pool_that_fails))
assert expected in str(e)
for query_handle in query_handles:
query_handle.close()
class ClientAndHandle:
"""Holder class for a client and query handle"""
def __init__(self, client, handle):
self.client = client
self.handle = handle
def close(self):
"""close the query"""
self.client.close_query(self.handle)
def execute_async_and_wait_for_running(self, impalad, query, user, pool):
# Execute a query asynchronously, and wait for it to be running.
# Use beeswax client as it allows specifying the user that runs the query.
client = impalad.service.create_beeswax_client()
client.set_configuration({'request_pool': pool})
handle = client.execute_async(query, user=user)
timeout_s = 10
# Make sure the query has been admitted and is running.
self.wait_for_state(
handle, client.QUERY_STATES['RUNNING'], timeout_s, client=client)
return self.ClientAndHandle(client, handle)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_config_args(
@@ -1203,7 +1367,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
llama_site_file="mem-limit-test-llama-site.xml",
additional_args="-default_pool_max_requests 1", make_copy=True),
statestored_args=_STATESTORED_ARGS)
def test_pool_config_change_while_queued(self, vector):
def test_pool_config_change_while_queued(self):
"""Tests that the invalid checks work even if the query is queued. Makes sure that a
queued query is dequeued and rejected if the config is invalid."""
# IMPALA-9856: This test modify request pool max-query-mem-limit. Therefore, we
@@ -1247,7 +1411,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
queued_query_handle = self.client.execute_async(
"select * from functional_parquet.alltypes limit 1")
self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
# Change config to something less than the what is required to accommodate the
# Change config to something less than what is required to accommodate the
# largest min_reservation (which in this case is 32.09 MB.
config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
# Close running query so the queued one gets a chance.
@@ -1343,7 +1507,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
threads = []
# Test mixed trivial and non-trivial queries workload, and should successfully run
# for all.
# Test the case when the number of trivial queries is over the maximum pallelism,
# Test the case when the number of trivial queries is over the maximum parallelism,
# which is three.
for i in range(5):
thread_instance = self.MultiTrivialRunThread(self, "select 1")
@@ -1367,7 +1531,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
statestored_args=_STATESTORED_ARGS)
def test_trivial_query_multi_runs_fallback(self):
threads = []
# Test the case when the number of trivial queries is over the maximum pallelism,
# Test the case when the number of trivial queries is over the maximum parallelism,
# which is three, other trivial queries should fall back to normal process and
# blocked by the long sleep query in our testcase, then leads to a timeout error.
long_query_handle = self.client.execute_async("select sleep(100000)")
@@ -1447,7 +1611,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
"""Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
the query's queuing status, that is, whether the query was queued and what was the
latest queuing reason."""
# Start a long running query.
# Start a long-running query.
long_query_resp = self.execute_statement("select sleep(10000)")
# Ensure that the query has started executing.
self.wait_for_admission_control(long_query_resp.operationHandle)
@@ -1904,7 +2068,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
return metrics
LOG.info("Got inconsistent metrics {0}".format(metrics))
assert False, "Could not get consistent metrics for {0} queries after {1} attempts: "\
"{2}".format(num_submitted, ATTEMPTS, metrics)
"{2}".format(num_submitted, ATTEMPTS,
metrics)
def wait_for_metric_changes(self, metric_names, initial, expected_delta):
"""
@@ -2055,7 +2220,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
exec_options = self.vector.get_value('exec_option')
exec_options.update(self.additional_query_options)
# Turning off result spooling allows us to better control query execution by
# controlling the number or rows fetched. This allows us to maintain resource
# controlling the number of rows fetched. This allows us to maintain resource
# usage among backends.
exec_options['spool_query_results'] = 0
query = QUERY.format(self.query_num)
@@ -2145,8 +2310,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# Sleep and wait for the query to be cancelled. The cancellation will
# set the state to EXCEPTION.
start_time = time()
while (client.get_state(self.query_handle)
!= client.QUERY_STATES['EXCEPTION']):
while (client.get_state(self.query_handle) != client.QUERY_STATES['EXCEPTION']):
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
sleep(1)
@@ -2205,7 +2369,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
LOG.info("Found %s queued queries after %s seconds", actual_queued,
round(time() - start_time, 1))
def run_admission_test(self, vector, additional_query_options):
def run_admission_test(self, vector, additional_query_options,
check_user_aggregates=False):
LOG.info("Starting test case with parameters: %s", vector)
self.impalads = self.cluster.impalads
self.ac_processes = self.get_ac_processes()
@@ -2229,7 +2394,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
self.all_threads.append(thread)
sleep(submission_delay_ms / 1000.0)
# Wait for the admission control to make the initial admission decision for all of
# Wait for the admission control to make the initial admission decision for all
# the queries. They should either be admitted immediately, queued, or rejected.
# The test query is chosen that it with remain active on all backends until the test
# ends the query. This prevents queued queries from being dequeued in the background
@@ -2289,7 +2454,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
assert metric_deltas['timed-out'] == 0
self.wait_for_admitted_threads(metric_deltas['admitted'])
# Wait a few topic updates to ensure the admission controllers have reached a steady
# state or we may find an impalad dequeue more requests after we capture metrics.
# state, or we may find an impalad dequeue more requests after we capture metrics.
self.wait_for_statestore_updates(10)
final_metrics = self.get_consistent_admission_metrics(num_queries)
@@ -2306,7 +2471,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
assert metric_deltas['queued'] == initial_metric_deltas['queued']
assert metric_deltas['rejected'] == initial_metric_deltas['rejected']
else:
# We shouldn't go over the max number of queries or queue size so we can compute
# We shouldn't go over the max number of queries or queue size, so we can compute
# the expected number of queries that should have been admitted (which includes the
# number queued as they eventually get admitted as well), queued, and rejected
expected_admitted = MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
@@ -2318,6 +2483,18 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
self.wait_on_queries_page_num_queued(0, 0)
self._check_queries_page_resource_pools()
if check_user_aggregates:
# Check that metrics tracking running users are empty as queries have finished.
# These metrics are only present if user quotas are configured.
keys = [
"admission-controller.agg-current-users.root.queueF",
"admission-controller.local-current-users.root.queueF",
]
for impalad in self.ac_processes:
values = impalad.service.get_metric_values(keys)
assert values[0] == []
assert values[1] == []
for thread in self.all_threads:
if thread.error is not None:
raise thread.error
@@ -2349,6 +2526,21 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
self.pool_name = 'root.queueB'
self.run_admission_test(vector, {'request_pool': self.pool_name})
@pytest.mark.execute_serially
@SkipIfOS.redhat6
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_config_args(
fs_allocation_file="fair-scheduler-test2.xml",
llama_site_file="llama-site-test2.xml"),
statestored_args=_STATESTORED_ARGS)
def test_admission_controller_with_quota_configs(self, vector):
"""Run a workload with a variety of outcomes in a pool that has user quotas
configured. Note the user quotas will not prevent any queries from running, but this
allows verification that metrics about users are consistent after queries end"""
self.pool_name = 'root.queueF'
self.run_admission_test(vector, {'request_pool': self.pool_name},
check_user_aggregates=True)
def get_proc_limit(self):
"""Gets the process mem limit as reported by the impalad's mem-tracker metric.
Raises an assertion if not all impalads have the same value."""

View File

@@ -193,7 +193,7 @@ function renderGraph() {
}
}
// Picks up all the elemets classified as memory and replaces it with pretty printed
// Picks up all the elements classified as memory and replaces it with pretty printed
// value.
function formatMemoryColumns() {
var cols = document.getElementsByClassName("memory");
@@ -412,6 +412,14 @@ Time since last statestore update containing admission control topic state (ms):
<td>Time in queue (exponential moving average)</td>
<td colspan='2'>{{wait_time_ms_ema}} ms</td>
</tr>
<tr>
<td>Users with queries queued or running on this coordinator (visible if pool has user quotas configured)</td>
<td colspan='2'>{{local_current_users}}</td>
</tr>
<tr>
<td>Users with queries queued or running, aggregated across coordinators (visible if pool has user quotas configured)</td>
<td colspan='2'>{{agg_current_users}}</td>
</tr>
<tr>
<td colspan='3'>
<canvas id="{{pool_name}}" style="border:1px solid"></canvas>