Remove RM pool configuration and yarn_pool query option/profile property

Admission control adds support for configuring pools via a fair scheduler
allocation configuration, so the separate file-based pool configuration
mechanism (the -pool_conf_file whitelist in SimpleScheduler) is no longer
needed. This also renames the "yarn_pool" query option to the more general
"request_pool", as it can also be used to configure the admission controller
when RM/Yarn is not used. Similarly, the query profile shows the pool as
"Request Pool" rather than "Yarn Pool".

Change-Id: Id2cefb77ccec000e8df954532399d27eb18a2309
Reviewed-on: http://gerrit.ent.cloudera.com:8080/1668
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: jenkins
(cherry picked from commit 8d59416fb519ec357f23b5267949fd9682c9d62f)
Reviewed-on: http://gerrit.ent.cloudera.com:8080/1759
Matthew Jacobs authored 2014-02-26 17:44:30 -08:00; committed by jenkins
commit 989830186f (parent 41d90312fa)
12 changed files with 22 additions and 228 deletions

View File

@@ -922,8 +922,8 @@ Status ImpalaServer::SetQueryOptions(const string& key, const string& value,
     case TImpalaQueryOptions::SYNC_DDL:
       query_options->__set_sync_ddl(iequals(value, "true") || iequals(value, "1"));
       break;
-    case TImpalaQueryOptions::YARN_POOL:
-      query_options->__set_yarn_pool(value);
+    case TImpalaQueryOptions::REQUEST_POOL:
+      query_options->__set_request_pool(value);
       break;
     case TImpalaQueryOptions::V_CPU_CORES:
       query_options->__set_v_cpu_cores(atoi(value.c_str()));
@@ -1196,8 +1196,8 @@ void ImpalaServer::TQueryOptionsToMap(const TQueryOptions& query_option,
     case TImpalaQueryOptions::SYNC_DDL:
       val << query_option.sync_ddl;
       break;
-    case TImpalaQueryOptions::YARN_POOL:
-      val << query_option.yarn_pool;
+    case TImpalaQueryOptions::REQUEST_POOL:
+      val << query_option.request_pool;
       break;
     case TImpalaQueryOptions::V_CPU_CORES:
       val << query_option.v_cpu_cores;

View File

@@ -280,8 +280,6 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
   Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
   summary_profile_.AddInfoString("Request Pool", schedule_->request_pool());
   if (FLAGS_enable_rm) {
-    // TODO: Remove 'Yarn Pool' once we have admission control user->pool resolution
-    summary_profile_.AddInfoString("Yarn Pool", schedule_->yarn_pool());
     if (status.ok()) {
       DCHECK(schedule_->reservation_request() != NULL);
       stringstream reservation_request_ss;

View File

@@ -131,7 +131,6 @@ int16_t QuerySchedule::GetPerHostVCores() const {
 void QuerySchedule::CreateReservationRequest(const string& pool, const string& user,
     const vector<string>& llama_nodes) {
-  yarn_pool_ = pool;
   DCHECK(reservation_request_.get() == NULL);
   reservation_request_.reset(new TResourceBrokerReservationRequest());
   reservation_request_->resources.clear();

View File

@@ -87,7 +87,6 @@ class QuerySchedule {
   const TUniqueId& query_id() const { return query_id_; }
   const TQueryExecRequest& request() const { return request_; }
   const TQueryOptions& query_options() const { return query_options_; }
-  const std::string& yarn_pool() const { return yarn_pool_; }
   const std::string& request_pool() const { return request_pool_; }
   void set_request_pool(const std::string& pool_name) { request_pool_ = pool_name; }
   bool HasReservation() const { return !reservation_.allocated_resources.empty(); }
@@ -170,12 +169,6 @@ class QuerySchedule {
   // Total number of scan ranges of this query.
   int64_t num_scan_ranges_;
-  // Yarn pool from which resources were requested for this query schedule.
-  // Set in CreateReservationRequest().
-  // TODO: Remove once we can use llama as a library to resolve pools locally; just
-  // use request_pool_.
-  std::string yarn_pool_;
   // Request pool to which the request was submitted for admission.
   std::string request_pool_;

View File

@@ -147,51 +147,6 @@ TEST_F(SimpleSchedulerTest, NonLocalHost) {
   EXPECT_EQ(backends.at(4).address.port, 1000);
 }
-TEST_F(SimpleSchedulerTest, InitPoolWhiteList) {
-  // Check a non-existant configuration
-  FLAGS_pool_conf_file = "/I/do/not/exist";
-  vector<TNetworkAddress> backends;
-  backends.push_back(TNetworkAddress());
-  scoped_ptr<SimpleScheduler> sched(new SimpleScheduler(backends, NULL, NULL, NULL));
-  EXPECT_FALSE(sched->Init().ok());
-  // Check a valid configuration (although one with some malformed lines)
-  string impala_home(getenv("IMPALA_HOME"));
-  stringstream conf_file;
-  conf_file << impala_home << "/be/src/statestore/test-pool-conf";
-  FLAGS_pool_conf_file = conf_file.str();
-  sched.reset(new SimpleScheduler(backends, NULL, NULL, NULL));
-  EXPECT_TRUE(sched->Init().ok());
-  const SimpleScheduler::UserPoolMap& pool_map = sched->user_pool_map();
-  EXPECT_EQ(3, pool_map.size());
-  EXPECT_EQ(2, pool_map.find("admin")->second.size());
-  EXPECT_TRUE(pool_map.end() == pool_map.find("root"));
-  EXPECT_EQ(2, pool_map.find("*")->second.size());
-  EXPECT_EQ("Staging", pool_map.find("*")->second[0]);
-  // Check the pool determination logic.
-  string pool;
-  EXPECT_TRUE(sched->GetYarnPool("admin", TQueryOptions(), &pool).ok());
-  // Staging is the default pool
-  EXPECT_EQ("Staging", pool);
-  EXPECT_TRUE(sched->GetYarnPool("i-want-default", TQueryOptions(), &pool).ok());
-  EXPECT_EQ("Staging", pool);
-  TQueryOptions options;
-  // Only admin can use prod
-  options.__set_yarn_pool("prod");
-  EXPECT_FALSE(sched->GetYarnPool("user", options, &pool).ok());
-  EXPECT_TRUE(sched->GetYarnPool("admin", options, &pool).ok());
-  // Everyone can use the default pools
-  options.__set_yarn_pool("Staging");
-  EXPECT_TRUE(sched->GetYarnPool("user", options, &pool).ok());
-  EXPECT_TRUE(sched->GetYarnPool("admin", options, &pool).ok());
-  options.__set_yarn_pool("Default");
-  EXPECT_TRUE(sched->GetYarnPool("user", options, &pool).ok());
-  EXPECT_TRUE(sched->GetYarnPool("admin", options, &pool).ok());
-}
 }
 int main(int argc, char **argv) {

View File

@@ -47,8 +47,6 @@ using namespace strings;
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
 DECLARE_bool(enable_rm);
-DEFINE_string(pool_conf_file, "", "The full path to the YARN user-to-pool "
-    "configuration file");
 namespace impala {
@@ -194,10 +192,6 @@ Status SimpleScheduler::Init() {
       backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
     }
   }
-  if (!FLAGS_pool_conf_file.empty()) {
-    RETURN_IF_ERROR(InitPoolWhitelist(FLAGS_pool_conf_file));
-  }
   return Status::OK;
 }
@@ -707,7 +701,7 @@ int SimpleScheduler::FindLeftmostInputFragment(
 Status SimpleScheduler::GetRequestPool(const string& user,
     const TQueryOptions& query_options, string* pool) const {
   TResolveRequestPoolResult resolve_pool_result;
-  const string& configured_pool = query_options.yarn_pool;
+  const string& configured_pool = query_options.request_pool;
   RETURN_IF_ERROR(request_pool_utils_->ResolveRequestPool(configured_pool, user,
       &resolve_pool_result));
   if (resolve_pool_result.resolved_pool.empty()) {
@@ -722,129 +716,22 @@ Status SimpleScheduler::GetRequestPool(const string& user,
   return Status::OK;
 }
-Status SimpleScheduler::GetYarnPool(const string& user,
-    const TQueryOptions& query_options, string* pool) const {
-  if (query_options.__isset.yarn_pool) {
-    *pool = query_options.yarn_pool;
-    if (default_pools_.find(*pool) != default_pools_.end()) return Status::OK;
-    UserPoolMap::const_iterator pool_it = user_pool_whitelist_.find(user);
-    if (pool_it == user_pool_whitelist_.end()) {
-      stringstream ss;
-      ss << "No whitelist found for user: " << user << " to access pool: " << *pool;
-      return Status(ss.str());
-    }
-    BOOST_FOREACH(const string& whitelisted_pool, pool_it->second) {
-      if (whitelisted_pool == *pool) return Status::OK;
-    }
-    stringstream ss;
-    ss << "User: " << user << " not authorized to access pool: " << *pool;
-    return Status(ss.str());
-  }
-  if (user_pool_whitelist_.empty()) {
-    return Status("Either a default pool must be configured, or the pool must be "
-        "explicitly specified by a query option");
-  }
-  // Per YARN, a default pool is used if a pool is not explicitly configured.
-  UserPoolMap::const_iterator pool_it = user_pool_whitelist_.find(DEFAULT_USER);
-  DCHECK(pool_it != user_pool_whitelist_.end());
-  DCHECK(!pool_it->second.empty());
-  *pool = pool_it->second[0];
-  return Status::OK;
-}
-Status SimpleScheduler::InitPoolWhitelist(const string& conf_path) {
-  ifstream whitelist(conf_path.c_str(), ios::in);
-  if (!whitelist.is_open()) {
-    stringstream err_msg;
-    err_msg << "Could not open pool configuration file: " << conf_path;
-    return Status(err_msg.str());
-  }
-  // Each line is user: pool1, pool2
-  string line;
-  bool in_user_section = false;
-  while (getline(whitelist, line)) {
-    trim(line);
-    if (line.empty()) continue;
-    if (line == "[users]") {
-      in_user_section = true;
-    } else if (line.size() > 2 && line[0] == '[' && line[line.size() - 1] == ']') {
-      // Headers are '[header name]'
-      in_user_section = false;
-    }
-    if (!in_user_section) continue;
-    size_t colon_pos = line.find_first_of(":");
-    if (colon_pos == string::npos) {
-      LOG(WARNING) << "Could not read line: " << line << " in pool configuration "
-          << conf_path << ", ignoring.";
-      continue;
-    }
-    string user = line.substr(0, colon_pos);
-    trim(user);
-    if (user.empty()) {
-      LOG(WARNING) << "Empty user in line: "<< line << " in pool configuration "
-          << conf_path << ", ignoring.";
-      continue;
-    }
-    colon_pos = min(colon_pos, line.size());
-    string pools = line.substr(colon_pos + 1);
-    trim(pools);
-    if (pools.empty()) {
-      LOG(WARNING) << "Empty pool configuration for user: " << user
-          << " in pool configuration " << conf_path << ", ignoring.";
-      continue;
-    }
-    vector<string> splits;
-    split(splits, pools, is_any_of(","));
-    if (splits.empty()) {
-      LOG(WARNING) << "Empty pool configuration for user: " << user
-          << " in pool configuration " << conf_path << ", ignoring.";
-      continue;
-    }
-    BOOST_FOREACH(string& split, splits) {
-      trim(split);
-    }
-    user_pool_whitelist_[user] = splits;
-  }
-  UserPoolMap::const_iterator pool_it = user_pool_whitelist_.find(DEFAULT_USER);
-  if (pool_it == user_pool_whitelist_.end() || pool_it->second.size() == 0) {
-    stringstream err_msg;
-    err_msg << "No default pool mapping found. Please set a value for user '*' in "
-        << conf_path;
-    return Status(err_msg.str());
-  }
-  default_pools_.insert(pool_it->second.begin(), pool_it->second.end());
-  return Status::OK;
-}
 Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
   // TODO: Should this take impersonation into account?
   const string& user = schedule->request().query_ctxt.session.connected_user;
-  string request_pool;
-  RETURN_IF_ERROR(GetRequestPool(user, schedule->query_options(), &request_pool));
-  schedule->set_request_pool(request_pool);
-  schedule->set_num_hosts(num_backends_metric_->value());
+  string pool;
+  RETURN_IF_ERROR(GetRequestPool(user, schedule->query_options(), &pool));
+  schedule->set_request_pool(pool);
+  // Statestore topic may not have been updated yet if this is soon after startup, but
+  // there is always at least this backend.
+  schedule->set_num_hosts(max(num_backends_metric_->value(), 1L));
   RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
   RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
   ComputeFragmentHosts(schedule->request(), schedule);
   ComputeFragmentExecParams(schedule->request(), schedule);
   if (!FLAGS_enable_rm) return Status::OK;
-  // TODO: Combine related RM and admission control paths for looking up the pool once
-  // we've decided on a behavior for admission control when using RM as well.
-  string yarn_pool;
-  RETURN_IF_ERROR(GetYarnPool(user, schedule->query_options(), &yarn_pool));
-  schedule->CreateReservationRequest(yarn_pool, user, resource_broker_->llama_nodes());
+  schedule->CreateReservationRequest(pool, user, resource_broker_->llama_nodes());
   const TResourceBrokerReservationRequest* reservation_request =
       schedule->reservation_request();
   if (!reservation_request->resources.empty()) {

View File

@@ -91,21 +91,6 @@ class SimpleScheduler : public Scheduler {
   virtual void HandlePreemptedResource(const TUniqueId& client_resource_id);
   virtual void HandleLostResource(const TUniqueId& client_resource_id);
-  // Map form a user ID to a list of pools they are allowed to submit work to
-  typedef boost::unordered_map<std::string, std::vector<std::string> > UserPoolMap;
-  // Used for testing, to confirm correct parsing of the configuration file
-  const UserPoolMap& user_pool_map() const { return user_pool_whitelist_; }
-  // Determines the pool for a user, given a set of query options and any configuration
-  // loaded from a file. Returns the first pool from all pools configured for a user. Does
-  // not confirm that a user has access to a pool, if query_options.yarn_pool is set.
-  // Public only for testing.
-  // TODO: Combine with GetRequestPool RM and admission control paths for looking up the
-  // pool once we've decided on a behavior for admission control when using RM as well.
-  Status GetYarnPool(const std::string& user,
-      const TQueryOptions& query_options, std::string* pool) const;
  private:
   // Protects access to backend_map_ and backend_ip_map_, which might otherwise be updated
   // asynchronously with respect to reads. Also protects the locality
@@ -183,12 +168,6 @@ class SimpleScheduler : public Scheduler {
   // Set to NULL if resource management is disabled.
   ResourceBroker* resource_broker_;
-  // Map from a user ID to a list of pools they are allowed to submit work to.
-  UserPoolMap user_pool_whitelist_;
-  // Default pools read from the whitelist, accessible to all users.
-  std::set<std::string> default_pools_;
   // Used for user-to-pool resolution and looking up pool configurations.
   boost::scoped_ptr<RequestPoolUtils> request_pool_utils_;
@@ -212,15 +191,7 @@ class SimpleScheduler : public Scheduler {
   // Webserver callback that prints a list of known backends
   void BackendsPathHandler(const Webserver::ArgumentMap& args, std::stringstream* output);
-  // Loads the list of permissible pools from the provided configuration file, failing if
-  // there is an error or the file can't be found.
-  Status InitPoolWhitelist(const std::string& conf_path);
-  // Determines the actual pool to use for a user and the request_pool query option and
-  // checks if the user has access to submit to that pool. If no request_pool query
-  // option is set, the policy uses the default pool. Uses request_pool_utils_ via the
-  // fair-scheduler allocation configuration. The policy may change the requested pool.
-  // If no pool can be used or the user does not have access, an error is returned.
+  // Determines the pool for a user and query options via request_pool_utils_.
   Status GetRequestPool(const std::string& user,
       const TQueryOptions& query_options, std::string* pool) const;

View File

@@ -1,9 +0,0 @@
-[unused section]
-[users]
-admin: prod, dev.
-root:
-user: dev.
-*: Staging, Default
-[groups]

View File

@@ -66,9 +66,9 @@ struct TQueryOptions {
   18: optional Types.TExplainLevel explain_level
   19: optional bool sync_ddl = 0
-  // Yarn pool this request should be submitted to. If not set
-  // the pool is determined based on the user (only relevant with RM).
-  20: optional string yarn_pool
+  // Request pool this request should be submitted to. If not set
+  // the pool is determined based on the user.
+  20: optional string request_pool
   // Per-host virtual CPU cores required for query (only relevant with RM).
   21: optional i16 v_cpu_cores

View File

@@ -122,9 +122,9 @@ enum TImpalaQueryOptions {
   // active impalad in the cluster before completing.
   SYNC_DDL,
-  // Yarn pool this request should be submitted to. If not set
-  // the pool is determined based on the user (only relevant with RM).
-  YARN_POOL,
+  // Request pool this request should be submitted to. If not set
+  // the pool is determined based on the user.
+  REQUEST_POOL,
   // Per-host virtual CPU cores required for query (only relevant with RM).
   V_CPU_CORES,

View File

@@ -201,8 +201,8 @@ public class ImpaladClientExecutor {
       case PARQUET_FILE_SIZE:
         optionValue = String.valueOf(queryOptions.getParquet_file_size());
         break;
-      case YARN_POOL:
-        optionValue = String.valueOf(queryOptions.yarn_pool);
+      case REQUEST_POOL:
+        optionValue = String.valueOf(queryOptions.request_pool);
         break;
       case V_CPU_CORES:
         optionValue = String.valueOf(queryOptions.v_cpu_cores);

View File

@@ -309,7 +309,7 @@ class TestAdmissionController(CustomClusterTestSuite):
     exec_options = self.vector.get_value('exec_option')
     exec_options['debug_action'] = '0:GETNEXT:WAIT'
-    exec_options['yarn_pool'] = self.pool_name
+    exec_options['request_pool'] = self.pool_name
     query = QUERY % (self.query_num,)
     self.query_state = 'SUBMITTING'
     client = self.impalad.service.create_beeswax_client()