mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-2538,IMPALA-1168: Add per-pool settings, metric rename
Adds the ability to set: 1) Per-pool MEM_LIMIT query options (IMPALA-2538). 2) Per-pool queue timeouts (milliseconds) (IMPALA-1168). Both are set via the llama-site.xml (future work to define a better admission control specific configuration format, IMPALA-2573). Also renames a number of admission control pool metrics in preparation for a larger change to the admission control logic. This makes the new metric names available so that CM can start collecting them before the other changes, which will take longer to get in. This mostly just changes the string metric key names with a few small exceptions: 1) The 'cluster-mem-usage' metric was removed because we will not be sending per-backend mem_usage in statestore updates anymore, so the aggregate metric no longer makes sense. 2) The 'cluster-mem-reserved' metric is registered even though it is not yet updated. Having it exposed is necessary to unblock CM. The follow-up change adds the ability to collect mem_reserved from the pool MemTracker and to update this metric. Change-Id: Ie36b8a06b1b11c8ecad63c3ac4506d369b9835fa Reviewed-on: http://gerrit.cloudera.org:8080/1806 Reviewed-by: Matthew Jacobs <mj@cloudera.com> Tested-by: Internal Jenkins
This commit is contained in:
committed by
Internal Jenkins
parent
ba1ad352a6
commit
7c81e7e057
@@ -95,6 +95,7 @@ class ExecEnv {
|
||||
}
|
||||
ImpalaServer* impala_server() { return impala_server_; }
|
||||
Frontend* frontend() { return frontend_.get(); };
|
||||
RequestPoolService* request_pool_service() { return request_pool_service_.get(); }
|
||||
|
||||
void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include "gen-cpp/PlanNodes_types.h"
|
||||
#include "gen-cpp/Types_types.h" // for TUniqueId
|
||||
#include "gen-cpp/ImpalaInternalService_types.h" // for TQueryOptions
|
||||
#include "util/auth-util.h"
|
||||
#include "util/runtime-profile.h"
|
||||
|
||||
namespace impala {
|
||||
@@ -109,11 +110,7 @@ class RuntimeState {
|
||||
}
|
||||
const TExecPlanFragmentParams& fragment_params() const { return fragment_params_; }
|
||||
const std::string& effective_user() const {
|
||||
if (query_ctx().session.__isset.delegated_user &&
|
||||
!query_ctx().session.delegated_user.empty()) {
|
||||
return do_as_user();
|
||||
}
|
||||
return connected_user();
|
||||
return GetEffectiveUser(query_ctx().session);
|
||||
}
|
||||
const std::string& do_as_user() const { return query_ctx().session.delegated_user; }
|
||||
const std::string& connected_user() const {
|
||||
|
||||
@@ -48,35 +48,35 @@ const char TOPIC_KEY_DELIMITER = '!';
|
||||
// Define metric key format strings for metrics in PoolMetrics
|
||||
// '$0' is replaced with the pool name by strings::Substitute
|
||||
const string LOCAL_ADMITTED_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-admitted";
|
||||
"admission-controller.total-admitted.$0";
|
||||
const string LOCAL_QUEUED_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-queued";
|
||||
"admission-controller.total-queued.$0";
|
||||
const string LOCAL_DEQUEUED_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-dequeued";
|
||||
"admission-controller.total-dequeued.$0";
|
||||
const string LOCAL_REJECTED_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-rejected";
|
||||
"admission-controller.total-rejected.$0";
|
||||
const string LOCAL_TIMED_OUT_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-timed-out";
|
||||
"admission-controller.total-timed-out.$0";
|
||||
const string LOCAL_COMPLETED_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-completed";
|
||||
"admission-controller.total-released.$0";
|
||||
const string LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-time-in-queue-ms";
|
||||
"admission-controller.time-in-queue-ms.$0";
|
||||
const string CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.cluster-num-running";
|
||||
"admission-controller.agg-num-running.$0";
|
||||
const string CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.cluster-in-queue";
|
||||
const string CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.cluster-mem-usage";
|
||||
"admission-controller.agg-num-queued.$0";
|
||||
const string CLUSTER_MEM_RESERVED_METRIC_KEY_FORMAT =
|
||||
"admission-controller.agg-mem-reserved.$0";
|
||||
const string CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.cluster-mem-estimate";
|
||||
"admission-controller.agg-mem-admitted.$0";
|
||||
const string LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-num-running";
|
||||
"admission-controller.local-num-admitted-running.$0";
|
||||
const string LOCAL_IN_QUEUE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-in-queue";
|
||||
"admission-controller.local-num-queued.$0";
|
||||
const string LOCAL_MEM_USAGE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-mem-usage";
|
||||
"admission-controller.local-backend-mem-usage.$0";
|
||||
const string LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT =
|
||||
"admission-controller.$0.local-mem-estimate";
|
||||
"admission-controller.local-backend-mem-reserved.$0";
|
||||
|
||||
// Profile query events
|
||||
const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission";
|
||||
@@ -264,11 +264,11 @@ Status AdmissionController::RejectRequest(const string& pool_name,
|
||||
|
||||
Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
|
||||
const string& pool_name = schedule->request_pool();
|
||||
TPoolConfigResult pool_config;
|
||||
TPoolConfig pool_config;
|
||||
RETURN_IF_ERROR(request_pool_service_->GetPoolConfig(pool_name, &pool_config));
|
||||
const int64_t max_requests = pool_config.max_requests;
|
||||
const int64_t max_queued = pool_config.max_queued;
|
||||
const int64_t mem_limit = pool_config.mem_limit;
|
||||
const int64_t mem_limit = pool_config.max_mem_resources;
|
||||
|
||||
// Note the queue_node will not exist in the queue when this method returns.
|
||||
QueueNode queue_node(*schedule);
|
||||
@@ -338,8 +338,13 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
|
||||
if (pool_metrics != NULL) pool_metrics->local_queued->Increment(1L);
|
||||
}
|
||||
|
||||
int64_t queue_wait_timeout_ms = FLAGS_queue_wait_timeout_ms;
|
||||
if (pool_config.__isset.queue_timeout_ms) {
|
||||
queue_wait_timeout_ms = pool_config.queue_timeout_ms;
|
||||
}
|
||||
queue_wait_timeout_ms = max<int64_t>(0, queue_wait_timeout_ms);
|
||||
int64_t wait_start_ms = MonotonicMillis();
|
||||
int64_t queue_wait_timeout_ms = max<int64_t>(0, FLAGS_queue_wait_timeout_ms);
|
||||
|
||||
// We just call Get() to block until the result is set or it times out. Note that we
|
||||
// don't hold the admission_ctrl_lock_ while we wait on this promise so we need to
|
||||
// check the state after acquiring the lock in order to avoid any races because it is
|
||||
@@ -539,7 +544,6 @@ void AdmissionController::UpdateClusterAggregates(const string& pool_name) {
|
||||
if (pool_metrics != NULL) {
|
||||
pool_metrics->cluster_num_running->set_value(total_stats.num_running);
|
||||
pool_metrics->cluster_in_queue->set_value(total_stats.num_queued);
|
||||
pool_metrics->cluster_mem_usage->set_value(total_stats.mem_usage);
|
||||
pool_metrics->cluster_mem_estimate->set_value(total_stats.mem_estimate);
|
||||
}
|
||||
|
||||
@@ -604,10 +608,10 @@ void AdmissionController::DequeueLoop() {
|
||||
|
||||
PoolConfigMap::iterator it = pool_config_cache_.find(pool_name);
|
||||
if (it == pool_config_cache_.end()) continue; // No local requests in this pool
|
||||
const TPoolConfigResult& pool_config = it->second;
|
||||
const TPoolConfig& pool_config = it->second;
|
||||
|
||||
const int64_t max_requests = pool_config.max_requests;
|
||||
const int64_t mem_limit = pool_config.mem_limit;
|
||||
const int64_t mem_limit = pool_config.max_mem_resources;
|
||||
|
||||
// We should never have queued any requests in pools where either limit is 0 as no
|
||||
// requests should ever be admitted or when both limits are less than 0, i.e.
|
||||
@@ -710,8 +714,9 @@ AdmissionController::GetPoolMetrics(const string& pool_name) {
|
||||
|
||||
pool_metrics->cluster_in_queue = metrics_->AddGauge<int64_t>(
|
||||
CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT, 0, pool_name);
|
||||
pool_metrics->cluster_mem_usage = metrics_->AddGauge<int64_t>(
|
||||
CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT, 0, pool_name);
|
||||
// TODO: Store and update mem_reserved metric. Added without usage so it is visible in
|
||||
// the metrics output.
|
||||
metrics_->AddGauge<int64_t>(CLUSTER_MEM_RESERVED_METRIC_KEY_FORMAT, 0, pool_name);
|
||||
pool_metrics->cluster_mem_estimate = metrics_->AddGauge<int64_t>(
|
||||
CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT, 0, pool_name);
|
||||
|
||||
|
||||
@@ -235,7 +235,7 @@ class AdmissionController {
|
||||
/// Map of pool names to the most recent pool configs returned by request_pool_service_.
|
||||
/// Stored so that the dequeue thread does not need to access the configs via the
|
||||
/// request pool service again (which involves a JNI call and error checking).
|
||||
typedef boost::unordered_map<std::string, TPoolConfigResult> PoolConfigMap;
|
||||
typedef boost::unordered_map<std::string, TPoolConfig> PoolConfigMap;
|
||||
PoolConfigMap pool_config_cache_;
|
||||
|
||||
/// Notifies the dequeuing thread that pool stats have changed and it may be
|
||||
|
||||
@@ -58,12 +58,10 @@ const int64_t DEFAULT_REQUEST_TIMEOUT_MS = 5 * 60 * 1000;
|
||||
|
||||
QuerySchedule::QuerySchedule(const TUniqueId& query_id,
|
||||
const TQueryExecRequest& request, const TQueryOptions& query_options,
|
||||
const string& effective_user, RuntimeProfile* summary_profile,
|
||||
RuntimeProfile::EventSequence* query_events)
|
||||
RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
|
||||
: query_id_(query_id),
|
||||
request_(request),
|
||||
query_options_(query_options),
|
||||
effective_user_(effective_user),
|
||||
summary_profile_(summary_profile),
|
||||
query_events_(query_events),
|
||||
num_fragment_instances_(0),
|
||||
|
||||
@@ -67,8 +67,8 @@ struct FragmentExecParams {
|
||||
class QuerySchedule {
|
||||
public:
|
||||
QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
|
||||
const TQueryOptions& query_options, const std::string& effective_user,
|
||||
RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events);
|
||||
const TQueryOptions& query_options, RuntimeProfile* summary_profile,
|
||||
RuntimeProfile::EventSequence* query_events);
|
||||
|
||||
/// Returns OK if reservation_ contains a matching resource for each
|
||||
/// of the hosts in fragment_exec_params_. Returns an error otherwise.
|
||||
@@ -77,7 +77,6 @@ class QuerySchedule {
|
||||
const TUniqueId& query_id() const { return query_id_; }
|
||||
const TQueryExecRequest& request() const { return request_; }
|
||||
const TQueryOptions& query_options() const { return query_options_; }
|
||||
const std::string& effective_user() const { return effective_user_; }
|
||||
const std::string& request_pool() const { return request_pool_; }
|
||||
void set_request_pool(const std::string& pool_name) { request_pool_ = pool_name; }
|
||||
bool HasReservation() const { return !reservation_.allocated_resources.empty(); }
|
||||
@@ -134,8 +133,9 @@ class QuerySchedule {
|
||||
/// are all owned by the enclosing QueryExecState.
|
||||
const TUniqueId& query_id_;
|
||||
const TQueryExecRequest& request_;
|
||||
|
||||
/// The query options from the TClientRequest
|
||||
const TQueryOptions& query_options_;
|
||||
const std::string effective_user_;
|
||||
RuntimeProfile* summary_profile_;
|
||||
RuntimeProfile::EventSequence* query_events_;
|
||||
|
||||
|
||||
@@ -14,11 +14,16 @@
|
||||
|
||||
#include "scheduling/request-pool-service.h"
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
#include <list>
|
||||
#include <string>
|
||||
#include <gutil/strings/substitute.h>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "rpc/jni-thrift-util.h"
|
||||
#include "service/query-options.h"
|
||||
#include "util/auth-util.h"
|
||||
#include "util/mem-info.h"
|
||||
#include "util/parse-util.h"
|
||||
#include "util/time.h"
|
||||
@@ -27,6 +32,12 @@
|
||||
|
||||
using namespace impala;
|
||||
|
||||
DEFINE_bool(require_username, false, "Requires that a user be provided in order to "
|
||||
"schedule requests. If enabled and a user is not provided, requests will be "
|
||||
"rejected, otherwise requests without a username will be submitted with the "
|
||||
"username 'default'.");
|
||||
static const string DEFAULT_USER = "default";
|
||||
|
||||
DEFINE_string(fair_scheduler_allocation_path, "", "Path to the fair scheduler "
|
||||
"allocation file (fair-scheduler.xml).");
|
||||
DEFINE_string(llama_site_path, "", "Path to the Llama configuration file "
|
||||
@@ -36,6 +47,7 @@ DEFINE_string(llama_site_path, "", "Path to the Llama configuration file "
|
||||
// configuration files are not provided. The default values for this 'default pool'
|
||||
// are the same as the default values for pools defined via the fair scheduler
|
||||
// allocation file and Llama configurations.
|
||||
// TODO: Remove?
|
||||
DEFINE_int64(default_pool_max_requests, 200, "Maximum number of concurrent outstanding "
|
||||
"requests allowed to run before queueing incoming requests. A negative value "
|
||||
"indicates no limit. 0 indicates no requests will be admitted. Ignored if "
|
||||
@@ -53,6 +65,7 @@ DEFINE_int64(default_pool_max_queued, 200, "Maximum number of requests allowed t
|
||||
"llama_site_path are set.");
|
||||
|
||||
// Flags to disable the pool limits for all pools.
|
||||
// TODO: Remove?
|
||||
DEFINE_bool(disable_pool_mem_limits, false, "Disables all per-pool mem limits.");
|
||||
DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on the "
|
||||
"maximum number of running requests.");
|
||||
@@ -60,15 +73,22 @@ DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on t
|
||||
DECLARE_bool(enable_rm);
|
||||
|
||||
// Pool name used when the configuration files are not specified.
|
||||
const string DEFAULT_POOL_NAME = "default-pool";
|
||||
static const string DEFAULT_POOL_NAME = "default-pool";
|
||||
|
||||
const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-duration-ms";
|
||||
static const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-duration-ms";
|
||||
|
||||
static const string ERROR_USER_TO_POOL_MAPPING_NOT_FOUND =
|
||||
"No mapping found for request from user '$0' with requested pool '$1'";
|
||||
static const string ERROR_USER_NOT_ALLOWED_IN_POOL = "Request from user '$0' with "
|
||||
"requested pool '$1' denied access to assigned pool '$2'";
|
||||
static const string ERROR_USER_NOT_SPECIFIED = "User must be specified because "
|
||||
"-require_username=true.";
|
||||
|
||||
RequestPoolService::RequestPoolService(MetricGroup* metrics) :
|
||||
metrics_(metrics), resolve_pool_ms_metric_(NULL) {
|
||||
DCHECK(metrics_ != NULL);
|
||||
resolve_pool_ms_metric_(NULL) {
|
||||
DCHECK(metrics != NULL);
|
||||
resolve_pool_ms_metric_ =
|
||||
StatsMetric<double>::CreateAndRegister(metrics_, RESOLVE_POOL_METRIC_NAME);
|
||||
StatsMetric<double>::CreateAndRegister(metrics, RESOLVE_POOL_METRIC_NAME);
|
||||
|
||||
if (FLAGS_fair_scheduler_allocation_path.empty() &&
|
||||
FLAGS_llama_site_path.empty()) {
|
||||
@@ -130,33 +150,54 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
|
||||
EXIT_IF_EXC(jni_env);
|
||||
}
|
||||
|
||||
Status RequestPoolService::ResolveRequestPool(const string& requested_pool_name,
|
||||
const string& user, TResolveRequestPoolResult* resolved_pool) {
|
||||
Status RequestPoolService::ResolveRequestPool(const TQueryCtx& ctx,
|
||||
string* resolved_pool) {
|
||||
if (default_pool_only_) {
|
||||
resolved_pool->__set_resolved_pool(DEFAULT_POOL_NAME);
|
||||
resolved_pool->__set_has_access(true);
|
||||
*resolved_pool = DEFAULT_POOL_NAME;
|
||||
return Status::OK();
|
||||
}
|
||||
string user = GetEffectiveUser(ctx.session);
|
||||
if (user.empty()) {
|
||||
if (FLAGS_require_username) return Status(ERROR_USER_NOT_SPECIFIED);
|
||||
// Fall back to a 'default' user if not set so that queries can still run.
|
||||
VLOG_RPC << "No user specified: using user=default";
|
||||
user = DEFAULT_USER;
|
||||
}
|
||||
|
||||
const string& requested_pool = ctx.request.query_options.request_pool;
|
||||
TResolveRequestPoolParams params;
|
||||
params.__set_user(user);
|
||||
params.__set_requested_pool(requested_pool_name);
|
||||
|
||||
params.__set_requested_pool(requested_pool);
|
||||
TResolveRequestPoolResult result;
|
||||
int64_t start_time = MonotonicMillis();
|
||||
Status status = JniUtil::CallJniMethod(request_pool_service_, resolve_request_pool_id_,
|
||||
params, resolved_pool);
|
||||
params, &result);
|
||||
resolve_pool_ms_metric_->Update(MonotonicMillis() - start_time);
|
||||
return status;
|
||||
|
||||
if (result.status.status_code != TErrorCode::OK) {
|
||||
return Status(boost::algorithm::join(result.status.error_msgs, "; "));
|
||||
}
|
||||
if (result.resolved_pool.empty()) {
|
||||
return Status(strings::Substitute(ERROR_USER_TO_POOL_MAPPING_NOT_FOUND,
|
||||
user, requested_pool));
|
||||
}
|
||||
if (!result.has_access) {
|
||||
return Status(strings::Substitute(ERROR_USER_NOT_ALLOWED_IN_POOL, user,
|
||||
requested_pool, result.resolved_pool));
|
||||
}
|
||||
*resolved_pool = result.resolved_pool;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status RequestPoolService::GetPoolConfig(const string& pool_name,
|
||||
TPoolConfigResult* pool_config) {
|
||||
TPoolConfig* pool_config) {
|
||||
if (default_pool_only_) {
|
||||
pool_config->__set_max_requests(
|
||||
FLAGS_disable_pool_max_requests ? -1 : FLAGS_default_pool_max_requests);
|
||||
pool_config->__set_mem_limit(
|
||||
pool_config->__set_max_mem_resources(
|
||||
FLAGS_disable_pool_mem_limits ? -1 : default_pool_mem_limit_);
|
||||
pool_config->__set_max_queued(FLAGS_default_pool_max_queued);
|
||||
pool_config->__set_default_query_options("");
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -165,6 +206,6 @@ Status RequestPoolService::GetPoolConfig(const string& pool_name,
|
||||
RETURN_IF_ERROR(JniUtil::CallJniMethod(
|
||||
request_pool_service_, get_pool_config_id_, params, pool_config));
|
||||
if (FLAGS_disable_pool_max_requests) pool_config->__set_max_requests(-1);
|
||||
if (FLAGS_disable_pool_mem_limits) pool_config->__set_mem_limit(-1);
|
||||
if (FLAGS_disable_pool_mem_limits) pool_config->__set_max_mem_resources(-1);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -23,12 +23,14 @@
|
||||
|
||||
namespace impala {
|
||||
|
||||
/// Used by the SimpleScheduler and AdmissionController to resolve users to pools and to
|
||||
/// get per-pool configurations for admission control. If fair scheduler allocation and
|
||||
/// Llama configuration files are available, then they are used via the Java-side
|
||||
/// RequestPoolService class. Otherwise, a default pool is always returned with pool
|
||||
/// limits configurable via gflags. A single instance of RequestPoolService is created and
|
||||
/// it lives the lifetime of the process.
|
||||
/// Service to resolve incoming requests to resource pools and to provide the pool
|
||||
/// configurations. A single instance of RequestPoolService is created and it lives the
|
||||
/// lifetime of the process.
|
||||
///
|
||||
/// The resource pools are specified via fair-scheduler.xml and llama-site.xml files if
|
||||
/// they are available, otherwise all requests are mapped to a single 'default-pool'
|
||||
/// which is configurable via gflags. The xml files are managed by the Java class
|
||||
/// RequestPoolService, called via JNI.
|
||||
class RequestPoolService {
|
||||
public:
|
||||
/// Initializes the JNI method stubs if configuration files are specified. If any
|
||||
@@ -36,22 +38,18 @@ class RequestPoolService {
|
||||
/// terminate the process.
|
||||
RequestPoolService(MetricGroup* metrics);
|
||||
|
||||
/// Resolves the user and user-provided pool name to the pool returned by the placement
|
||||
/// policy and whether or not the user is authorized. If default_pool_only_ is true,
|
||||
/// then this will always return the default pool and will always be authorized, i.e.
|
||||
/// pool and user are ignored.
|
||||
Status ResolveRequestPool(const std::string& requested_pool_name,
|
||||
const std::string& user, TResolveRequestPoolResult* resolved_pool);
|
||||
/// Resolves the request to a resource pool as determined by the policy. Returns an
|
||||
/// error if the request cannot be resolved to a pool or if the user does not have
|
||||
/// access to submit requests in the resolved pool. If default_pool_only_ is true,
|
||||
/// then this will always return the default pool.
|
||||
Status ResolveRequestPool(const TQueryCtx& ctx, std::string* resolved_pool);
|
||||
|
||||
/// Gets the pool configuration values for the specified pool. If default_pool_only_ is
|
||||
/// true, then the returned values are always the default pool values, i.e. pool is
|
||||
/// ignored.
|
||||
Status GetPoolConfig(const std::string& pool_name, TPoolConfigResult* pool_config);
|
||||
/// true, then the returned values are always the default pool values, i.e. pool_name
|
||||
/// is ignored.
|
||||
Status GetPoolConfig(const std::string& pool_name, TPoolConfig* pool_config);
|
||||
|
||||
private:
|
||||
/// Metrics subsystem access
|
||||
MetricGroup* metrics_;
|
||||
|
||||
/// Metric measuring the time ResolveRequestPool() takes, in milliseconds.
|
||||
StatsMetric<double>* resolve_pool_ms_metric_;
|
||||
|
||||
|
||||
@@ -58,31 +58,18 @@ DECLARE_string(rm_default_memory);
|
||||
|
||||
DEFINE_bool(disable_admission_control, true, "Disables admission control.");
|
||||
|
||||
DEFINE_bool(require_username, false, "Requires that a user be provided in order to "
|
||||
"schedule requests. If enabled and a user is not provided, requests will be "
|
||||
"rejected, otherwise requests without a username will be submitted with the "
|
||||
"username 'default'.");
|
||||
|
||||
namespace impala {
|
||||
|
||||
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
|
||||
static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
|
||||
static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
|
||||
static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
|
||||
static const string DEFAULT_USER("default");
|
||||
|
||||
static const string BACKENDS_WEB_PAGE = "/backends";
|
||||
static const string BACKENDS_TEMPLATE = "backends.tmpl";
|
||||
|
||||
const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
|
||||
|
||||
static const string ERROR_USER_TO_POOL_MAPPING_NOT_FOUND(
|
||||
"No mapping found for request from user '$0' with requested pool '$1'");
|
||||
static const string ERROR_USER_NOT_ALLOWED_IN_POOL("Request from user '$0' with "
|
||||
"requested pool '$1' denied access to assigned pool '$2'");
|
||||
static const string ERROR_USER_NOT_SPECIFIED("User must be specified because "
|
||||
"-require_username=true.");
|
||||
|
||||
SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
|
||||
const string& backend_id, const TNetworkAddress& backend_address,
|
||||
MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
|
||||
@@ -862,39 +849,11 @@ int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
|
||||
return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
|
||||
}
|
||||
|
||||
Status SimpleScheduler::GetRequestPool(const string& user,
|
||||
const TQueryOptions& query_options, string* pool) const {
|
||||
TResolveRequestPoolResult resolve_pool_result;
|
||||
const string& configured_pool = query_options.request_pool;
|
||||
RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(configured_pool, user,
|
||||
&resolve_pool_result));
|
||||
if (resolve_pool_result.status.status_code != TErrorCode::OK) {
|
||||
return Status(join(resolve_pool_result.status.error_msgs, "; "));
|
||||
}
|
||||
if (resolve_pool_result.resolved_pool.empty()) {
|
||||
return Status(Substitute(ERROR_USER_TO_POOL_MAPPING_NOT_FOUND, user,
|
||||
configured_pool));
|
||||
}
|
||||
if (!resolve_pool_result.has_access) {
|
||||
return Status(Substitute(ERROR_USER_NOT_ALLOWED_IN_POOL, user,
|
||||
configured_pool, resolve_pool_result.resolved_pool));
|
||||
}
|
||||
*pool = resolve_pool_result.resolved_pool;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
|
||||
if (schedule->effective_user().empty()) {
|
||||
if (FLAGS_require_username) return Status(ERROR_USER_NOT_SPECIFIED);
|
||||
// Fall back to a 'default' user if not set so that queries can still run.
|
||||
VLOG(2) << "No user specified: using user=default";
|
||||
}
|
||||
const string& user =
|
||||
schedule->effective_user().empty() ? DEFAULT_USER : schedule->effective_user();
|
||||
VLOG(3) << "user='" << user << "'";
|
||||
string pool;
|
||||
RETURN_IF_ERROR(GetRequestPool(user, schedule->query_options(), &pool));
|
||||
schedule->set_request_pool(pool);
|
||||
string resolved_pool;
|
||||
RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
|
||||
schedule->request().query_ctx, &resolved_pool));
|
||||
schedule->set_request_pool(resolved_pool);
|
||||
// Statestore topic may not have been updated yet if this is soon after startup, but
|
||||
// there is always at least this backend.
|
||||
schedule->set_num_hosts(max<int64_t>(num_fragment_instances_metric_->value(), 1));
|
||||
@@ -910,7 +869,9 @@ Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
|
||||
ComputeFragmentHosts(schedule->request(), schedule);
|
||||
ComputeFragmentExecParams(schedule->request(), schedule);
|
||||
if (!FLAGS_enable_rm) return Status::OK();
|
||||
schedule->PrepareReservationRequest(pool, user);
|
||||
string user = coord->runtime_state()->effective_user();
|
||||
if (user.empty()) user = "default";
|
||||
schedule->PrepareReservationRequest(resolved_pool, user);
|
||||
const TResourceBrokerReservationRequest& reservation_request =
|
||||
schedule->reservation_request();
|
||||
if (!reservation_request.resources.empty()) {
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "service/child-query.h"
|
||||
#include "service/impala-server.inline.h"
|
||||
#include "service/query-exec-state.h"
|
||||
#include "service/query-options.h"
|
||||
#include "util/debug-util.h"
|
||||
|
||||
#include "common/names.h"
|
||||
@@ -98,64 +99,21 @@ Status ChildQuery::ExecAndFetch() {
|
||||
return status;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void SetQueryOption(TImpalaQueryOptions::type opt, const T& opt_val,
|
||||
TExecuteStatementReq* exec_stmt_req) {
|
||||
stringstream opt_val_ss;
|
||||
opt_val_ss << opt_val;
|
||||
map<int, const char*>::const_iterator it =
|
||||
_TImpalaQueryOptions_VALUES_TO_NAMES.find(opt);
|
||||
if (it == _TImpalaQueryOptions_VALUES_TO_NAMES.end()) return;
|
||||
exec_stmt_req->confOverlay[it->second] = opt_val_ss.str();
|
||||
exec_stmt_req->__isset.confOverlay = true;
|
||||
}
|
||||
|
||||
#define SET_QUERY_OPTION(NAME, ENUM)\
|
||||
if (parent_options.__isset.NAME) {\
|
||||
SetQueryOption(TImpalaQueryOptions::ENUM,\
|
||||
parent_options.NAME, exec_stmt_req);\
|
||||
}
|
||||
|
||||
void ChildQuery::SetQueryOptions(const TQueryOptions& parent_options,
|
||||
TExecuteStatementReq* exec_stmt_req) {
|
||||
// If this DCHECK is hit then handle the missing query option below.
|
||||
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),
|
||||
TImpalaQueryOptions::RANDOM_REPLICA + 1);
|
||||
SET_QUERY_OPTION(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED);
|
||||
SET_QUERY_OPTION(abort_on_error, ABORT_ON_ERROR);
|
||||
SET_QUERY_OPTION(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS);
|
||||
SET_QUERY_OPTION(batch_size, BATCH_SIZE);
|
||||
map<string, string> conf;
|
||||
#define QUERY_OPT_FN(NAME, ENUM)\
|
||||
if (parent_options.__isset.NAME) {\
|
||||
stringstream val;\
|
||||
val << parent_options.NAME;\
|
||||
conf[#ENUM] = val.str();\
|
||||
}
|
||||
QUERY_OPTS_TABLE
|
||||
#undef QUERY_OPT_FN
|
||||
// Ignore debug actions on child queries because they may cause deadlock.
|
||||
SET_QUERY_OPTION(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT);
|
||||
SET_QUERY_OPTION(disable_cached_reads, DISABLE_CACHED_READS);
|
||||
SET_QUERY_OPTION(disable_outermost_topn, DISABLE_OUTERMOST_TOPN);
|
||||
SET_QUERY_OPTION(disable_codegen, DISABLE_CODEGEN);
|
||||
SET_QUERY_OPTION(explain_level, EXPLAIN_LEVEL);
|
||||
SET_QUERY_OPTION(hbase_cache_blocks, HBASE_CACHE_BLOCKS);
|
||||
SET_QUERY_OPTION(hbase_caching, HBASE_CACHING);
|
||||
SET_QUERY_OPTION(max_errors, MAX_ERRORS);
|
||||
SET_QUERY_OPTION(max_io_buffers, MAX_IO_BUFFERS);
|
||||
SET_QUERY_OPTION(max_scan_range_length, MAX_SCAN_RANGE_LENGTH);
|
||||
SET_QUERY_OPTION(mem_limit, MEM_LIMIT);
|
||||
SET_QUERY_OPTION(num_nodes, NUM_NODES);
|
||||
SET_QUERY_OPTION(num_scanner_threads, NUM_SCANNER_THREADS);
|
||||
SET_QUERY_OPTION(compression_codec, COMPRESSION_CODEC);
|
||||
SET_QUERY_OPTION(parquet_file_size, PARQUET_FILE_SIZE);
|
||||
SET_QUERY_OPTION(request_pool, REQUEST_POOL);
|
||||
SET_QUERY_OPTION(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT);
|
||||
SET_QUERY_OPTION(sync_ddl, SYNC_DDL);
|
||||
SET_QUERY_OPTION(v_cpu_cores, V_CPU_CORES);
|
||||
SET_QUERY_OPTION(rm_initial_mem, RM_INITIAL_MEM);
|
||||
SET_QUERY_OPTION(query_timeout_s, QUERY_TIMEOUT_S);
|
||||
SET_QUERY_OPTION(max_block_mgr_memory, MAX_BLOCK_MGR_MEMORY);
|
||||
SET_QUERY_OPTION(appx_count_distinct, APPX_COUNT_DISTINCT);
|
||||
SET_QUERY_OPTION(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS);
|
||||
SET_QUERY_OPTION(seq_compression_mode, SEQ_COMPRESSION_MODE);
|
||||
SET_QUERY_OPTION(exec_single_node_rows_threshold,
|
||||
EXEC_SINGLE_NODE_ROWS_THRESHOLD);
|
||||
SET_QUERY_OPTION(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS);
|
||||
SET_QUERY_OPTION(replica_preference, REPLICA_PREFERENCE);
|
||||
SET_QUERY_OPTION(random_replica, RANDOM_REPLICA);
|
||||
map<string, string>::iterator it = conf.find("DEBUG_ACTION");
|
||||
if (it != conf.end()) conf.erase(it);
|
||||
exec_stmt_req->__set_confOverlay(conf);
|
||||
}
|
||||
|
||||
void ChildQuery::Cancel() {
|
||||
|
||||
@@ -513,6 +513,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
|
||||
TQueryCtx* query_ctx) {
|
||||
query_ctx->request.stmt = query.query;
|
||||
VLOG_QUERY << "query: " << ThriftDebugString(query);
|
||||
QueryOptionsMask set_query_options_mask;
|
||||
{
|
||||
shared_ptr<SessionState> session;
|
||||
const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
|
||||
@@ -525,6 +526,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
|
||||
lock_guard<mutex> l(session->lock);
|
||||
if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
|
||||
query_ctx->request.query_options = session->default_query_options;
|
||||
set_query_options_mask = session->set_query_options_mask;
|
||||
}
|
||||
session->ToThrift(session_id, &query_ctx->session);
|
||||
}
|
||||
@@ -532,12 +534,16 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
|
||||
// Override default query options with Query.Configuration
|
||||
if (query.__isset.configuration) {
|
||||
BOOST_FOREACH(const string& option, query.configuration) {
|
||||
RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
|
||||
RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options,
|
||||
&set_query_options_mask));
|
||||
}
|
||||
VLOG_QUERY << "TClientRequest.queryOptions: "
|
||||
<< ThriftDebugString(query_ctx->request.query_options);
|
||||
}
|
||||
|
||||
// Only query options not set in the session or confOverlay can be overridden by the
|
||||
// pool options.
|
||||
AddPoolQueryOptions(query_ctx, ~set_query_options_mask);
|
||||
VLOG_QUERY << "TClientRequest.queryOptions: "
|
||||
<< ThriftDebugString(query_ctx->request.query_options);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -527,6 +527,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
|
||||
const TExecuteStatementReq execute_request, TQueryCtx* query_ctx) {
|
||||
query_ctx->request.stmt = execute_request.statement;
|
||||
VLOG_QUERY << "TExecuteStatementReq: " << ThriftDebugString(execute_request);
|
||||
QueryOptionsMask set_query_options_mask;
|
||||
{
|
||||
shared_ptr<SessionState> session_state;
|
||||
TUniqueId session_id;
|
||||
@@ -538,6 +539,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
|
||||
session_state->ToThrift(session_id, &query_ctx->session);
|
||||
lock_guard<mutex> l(session_state->lock);
|
||||
query_ctx->request.query_options = session_state->default_query_options;
|
||||
set_query_options_mask = session_state->set_query_options_mask;
|
||||
}
|
||||
|
||||
if (execute_request.__isset.confOverlay) {
|
||||
@@ -551,11 +553,14 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
|
||||
continue;
|
||||
}
|
||||
RETURN_IF_ERROR(SetQueryOption(conf_itr->first, conf_itr->second,
|
||||
&query_ctx->request.query_options));
|
||||
&query_ctx->request.query_options, &set_query_options_mask));
|
||||
}
|
||||
VLOG_QUERY << "TClientRequest.queryOptions: "
|
||||
<< ThriftDebugString(query_ctx->request.query_options);
|
||||
}
|
||||
// Only query options not set in the session or confOverlay can be overridden by the
|
||||
// pool options.
|
||||
AddPoolQueryOptions(query_ctx, ~set_query_options_mask);
|
||||
VLOG_QUERY << "TClientRequest.queryOptions: "
|
||||
<< ThriftDebugString(query_ctx->request.query_options);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -637,7 +642,8 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
|
||||
continue;
|
||||
}
|
||||
// Ignore failure to set query options (will be logged)
|
||||
SetQueryOption(conf_itr->first, conf_itr->second, &state->default_query_options);
|
||||
SetQueryOption(conf_itr->first, conf_itr->second, &state->default_query_options,
|
||||
&state->set_query_options_mask);
|
||||
}
|
||||
}
|
||||
TQueryOptionsToMap(state->default_query_options, &return_val.configuration);
|
||||
|
||||
@@ -55,7 +55,6 @@
|
||||
#include "service/fragment-exec-state.h"
|
||||
#include "service/impala-internal-service.h"
|
||||
#include "service/query-exec-state.h"
|
||||
#include "service/query-options.h"
|
||||
#include "scheduling/simple-scheduler.h"
|
||||
#include "util/bit-util.h"
|
||||
#include "util/cgroups-mgr.h"
|
||||
@@ -708,11 +707,51 @@ void ImpalaServer::ArchiveQuery(const QueryExecState& query) {
|
||||
|
||||
ImpalaServer::~ImpalaServer() {}
|
||||
|
||||
void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx,
|
||||
const QueryOptionsMask& override_options_mask) {
|
||||
// Errors are not returned and are only logged (at level 2) because some incoming
|
||||
// requests are not expected to be mapped to a pool and will not have query options,
|
||||
// e.g. 'use [db];'. For requests that do need to be mapped to a pool successfully, the
|
||||
// pool is resolved again during scheduling and errors are handled at that point.
|
||||
string resolved_pool;
|
||||
Status status = exec_env_->request_pool_service()->ResolveRequestPool(*ctx,
|
||||
&resolved_pool);
|
||||
if (!status.ok()) {
|
||||
VLOG_RPC << "Not adding pool query options for query=" << ctx->query_id
|
||||
<< " ResolveRequestPool status: " << status.GetDetail();
|
||||
return;
|
||||
}
|
||||
|
||||
TPoolConfig config;
|
||||
status = exec_env_->request_pool_service()->GetPoolConfig(resolved_pool, &config);
|
||||
if (!status.ok()) {
|
||||
VLOG_RPC << "Not adding pool query options for query=" << ctx->query_id
|
||||
<< " GetConfigPool status: " << status.GetDetail();
|
||||
return;
|
||||
}
|
||||
|
||||
TQueryOptions pool_options;
|
||||
QueryOptionsMask set_pool_options_mask;
|
||||
status = ParseQueryOptions(config.default_query_options, &pool_options,
|
||||
&set_pool_options_mask);
|
||||
if (!status.ok()) {
|
||||
VLOG_RPC << "Not adding pool query options for query=" << ctx->query_id
|
||||
<< " ParseQueryOptions status: " << status.GetDetail();
|
||||
return;
|
||||
}
|
||||
|
||||
QueryOptionsMask overlay_mask = override_options_mask & set_pool_options_mask;
|
||||
VLOG_RPC << "Parsed pool options: " << DebugQueryOptions(pool_options)
|
||||
<< " override_options_mask=" << override_options_mask.to_string()
|
||||
<< " set_pool_mask=" << set_pool_options_mask.to_string()
|
||||
<< " overlay_mask=" << overlay_mask.to_string();
|
||||
OverlayQueryOptions(pool_options, overlay_mask, &ctx->request.query_options);
|
||||
}
|
||||
|
||||
Status ImpalaServer::Execute(TQueryCtx* query_ctx,
|
||||
shared_ptr<SessionState> session_state,
|
||||
shared_ptr<QueryExecState>* exec_state) {
|
||||
PrepareQueryContext(query_ctx);
|
||||
bool registered_exec_state;
|
||||
ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
|
||||
|
||||
// Redact the SQL stmt and update the query context
|
||||
@@ -720,6 +759,7 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
|
||||
Redact(&stmt);
|
||||
query_ctx->request.__set_redacted_stmt((const string) stmt);
|
||||
|
||||
bool registered_exec_state;
|
||||
Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
|
||||
exec_state);
|
||||
if (!status.ok() && registered_exec_state) {
|
||||
@@ -1098,7 +1138,9 @@ void ImpalaServer::TransmitData(
|
||||
}
|
||||
|
||||
void ImpalaServer::InitializeConfigVariables() {
|
||||
Status status = ParseQueryOptions(FLAGS_default_query_options, &default_query_options_);
|
||||
QueryOptionsMask set_query_options; // unused
|
||||
Status status = ParseQueryOptions(FLAGS_default_query_options, &default_query_options_,
|
||||
&set_query_options);
|
||||
if (!status.ok()) {
|
||||
// Log error and exit if the default query options are invalid.
|
||||
LOG(ERROR) << "Invalid default query options. Please check -default_query_options.\n"
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "rpc/thrift-server.h"
|
||||
#include "common/status.h"
|
||||
#include "service/frontend.h"
|
||||
#include "service/query-options.h"
|
||||
#include "util/metrics.h"
|
||||
#include "util/runtime-profile.h"
|
||||
#include "util/simple-logger.h"
|
||||
@@ -681,6 +682,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
|
||||
void CancelFromThreadPool(uint32_t thread_id,
|
||||
const CancellationWork& cancellation_work);
|
||||
|
||||
/// Helper method to add any pool query options to the query_ctx. Must be called before
|
||||
/// ExecuteInternal() at which point the TQueryCtx is const and cannot be mutated.
|
||||
/// override_options_mask indicates which query options can be overridden by the pool
|
||||
/// default query options.
|
||||
void AddPoolQueryOptions(TQueryCtx* query_ctx,
|
||||
const QueryOptionsMask& override_options_mask);
|
||||
|
||||
/// Processes a CatalogUpdateResult returned from the CatalogServer and ensures
|
||||
/// the update has been applied to the local impalad's catalog cache. If
|
||||
/// wait_for_all_subscribers is true, this function will also wait until all
|
||||
@@ -794,8 +802,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
|
||||
/// run as children of Beeswax sessions) get results back in the expected format -
|
||||
/// child queries inherit the HS2 version from their parents, and a Beeswax session
|
||||
/// will never update the HS2 version from the default.
|
||||
SessionState() : closed(false), expired(false), hs2_version(
|
||||
apache::hive::service::cli::thrift::
|
||||
SessionState() : closed(false), expired(false),
|
||||
hs2_version(apache::hive::service::cli::thrift::
|
||||
TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), ref_count(0) {
|
||||
}
|
||||
|
||||
@@ -829,9 +837,15 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
|
||||
/// The default database (changed as a result of 'use' query execution).
|
||||
std::string database;
|
||||
|
||||
/// The default query options of this session.
|
||||
/// The default query options of this session. When the session is created, the
|
||||
/// session inherits the global defaults from ImpalaServer::default_query_options_.
|
||||
TQueryOptions default_query_options;
|
||||
|
||||
/// BitSet indicating which query options in default_query_options have been
|
||||
/// explicitly set in the session. Updated when a query option is specified using a
|
||||
/// SET command: the bit corresponding to the TImpalaQueryOptions enum is set.
|
||||
QueryOptionsMask set_query_options_mask;
|
||||
|
||||
/// For HS2 only, the protocol version this session is expecting.
|
||||
apache::hive::service::cli::thrift::TProtocolVersion::type hs2_version;
|
||||
|
||||
|
||||
@@ -133,6 +133,8 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
|
||||
profile_.AddChild(&server_profile_);
|
||||
summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
|
||||
summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
|
||||
summary_profile_.AddInfoString("Query Options (non default)",
|
||||
DebugQueryOptions(query_ctx_.request.query_options));
|
||||
|
||||
switch (exec_request->stmt_type) {
|
||||
case TStmtType::QUERY:
|
||||
@@ -181,7 +183,8 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
|
||||
RETURN_IF_ERROR(SetQueryOption(
|
||||
exec_request_.set_query_option_request.key,
|
||||
exec_request_.set_query_option_request.value,
|
||||
&session_->default_query_options));
|
||||
&session_->default_query_options,
|
||||
&session_->set_query_options_mask));
|
||||
} else {
|
||||
// "SET" returns a table of all query options.
|
||||
map<string, string> config;
|
||||
@@ -422,7 +425,7 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
|
||||
DCHECK(exec_env_->resource_broker() != NULL);
|
||||
}
|
||||
schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
|
||||
exec_request_.query_options, effective_user(), &summary_profile_, query_events_));
|
||||
exec_request_.query_options, &summary_profile_, query_events_));
|
||||
coord_.reset(new Coordinator(exec_env_, query_events_));
|
||||
Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
|
||||
summary_profile_.AddInfoString("Request Pool", schedule_->request_pool());
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "gen-cpp/Frontend_types.h"
|
||||
#include "service/impala-server.h"
|
||||
#include "gen-cpp/Frontend_types.h"
|
||||
#include "util/auth-util.h"
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/unordered_set.hpp>
|
||||
@@ -132,16 +133,13 @@ class ImpalaServer::QueryExecState {
|
||||
Status SetResultCache(QueryResultSet* cache, int64_t max_size);
|
||||
|
||||
ImpalaServer::SessionState* session() const { return session_.get(); }
|
||||
|
||||
/// Queries are run and authorized on behalf of the effective_user.
|
||||
/// When a do_as_user is specified (is not empty), the effective_user is set to the
|
||||
/// do_as_user. This is because the connected_user is acting as a "proxy user" for the
|
||||
/// do_as_user. When do_as_user is empty, the effective_user is always set to the
|
||||
/// connected_user.
|
||||
const std::string& effective_user() const {
|
||||
return do_as_user().empty() ? connected_user() : do_as_user();
|
||||
return GetEffectiveUser(query_ctx_.session);
|
||||
}
|
||||
const std::string& connected_user() const { return query_ctx_.session.connected_user; }
|
||||
const std::string& do_as_user() const { return session_->do_as_user; }
|
||||
const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
|
||||
TSessionType::type session_type() const { return query_ctx_.session.session_type; }
|
||||
const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
|
||||
const std::string& default_db() const { return query_ctx_.session.database; }
|
||||
|
||||
@@ -48,6 +48,43 @@ static Status ParseMemValue(const string& value, const string& key, int64_t* res
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask,
|
||||
TQueryOptions* dst) {
|
||||
DCHECK_GT(mask.size(), _TImpalaQueryOptions_VALUES_TO_NAMES.size()) <<
|
||||
"Size of QueryOptionsMask must be increased.";
|
||||
#define QUERY_OPT_FN(NAME, ENUM)\
|
||||
if (src.__isset.NAME && mask[TImpalaQueryOptions::ENUM]) dst->NAME = src.NAME;
|
||||
QUERY_OPTS_TABLE
|
||||
#undef QUERY_OPT_FN
|
||||
}
|
||||
|
||||
void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
|
||||
map<string, string>* configuration) {
|
||||
#define QUERY_OPT_FN(NAME, ENUM)\
|
||||
{\
|
||||
stringstream val;\
|
||||
val << query_options.NAME;\
|
||||
(*configuration)[#ENUM] = val.str();\
|
||||
}
|
||||
QUERY_OPTS_TABLE
|
||||
#undef QUERY_OPT_FN
|
||||
}
|
||||
|
||||
string impala::DebugQueryOptions(const TQueryOptions& query_options) {
|
||||
const static TQueryOptions defaults;
|
||||
int i = 0;
|
||||
stringstream ss;
|
||||
#define QUERY_OPT_FN(NAME, ENUM)\
|
||||
if (query_options.__isset.NAME &&\
|
||||
(!defaults.__isset.NAME || query_options.NAME != defaults.NAME)) {\
|
||||
if (i++ > 0) ss << ",";\
|
||||
ss << #ENUM << "=" << query_options.NAME;\
|
||||
}
|
||||
QUERY_OPTS_TABLE
|
||||
#undef QUERY_OPT_FN
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
// Returns the TImpalaQueryOptions enum for the given "key". Input is case insensitive.
|
||||
// Return -1 if the input is an invalid option.
|
||||
int GetQueryOptionForKey(const string& key) {
|
||||
@@ -61,126 +98,8 @@ int GetQueryOptionForKey(const string& key) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
|
||||
map<string, string>* configuration) {
|
||||
map<int, const char*>::const_iterator itr =
|
||||
_TImpalaQueryOptions_VALUES_TO_NAMES.begin();
|
||||
for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
|
||||
stringstream val;
|
||||
switch (itr->first) {
|
||||
case TImpalaQueryOptions::ABORT_ON_ERROR:
|
||||
val << query_options.abort_on_error;
|
||||
break;
|
||||
case TImpalaQueryOptions::MAX_ERRORS:
|
||||
val << query_options.max_errors;
|
||||
break;
|
||||
case TImpalaQueryOptions::DISABLE_CODEGEN:
|
||||
val << query_options.disable_codegen;
|
||||
break;
|
||||
case TImpalaQueryOptions::BATCH_SIZE:
|
||||
val << query_options.batch_size;
|
||||
break;
|
||||
case TImpalaQueryOptions::MEM_LIMIT:
|
||||
val << query_options.mem_limit;
|
||||
break;
|
||||
case TImpalaQueryOptions::NUM_NODES:
|
||||
val << query_options.num_nodes;
|
||||
break;
|
||||
case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH:
|
||||
val << query_options.max_scan_range_length;
|
||||
break;
|
||||
case TImpalaQueryOptions::MAX_IO_BUFFERS:
|
||||
val << query_options.max_io_buffers;
|
||||
break;
|
||||
case TImpalaQueryOptions::NUM_SCANNER_THREADS:
|
||||
val << query_options.num_scanner_threads;
|
||||
break;
|
||||
case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
|
||||
val << query_options.allow_unsupported_formats;
|
||||
break;
|
||||
case TImpalaQueryOptions::DEFAULT_ORDER_BY_LIMIT:
|
||||
val << query_options.default_order_by_limit;
|
||||
break;
|
||||
case TImpalaQueryOptions::DEBUG_ACTION:
|
||||
val << query_options.debug_action;
|
||||
break;
|
||||
case TImpalaQueryOptions::ABORT_ON_DEFAULT_LIMIT_EXCEEDED:
|
||||
val << query_options.abort_on_default_limit_exceeded;
|
||||
break;
|
||||
case TImpalaQueryOptions::COMPRESSION_CODEC:
|
||||
val << query_options.compression_codec;
|
||||
break;
|
||||
case TImpalaQueryOptions::SEQ_COMPRESSION_MODE:
|
||||
val << query_options.seq_compression_mode;
|
||||
break;
|
||||
case TImpalaQueryOptions::HBASE_CACHING:
|
||||
val << query_options.hbase_caching;
|
||||
break;
|
||||
case TImpalaQueryOptions::HBASE_CACHE_BLOCKS:
|
||||
val << query_options.hbase_cache_blocks;
|
||||
break;
|
||||
case TImpalaQueryOptions::PARQUET_FILE_SIZE:
|
||||
val << query_options.parquet_file_size;
|
||||
break;
|
||||
case TImpalaQueryOptions::EXPLAIN_LEVEL:
|
||||
val << query_options.explain_level;
|
||||
break;
|
||||
case TImpalaQueryOptions::SYNC_DDL:
|
||||
val << query_options.sync_ddl;
|
||||
break;
|
||||
case TImpalaQueryOptions::REQUEST_POOL:
|
||||
val << query_options.request_pool;
|
||||
break;
|
||||
case TImpalaQueryOptions::V_CPU_CORES:
|
||||
val << query_options.v_cpu_cores;
|
||||
break;
|
||||
case TImpalaQueryOptions::RESERVATION_REQUEST_TIMEOUT:
|
||||
val << query_options.reservation_request_timeout;
|
||||
break;
|
||||
case TImpalaQueryOptions::DISABLE_CACHED_READS:
|
||||
val << query_options.disable_cached_reads;
|
||||
break;
|
||||
case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
|
||||
val << query_options.disable_outermost_topn;
|
||||
break;
|
||||
case TImpalaQueryOptions::RM_INITIAL_MEM:
|
||||
val << query_options.rm_initial_mem;
|
||||
break;
|
||||
case TImpalaQueryOptions::QUERY_TIMEOUT_S:
|
||||
val << query_options.query_timeout_s;
|
||||
break;
|
||||
case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY:
|
||||
val << query_options.max_block_mgr_memory;
|
||||
break;
|
||||
case TImpalaQueryOptions::APPX_COUNT_DISTINCT:
|
||||
val << query_options.appx_count_distinct;
|
||||
break;
|
||||
case TImpalaQueryOptions::DISABLE_UNSAFE_SPILLS:
|
||||
val << query_options.disable_unsafe_spills;
|
||||
break;
|
||||
case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD:
|
||||
val << query_options.exec_single_node_rows_threshold;
|
||||
break;
|
||||
case TImpalaQueryOptions::OPTIMIZE_PARTITION_KEY_SCANS:
|
||||
val << query_options.optimize_partition_key_scans;
|
||||
case TImpalaQueryOptions::REPLICA_PREFERENCE:
|
||||
val << query_options.replica_preference;
|
||||
break;
|
||||
case TImpalaQueryOptions::RANDOM_REPLICA:
|
||||
val << query_options.random_replica;
|
||||
break;
|
||||
default:
|
||||
// We hit this DCHECK(false) if we forgot to add the corresponding entry here
|
||||
// when we add a new query option.
|
||||
LOG(ERROR) << "Missing exec option implementation: " << itr->second;
|
||||
DCHECK(false);
|
||||
}
|
||||
(*configuration)[itr->second] = val.str();
|
||||
}
|
||||
}
|
||||
|
||||
Status impala::SetQueryOption(const string& key, const string& value,
|
||||
TQueryOptions* query_options) {
|
||||
TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) {
|
||||
int option = GetQueryOptionForKey(key);
|
||||
if (option < 0) {
|
||||
return Status(Substitute("Ignoring invalid configuration option: $0", key));
|
||||
@@ -396,11 +315,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
|
||||
DCHECK(false);
|
||||
break;
|
||||
}
|
||||
if (set_query_options_mask != NULL) {
|
||||
DCHECK_LT(option, set_query_options_mask->size());
|
||||
set_query_options_mask->set(option);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_options) {
|
||||
Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_options,
|
||||
QueryOptionsMask* set_query_options_mask) {
|
||||
if (options.length() == 0) return Status::OK();
|
||||
vector<string> kv_pairs;
|
||||
split(kv_pairs, options, is_any_of(","), token_compress_on);
|
||||
@@ -413,7 +337,8 @@ Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_opt
|
||||
return Status(Substitute("Ignoring invalid configuration option $0: bad format "
|
||||
"(expected 'key=value')", kv_string));
|
||||
}
|
||||
RETURN_IF_ERROR(SetQueryOption(key_value[0], key_value[1], query_options));
|
||||
RETURN_IF_ERROR(SetQueryOption(key_value[0], key_value[1], query_options,
|
||||
set_query_options_mask));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <bitset>
|
||||
|
||||
#include "common/status.h"
|
||||
|
||||
@@ -26,19 +27,80 @@ namespace impala {
|
||||
|
||||
class TQueryOptions;
|
||||
|
||||
/// Converts a TQueryOptions struct into a map of key, value pairs
|
||||
// Macro to help generate functions that use or manipulate query options.
|
||||
// If the DCHECK is hit then handle the missing query option below and update
|
||||
// the DCHECK.
|
||||
#define QUERY_OPTS_TABLE\
|
||||
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
|
||||
TImpalaQueryOptions::RANDOM_REPLICA + 1);\
|
||||
QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
|
||||
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
|
||||
QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
|
||||
QUERY_OPT_FN(batch_size, BATCH_SIZE)\
|
||||
QUERY_OPT_FN(debug_action, DEBUG_ACTION)\
|
||||
QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\
|
||||
QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS)\
|
||||
QUERY_OPT_FN(disable_outermost_topn, DISABLE_OUTERMOST_TOPN)\
|
||||
QUERY_OPT_FN(disable_codegen, DISABLE_CODEGEN)\
|
||||
QUERY_OPT_FN(explain_level, EXPLAIN_LEVEL)\
|
||||
QUERY_OPT_FN(hbase_cache_blocks, HBASE_CACHE_BLOCKS)\
|
||||
QUERY_OPT_FN(hbase_caching, HBASE_CACHING)\
|
||||
QUERY_OPT_FN(max_errors, MAX_ERRORS)\
|
||||
QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS)\
|
||||
QUERY_OPT_FN(max_scan_range_length, MAX_SCAN_RANGE_LENGTH)\
|
||||
QUERY_OPT_FN(mem_limit, MEM_LIMIT)\
|
||||
QUERY_OPT_FN(num_nodes, NUM_NODES)\
|
||||
QUERY_OPT_FN(num_scanner_threads, NUM_SCANNER_THREADS)\
|
||||
QUERY_OPT_FN(compression_codec, COMPRESSION_CODEC)\
|
||||
QUERY_OPT_FN(parquet_file_size, PARQUET_FILE_SIZE)\
|
||||
QUERY_OPT_FN(request_pool, REQUEST_POOL)\
|
||||
QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT)\
|
||||
QUERY_OPT_FN(sync_ddl, SYNC_DDL)\
|
||||
QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)\
|
||||
QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)\
|
||||
QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S)\
|
||||
QUERY_OPT_FN(max_block_mgr_memory, MAX_BLOCK_MGR_MEMORY)\
|
||||
QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT)\
|
||||
QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS)\
|
||||
QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
|
||||
QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD)\
|
||||
QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS)\
|
||||
QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE);\
|
||||
QUERY_OPT_FN(random_replica, RANDOM_REPLICA);
|
||||
|
||||
/// Converts a TQueryOptions struct into a map of key, value pairs.
|
||||
void TQueryOptionsToMap(const TQueryOptions& query_options,
|
||||
std::map<std::string, std::string>* configuration);
|
||||
|
||||
/// Returns a comma-delimited string of the contents of query_options. The output does not
|
||||
/// contain key-value pairs where the value matches the default value specified in the
|
||||
/// TQueryOptions definition (regardless of whether or not it was explicitly or
|
||||
/// implicitly set to the default value).
|
||||
std::string DebugQueryOptions(const TQueryOptions& query_options);
|
||||
|
||||
/// Bitmask for the values of TQueryOptions.
|
||||
/// TODO: Find a way to set the size based on the number of fields.
|
||||
typedef std::bitset<64> QueryOptionsMask;
|
||||
|
||||
/// Updates the query options in dst from those in src where the query option is set
|
||||
/// (i.e. src.__isset.PROPERTY is true) and the corresponding bit in mask is set. If
|
||||
/// mask has no set bits, no options are set. If all bits are set, then all options
|
||||
/// that were set on src are copied to dst.
|
||||
void OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask,
|
||||
TQueryOptions* dst);
|
||||
|
||||
/// Set the key/value pair in TQueryOptions. It will override existing setting in
|
||||
/// query_options.
|
||||
/// query_options. The bit corresponding to query option 'key' in set_query_options_mask
|
||||
/// is set.
|
||||
Status SetQueryOption(const std::string& key, const std::string& value,
|
||||
TQueryOptions* query_options);
|
||||
TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
|
||||
|
||||
/// Parse a "," separated key=value pair of query options and set it in 'query_options'.
|
||||
/// If the same query option is specified more than once, the last one wins.
|
||||
/// Return an error if the input is invalid (bad format or invalid query option).
|
||||
Status ParseQueryOptions(const std::string& options, TQueryOptions* query_options);
|
||||
/// If the same query option is specified more than once, the last one wins. The
|
||||
/// set_query_options_mask bitmask is updated to reflect the query options which were
|
||||
/// set. Return an error if the input is invalid (bad format or invalid query option).
|
||||
Status ParseQueryOptions(const std::string& options, TQueryOptions* query_options,
|
||||
QueryOptionsMask* set_query_options_mask);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util")
|
||||
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/util")
|
||||
|
||||
add_library(Util
|
||||
auth-util.cc
|
||||
avro-util.cc
|
||||
benchmark.cc
|
||||
bitmap.cc
|
||||
|
||||
28
be/src/util/auth-util.cc
Normal file
28
be/src/util/auth-util.cc
Normal file
@@ -0,0 +1,28 @@
|
||||
// Copyright 2016 Cloudera Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "util/auth-util.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace impala {
|
||||
|
||||
const string& GetEffectiveUser(const TSessionState& session) {
|
||||
if (session.__isset.delegated_user && !session.delegated_user.empty()) {
|
||||
return session.delegated_user;
|
||||
}
|
||||
return session.connected_user;
|
||||
}
|
||||
|
||||
}
|
||||
33
be/src/util/auth-util.h
Normal file
33
be/src/util/auth-util.h
Normal file
@@ -0,0 +1,33 @@
|
||||
// Copyright 2016 Cloudera Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
|
||||
#ifndef IMPALA_UTIL_AUTH_UTIL_H
|
||||
#define IMPALA_UTIL_AUTH_UTIL_H
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "gen-cpp/ImpalaInternalService_types.h"
|
||||
|
||||
namespace impala {
|
||||
|
||||
/// Returns a reference to the "effective user" from the specified session. Queries
|
||||
/// are run and authorized on behalf of the effective user. When a delegated_user is
|
||||
/// specified (is not empty), the effective user is the delegated_user. This is because
|
||||
/// the connected_user is acting as a "proxy user" for the delegated_user. When
|
||||
/// delegated_user is empty, the effective user is the connected user.
|
||||
const std::string& GetEffectiveUser(const TSessionState& session);
|
||||
|
||||
} // namespace impala
|
||||
#endif
|
||||
@@ -42,7 +42,6 @@ const i32 INVALID_PLAN_NODE_ID = -1
|
||||
// Constant default partition ID, must be < 0 to avoid collisions
|
||||
const i64 DEFAULT_PARTITION_ID = -1;
|
||||
|
||||
// Preference for replica selection
|
||||
enum TReplicaPreference {
|
||||
CACHE_LOCAL,
|
||||
CACHE_RACK,
|
||||
@@ -51,8 +50,19 @@ enum TReplicaPreference {
|
||||
REMOTE
|
||||
}
|
||||
|
||||
// Query options that correspond to ImpalaService.ImpalaQueryOptions,
|
||||
// with their respective defaults
|
||||
// Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
|
||||
// respective defaults. Query options can be set in the following ways:
|
||||
//
|
||||
// 1) Process-wide defaults (via the impalad arg --default_query_options)
|
||||
// 2) Resource pool defaults (via resource pool configuration)
|
||||
// 3) Session settings (via the SET command or the HS2 OpenSession RPC)
|
||||
// 4) HS2/Beeswax configuration 'overlay' in the request metadata
|
||||
//
|
||||
// (1) and (2) are set by administrators and provide the default query options for a
|
||||
// session, in that order, so options set in (2) override those in (1). The user
|
||||
// can specify query options with (3) to override the defaults, which are stored in the
|
||||
// SessionState. Finally, the client can pass a config 'overlay' (4) in the request
|
||||
// metadata which overrides everything else.
|
||||
struct TQueryOptions {
|
||||
1: optional bool abort_on_error = 0
|
||||
2: optional i32 max_errors = 0
|
||||
@@ -484,16 +494,23 @@ struct TPoolConfigParams {
|
||||
}
|
||||
|
||||
// Returned by RequestPoolService.getPoolConfig()
|
||||
struct TPoolConfigResult {
|
||||
struct TPoolConfig {
|
||||
// Maximum number of placed requests before incoming requests are queued.
|
||||
1: required i64 max_requests
|
||||
|
||||
// Maximum number of queued requests before incoming requests are rejected.
|
||||
2: required i64 max_queued
|
||||
|
||||
// Memory limit of the pool before incoming requests are queued.
|
||||
// -1 indicates no limit.
|
||||
3: required i64 mem_limit
|
||||
// Maximum memory resources of the pool in bytes. -1 indicates no limit.
|
||||
3: required i64 max_mem_resources
|
||||
|
||||
// Maximum amount of time (in milliseconds) that a request will wait to be admitted
|
||||
// before timing out. Optional, if not set then the process default (set via gflags) is
|
||||
// used.
|
||||
4: optional i64 queue_timeout_ms;
|
||||
|
||||
// Default query options that are applied to requests mapped to this pool.
|
||||
5: required string default_query_options;
|
||||
}
|
||||
|
||||
service ImpalaInternalService {
|
||||
|
||||
@@ -27,8 +27,7 @@ include "TCLIService.thrift"
|
||||
// The valid keys are listed in this enum. They map to TQueryOptions.
|
||||
// Note: If you add an option or change the default, you also need to update:
|
||||
// - ImpalaInternalService.thrift: TQueryOptions
|
||||
// - ImpaladClientExecutor.getBeeswaxQueryConfigurations()
|
||||
// - SetQueryOption()
|
||||
// - SetQueryOption(), SetQueryOptions()
|
||||
// - TQueryOptionsToMap()
|
||||
enum TImpalaQueryOptions {
|
||||
// if true, abort execution on the first error
|
||||
|
||||
@@ -20,154 +20,154 @@
|
||||
"key": "external-data-source.class-cache.hits"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Cluster In Queue",
|
||||
"description": "Resource Pool $0 Aggregate Queue Size",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Cluster In Queue",
|
||||
"label": "Resource Pool $0 Aggregate Queue Size",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.cluster-in-queue"
|
||||
"key": "admission-controller.agg-num-queued.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Cluster Mem Estimate",
|
||||
"description": "Resource Pool $0 Aggregate Mem Reserved",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Cluster Mem Estimate",
|
||||
"label": "Resource Pool $0 Aggregate Mem Reserved",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.cluster-mem-estimate"
|
||||
"key": "admission-controller.agg-mem-reserved.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Cluster Mem Usage",
|
||||
"description": "Resource Pool $0 Aggregate Mem Admitted",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Cluster Mem Usage",
|
||||
"label": "Resource Pool $0 Aggregate Mem Admitted",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.cluster-mem-usage"
|
||||
"key": "admission-controller.agg-mem-admitted.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Cluster Num Running",
|
||||
"description": "Resource Pool $0 Aggregate Num Running",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Cluster Num Running",
|
||||
"label": "Resource Pool $0 Aggregate Num Running",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.cluster-num-running"
|
||||
"key": "admission-controller.agg-num-running.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Admitted",
|
||||
"description": "Total number of requests admitted to pool $0",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Admitted",
|
||||
"label": "Resource Pool $0 Total Admitted",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-admitted"
|
||||
"key": "admission-controller.total-admitted.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Completed",
|
||||
"description": "Total number of requests that have completed and released resources in pool $0",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Completed",
|
||||
"label": "Resource Pool $0 Total Released",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-completed"
|
||||
"key": "admission-controller.total-released.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Dequeued",
|
||||
"description": "Total number of requests dequeued in pool $0",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Dequeued",
|
||||
"label": "Resource Pool $0 Total Dequeued",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-dequeued"
|
||||
"key": "admission-controller.total-dequeued.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local In Queue",
|
||||
"description": "Resource Pool $0 Queue Size on the coordinator",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local In Queue",
|
||||
"label": "Resource Pool $0 Coordinator Backend Queue Size",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.local-in-queue"
|
||||
"key": "admission-controller.local-num-queued.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Mem Estimate",
|
||||
"description": "Resource Pool $0 Mem Reserved by the backend coordinator",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Mem Estimate",
|
||||
"label": "Resource Pool $0 Coordinator Backend Mem Reserved",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.local-mem-estimate"
|
||||
"key": "admission-controller.local-backend-mem-reserved.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Mem Usage",
|
||||
"description": "Resource Pool $0 Coordinator Backend Mem Usage",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Mem Usage",
|
||||
"label": "Resource Pool $0 Coordinator Backend Mem Usage",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.local-mem-usage"
|
||||
"key": "admission-controller.local-backend-mem-usage.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Num Running",
|
||||
"description": "Resource Pool $0 Coordinator Backend Num Running",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Num Running",
|
||||
"label": "Resource Pool $0 Coordinator Backend Num Running",
|
||||
"units": "NONE",
|
||||
"kind": "GAUGE",
|
||||
"key": "admission-controller.$0.local-num-running"
|
||||
"key": "admission-controller.local-num-admitted-running.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Queued",
|
||||
"description": "Total number of requests queued in pool $0",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Queued",
|
||||
"label": "Resource Pool $0 Total Queued",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-queued"
|
||||
"key": "admission-controller.total-queued.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Rejected",
|
||||
"description": "Total number of requests rejected in pool $0",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Rejected",
|
||||
"label": "Resource Pool $0 Total Rejected",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-rejected"
|
||||
"key": "admission-controller.total-rejected.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Time In Queue Ms",
|
||||
"description": "Resource Pool $0 Time in Queue",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Time In Queue Ms",
|
||||
"units": "UNIT",
|
||||
"label": "Resource Pool $0 Time in Queue",
|
||||
"units": "TIME_MS",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-time-in-queue-ms"
|
||||
"key": "admission-controller.time-in-queue-ms.$0"
|
||||
},
|
||||
{
|
||||
"description": "Resource Pool $0 Local Timed Out",
|
||||
"description": "Total number of requests timed out waiting while queued in pool $0",
|
||||
"contexts": [
|
||||
"RESOURCE_POOL"
|
||||
],
|
||||
"label": "Resource Pool $0 Local Timed Out",
|
||||
"label": "Resource Pool $0 Total Timed Out",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "admission-controller.$0.local-timed-out"
|
||||
"key": "admission-controller.total-timed-out.$0"
|
||||
},
|
||||
{
|
||||
"description": "Catalog Server Topic Processing Time",
|
||||
@@ -669,6 +669,16 @@
|
||||
"kind": "GAUGE",
|
||||
"key": "mem-tracker.process.bytes-freed-by-last-gc"
|
||||
},
|
||||
{
|
||||
"description": "The process memory tracker limit.",
|
||||
"contexts": [
|
||||
"IMPALAD"
|
||||
],
|
||||
"label": "Process Tracker Limit",
|
||||
"units": "BYTES",
|
||||
"kind": "GAUGE",
|
||||
"key": "mem-tracker.process.limit"
|
||||
},
|
||||
{
|
||||
"description": "The amount of memory by which the process was over its memory limit the last time the memory limit was encountered.",
|
||||
"contexts": [
|
||||
|
||||
@@ -41,7 +41,7 @@ import com.cloudera.impala.common.InternalException;
|
||||
import com.cloudera.impala.common.JniUtil;
|
||||
import com.cloudera.impala.thrift.TErrorCode;
|
||||
import com.cloudera.impala.thrift.TPoolConfigParams;
|
||||
import com.cloudera.impala.thrift.TPoolConfigResult;
|
||||
import com.cloudera.impala.thrift.TPoolConfig;
|
||||
import com.cloudera.impala.thrift.TResolveRequestPoolParams;
|
||||
import com.cloudera.impala.thrift.TResolveRequestPoolResult;
|
||||
import com.cloudera.impala.thrift.TStatus;
|
||||
@@ -82,7 +82,7 @@ public class RequestPoolService {
|
||||
|
||||
// Key for the default maximum number of running queries ("placed reservations")
|
||||
// property. The per-pool key name is this key with the pool name appended, e.g.
|
||||
// "{key}.{pool}".
|
||||
// "{key}.{pool}". This is a llama-site.xml configuration.
|
||||
final static String LLAMA_MAX_PLACED_RESERVATIONS_KEY =
|
||||
"llama.am.throttling.maximum.placed.reservations";
|
||||
|
||||
@@ -92,7 +92,7 @@ public class RequestPoolService {
|
||||
|
||||
// Key for the default maximum number of queued requests ("queued reservations")
|
||||
// property. The per-pool key name is this key with the pool name appended, e.g.
|
||||
// "{key}.{pool}".
|
||||
// "{key}.{pool}". This is a llama-site.xml configuration.
|
||||
final static String LLAMA_MAX_QUEUED_RESERVATIONS_KEY =
|
||||
"llama.am.throttling.maximum.queued.reservations";
|
||||
|
||||
@@ -100,6 +100,19 @@ public class RequestPoolService {
|
||||
// differs from the current Llama default of 0 which disables queuing.
|
||||
final static int LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT = 200;
|
||||
|
||||
// Key for the pool queue timeout (milliseconds). This is specified in the
|
||||
// llama-site.xml but is Impala-specific and Llama does not use this.
|
||||
final static String QUEUE_TIMEOUT_KEY = "impala.admission-control.pool-queue-timeout-ms";
|
||||
|
||||
// Default value of the pool queue timeout (ms).
|
||||
final static int QUEUE_TIMEOUT_MS_DEFAULT = 60 * 1000;
|
||||
|
||||
// Key for the pool default query options. Query options are specified as a
|
||||
// comma delimited string of 'key=value' pairs, e.g. 'key1=val1,key2=val2'.
|
||||
// This is specified in the llama-site.xml but is Impala-specific and Llama does not
|
||||
// use this.
|
||||
final static String QUERY_OPTIONS_KEY = "impala.admission-control.pool-default-query-options";
|
||||
|
||||
// String format for a per-pool configuration key. First parameter is the key for the
|
||||
// default, e.g. LLAMA_MAX_PLACED_RESERVATIONS_KEY, and the second parameter is the
|
||||
// pool name.
|
||||
@@ -258,7 +271,7 @@ public class RequestPoolService {
|
||||
JniUtil.deserializeThrift(protocolFactory_, resolvePoolParams,
|
||||
thriftResolvePoolParams);
|
||||
TResolveRequestPoolResult result = resolveRequestPool(resolvePoolParams);
|
||||
LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
|
||||
LOG.info("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
|
||||
new Object[] { resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
|
||||
result.resolved_pool, result.has_access });
|
||||
try {
|
||||
@@ -314,14 +327,14 @@ public class RequestPoolService {
|
||||
* Gets the pool configuration values for the specified pool.
|
||||
*
|
||||
* @param thriftPoolConfigParams Serialized {@link TPoolConfigParams}
|
||||
* @return serialized {@link TPoolConfigResult}
|
||||
* @return serialized {@link TPoolConfig}
|
||||
*/
|
||||
public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
|
||||
Preconditions.checkState(running_.get());
|
||||
TPoolConfigParams poolConfigParams = new TPoolConfigParams();
|
||||
JniUtil.deserializeThrift(protocolFactory_, poolConfigParams,
|
||||
thriftPoolConfigParams);
|
||||
TPoolConfigResult result = getPoolConfig(poolConfigParams.getPool());
|
||||
TPoolConfig result = getPoolConfig(poolConfigParams.getPool());
|
||||
try {
|
||||
return new TSerializer(protocolFactory_).serialize(result);
|
||||
} catch (TException e) {
|
||||
@@ -330,14 +343,15 @@ public class RequestPoolService {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
TPoolConfigResult getPoolConfig(String pool) {
|
||||
TPoolConfigResult result = new TPoolConfigResult();
|
||||
int maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
|
||||
result.setMem_limit(
|
||||
TPoolConfig getPoolConfig(String pool) {
|
||||
TPoolConfig result = new TPoolConfig();
|
||||
long maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
|
||||
result.setMax_mem_resources(
|
||||
maxMemoryMb == Integer.MAX_VALUE ? -1 : (long) maxMemoryMb * ByteUnits.MEGABYTE);
|
||||
if (llamaConf_ == null) {
|
||||
result.setMax_requests(LLAMA_MAX_PLACED_RESERVATIONS_DEFAULT);
|
||||
result.setMax_queued(LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT);
|
||||
result.setDefault_query_options("");
|
||||
} else {
|
||||
// Capture the current llamaConf_ in case it changes while we're using it.
|
||||
Configuration currentLlamaConf = llamaConf_;
|
||||
@@ -347,15 +361,25 @@ public class RequestPoolService {
|
||||
result.setMax_queued(getLlamaPoolConfigValue(currentLlamaConf, pool,
|
||||
LLAMA_MAX_QUEUED_RESERVATIONS_KEY,
|
||||
LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT));
|
||||
|
||||
// Only return positive values. Admission control has a default from gflags.
|
||||
int queueTimeoutMs = getLlamaPoolConfigValue(currentLlamaConf, pool,
|
||||
QUEUE_TIMEOUT_KEY, -1);
|
||||
if (queueTimeoutMs > 0) result.setQueue_timeout_ms(queueTimeoutMs);
|
||||
result.setDefault_query_options(getLlamaPoolConfigValue(currentLlamaConf, pool,
|
||||
QUERY_OPTIONS_KEY, ""));
|
||||
}
|
||||
LOG.trace("getPoolConfig(pool={}): mem_limit={}, max_requests={}, max_queued={}",
|
||||
new Object[] { pool, result.mem_limit, result.max_requests, result.max_queued });
|
||||
LOG.info("getPoolConfig(pool={}): max_mem_resources={}, max_requests={}, " +
|
||||
"max_queued={}, queue_timeout_ms={}, default_query_options={}",
|
||||
new Object[] { pool, result.max_mem_resources, result.max_requests,
|
||||
result.max_queued, result.queue_timeout_ms, result.default_query_options });
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Looks up the per-pool Llama config, first checking for a per-pool value, then a
|
||||
* default set in the config, and lastly to the specified 'defaultValue'.
|
||||
* Looks up the per-pool integer config from the llama Configuration. First checks for
|
||||
* a per-pool value, then a default set in the config, and lastly to the specified
|
||||
* 'defaultValue'.
|
||||
*
|
||||
* @param conf The Configuration to use, provided so the caller can ensure the same
|
||||
* Configuration is used to look up multiple properties.
|
||||
@@ -366,6 +390,15 @@ public class RequestPoolService {
|
||||
conf.getInt(key, defaultValue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Looks up the per-pool String config from the llama Configuration. See above.
|
||||
*/
|
||||
private String getLlamaPoolConfigValue(Configuration conf, String pool, String key,
|
||||
String defaultValue) {
|
||||
return conf.get(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
|
||||
conf.get(key, defaultValue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the actual pool to use via the allocation placement policy. The policy may
|
||||
* change the requested pool.
|
||||
|
||||
@@ -28,7 +28,7 @@ import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import com.cloudera.impala.common.ByteUnits;
|
||||
import com.cloudera.impala.thrift.TErrorCode;
|
||||
import com.cloudera.impala.thrift.TPoolConfigResult;
|
||||
import com.cloudera.impala.thrift.TPoolConfig;
|
||||
import com.cloudera.impala.thrift.TResolveRequestPoolParams;
|
||||
import com.cloudera.impala.thrift.TResolveRequestPoolResult;
|
||||
import com.google.common.collect.Iterables;
|
||||
@@ -168,9 +168,10 @@ public class TestRequestPoolService {
|
||||
@Test
|
||||
public void testPoolLimitConfigs() throws Exception {
|
||||
createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
|
||||
checkPoolConfigResult("root", 15, 50, -1);
|
||||
checkPoolConfigResult("root.queueA", 10, 30, 1024 * ByteUnits.MEGABYTE);
|
||||
checkPoolConfigResult("root.queueB", 5, 10, -1);
|
||||
checkPoolConfigResult("root", 15, 50, -1, 30000L, "mem_limit=1024m");
|
||||
checkPoolConfigResult("root.queueA", 10, 30, 1024 * ByteUnits.MEGABYTE,
|
||||
10000L, "mem_limit=1024m,query_timeout_s=10");
|
||||
checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -246,22 +247,37 @@ public class TestRequestPoolService {
|
||||
Assert.assertTrue(poolService_.hasAccess("root.queueC", "root"));
|
||||
|
||||
// Test pool limit changes
|
||||
checkPoolConfigResult("root", 15, 100, -1);
|
||||
checkPoolConfigResult("root.queueA", 10, 30, 100000 * ByteUnits.MEGABYTE);
|
||||
checkPoolConfigResult("root.queueB", 5, 10, -1);
|
||||
checkPoolConfigResult("root.queueC", 10, 30, 128 * ByteUnits.MEGABYTE);
|
||||
checkPoolConfigResult("root", 15, 100, -1, 30000L, "");
|
||||
checkPoolConfigResult("root.queueA", 1, 30, 100000 * ByteUnits.MEGABYTE,
|
||||
50L, "mem_limit=128m,query_timeout_s=5");
|
||||
checkPoolConfigResult("root.queueB", 5, 10, -1, 60000L, "");
|
||||
checkPoolConfigResult("root.queueC", 10, 30, 128 * ByteUnits.MEGABYTE,
|
||||
30000L, "mem_limit=2048m,query_timeout_s=60");
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to verify the per-pool limits.
|
||||
*/
|
||||
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
|
||||
long expectedMaxQueued, long expectedMaxMemUsage) {
|
||||
TPoolConfigResult expectedResult = new TPoolConfigResult();
|
||||
long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
|
||||
String expectedQueryOptions) {
|
||||
TPoolConfig expectedResult = new TPoolConfig();
|
||||
expectedResult.setMax_requests(expectedMaxRequests);
|
||||
expectedResult.setMax_queued(expectedMaxQueued);
|
||||
expectedResult.setMem_limit(expectedMaxMemUsage);
|
||||
expectedResult.setMax_mem_resources(expectedMaxMem);
|
||||
if (expectedQueueTimeoutMs != null) {
|
||||
expectedResult.setQueue_timeout_ms(expectedQueueTimeoutMs);
|
||||
}
|
||||
if (expectedQueryOptions != null) {
|
||||
expectedResult.setDefault_query_options(expectedQueryOptions);
|
||||
}
|
||||
Assert.assertEquals("Unexpected config values for pool " + pool,
|
||||
expectedResult, poolService_.getPoolConfig(pool));
|
||||
}
|
||||
|
||||
private void checkPoolConfigResult(String pool, long expectedMaxRequests,
|
||||
long expectedMaxQueued, long expectedMaxMemUsage) {
|
||||
checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
|
||||
expectedMaxMemUsage, null, "");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,14 @@
|
||||
<name>llama.am.throttling.maximum.queued.reservations</name>
|
||||
<value>10</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-queue-timeout-ms</name>
|
||||
<value>30000</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-default-query-options</name>
|
||||
<value>mem_limit=1024m</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>llama.am.throttling.maximum.placed.reservations.root</name>
|
||||
@@ -27,4 +35,12 @@
|
||||
<name>llama.am.throttling.maximum.queued.reservations.root.queueA</name>
|
||||
<value>30</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-queue-timeout-ms.root.queueA</name>
|
||||
<value>10000</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-default-query-options.root.queueA</name>
|
||||
<value>mem_limit=1024m,query_timeout_s=10</value>
|
||||
</property>
|
||||
</configuration>
|
||||
|
||||
@@ -10,6 +10,10 @@
|
||||
<name>llama.am.throttling.maximum.queued.reservations</name>
|
||||
<value>10</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-queue-timeout-ms</name>
|
||||
<value>30000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>llama.am.throttling.maximum.placed.reservations.root</name>
|
||||
@@ -21,13 +25,25 @@
|
||||
</property>
|
||||
<property>
|
||||
<name>llama.am.throttling.maximum.placed.reservations.root.queueA</name>
|
||||
<value>10</value>
|
||||
<value>1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>llama.am.throttling.maximum.queued.reservations.root.queueA</name>
|
||||
<value>30</value>
|
||||
</property>
|
||||
<property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-queue-timeout-ms.root.queueA</name>
|
||||
<value>50</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-queue-timeout-ms.root.queueB</name>
|
||||
<value>60000</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-default-query-options.root.queueA</name>
|
||||
<value>mem_limit=128m,query_timeout_s=5</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>llama.am.throttling.maximum.placed.reservations.root.queueC</name>
|
||||
<value>10</value>
|
||||
</property>
|
||||
@@ -35,4 +51,8 @@
|
||||
<name>llama.am.throttling.maximum.queued.reservations.root.queueC</name>
|
||||
<value>30</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>impala.admission-control.pool-default-query-options.root.queueC</name>
|
||||
<value>mem_limit=2048m,query_timeout_s=60</value>
|
||||
</property>
|
||||
</configuration>
|
||||
|
||||
@@ -13,6 +13,9 @@ from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.common.test_dimensions import create_uncompressed_text_dimension
|
||||
from tests.common.test_vector import TestDimension
|
||||
from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
|
||||
from ImpalaService import ImpalaHiveServer2Service
|
||||
from TCLIService import TCLIService
|
||||
|
||||
import logging
|
||||
import os
|
||||
@@ -56,6 +59,9 @@ MEM_TEST_LIMIT = 100000 * 1024 * 1024
|
||||
|
||||
_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s" % (STATESTORE_HEARTBEAT_MS)
|
||||
|
||||
# Key in the query profile for the query options.
|
||||
PROFILE_QUERY_OPTIONS_KEY = "Query Options (non default): "
|
||||
|
||||
def impalad_admission_ctrl_flags(max_requests, max_queued, mem_limit):
|
||||
return ("-vmodule admission-controller=3 -default_pool_max_requests %s "
|
||||
"-default_pool_max_queued %s -default_pool_mem_limit %s "
|
||||
@@ -63,19 +69,19 @@ def impalad_admission_ctrl_flags(max_requests, max_queued, mem_limit):
|
||||
(max_requests, max_queued, mem_limit))
|
||||
|
||||
|
||||
def impalad_admission_ctrl_config_args():
|
||||
def impalad_admission_ctrl_config_args(additional_args=""):
|
||||
impalad_home = os.environ['IMPALA_HOME']
|
||||
resources_dir = os.path.join(impalad_home, "fe", "src", "test", "resources")
|
||||
fs_allocation_path = os.path.join(resources_dir, "fair-scheduler-test2.xml")
|
||||
llama_site_path = os.path.join(resources_dir, "llama-site-test2.xml")
|
||||
return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s "
|
||||
"-llama_site_path %s -disable_admission_control=false" %\
|
||||
(fs_allocation_path, llama_site_path))
|
||||
"-llama_site_path %s -disable_admission_control=false %s" %\
|
||||
(fs_allocation_path, llama_site_path, additional_args))
|
||||
|
||||
def log_metrics(log_prefix, metrics, log_level=logging.DEBUG):
|
||||
LOG.log(log_level, "%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
|
||||
"completed=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
|
||||
metrics['dequeued'], metrics['rejected'], metrics['completed'],
|
||||
"released=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
|
||||
metrics['dequeued'], metrics['rejected'], metrics['released'],
|
||||
metrics['timed-out'])
|
||||
|
||||
def compute_metric_deltas(m2, m1, metric_names):
|
||||
@@ -84,20 +90,21 @@ def compute_metric_deltas(m2, m1, metric_names):
|
||||
|
||||
def metric_key(pool_name, metric_name):
|
||||
"""Helper method to construct the admission controller metric keys"""
|
||||
return "admission-controller.%s.%s" % (pool_name, metric_name)
|
||||
return "admission-controller.%s.%s" % (metric_name, pool_name)
|
||||
|
||||
class TestAdmissionController(CustomClusterTestSuite):
|
||||
class TestAdmissionControllerBase(CustomClusterTestSuite):
|
||||
@classmethod
|
||||
def get_workload(self):
|
||||
return 'functional-query'
|
||||
|
||||
@classmethod
|
||||
def add_test_dimensions(cls):
|
||||
super(TestAdmissionController, cls).add_test_dimensions()
|
||||
super(TestAdmissionControllerBase, cls).add_test_dimensions()
|
||||
cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
|
||||
# There's no reason to test this on other file formats/compression codecs right now
|
||||
cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
|
||||
|
||||
class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
|
||||
def __check_pool_rejected(self, client, pool, expected_error_re):
|
||||
try:
|
||||
client.set_configuration({'request_pool': pool})
|
||||
@@ -106,15 +113,64 @@ class TestAdmissionController(CustomClusterTestSuite):
|
||||
except ImpalaBeeswaxException as e:
|
||||
assert re.search(expected_error_re, str(e))
|
||||
|
||||
def __check_query_options(self, profile, expected_query_options):
|
||||
"""Validate that the per-pool query options were set on the specified profile.
|
||||
expected_query_options is a list of "KEY=VALUE" strings, e.g. ["MEM_LIMIT=1", ...]"""
|
||||
confs = []
|
||||
for line in profile.split("\n"):
|
||||
if PROFILE_QUERY_OPTIONS_KEY in line:
|
||||
rhs = re.split(": ", line)[1]
|
||||
confs = re.split(",", rhs)
|
||||
break
|
||||
assert len(confs) == len(expected_query_options)
|
||||
confs = map(str.lower, confs)
|
||||
for expected in expected_query_options:
|
||||
assert expected.lower() in confs,\
|
||||
"Expected query options '%s' to be set" % (",".join(expected_query_options))
|
||||
|
||||
def __check_hs2_query_opts(self, pool_name, mem_limit=None, expected_options=None):
|
||||
""" Submits a query via HS2 (optionally with a mem_limit in the confOverlay)
|
||||
into pool_name and checks that the expected_options are set in the
|
||||
profile."""
|
||||
execute_statement_req = TCLIService.TExecuteStatementReq()
|
||||
execute_statement_req.sessionHandle = self.session_handle
|
||||
execute_statement_req.confOverlay = {'request_pool': pool_name}
|
||||
if mem_limit is not None: execute_statement_req.confOverlay['mem_limit'] = mem_limit
|
||||
execute_statement_req.statement = "select 1";
|
||||
execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
|
||||
HS2TestSuite.check_response(execute_statement_resp)
|
||||
|
||||
fetch_results_req = TCLIService.TFetchResultsReq()
|
||||
fetch_results_req.operationHandle = execute_statement_resp.operationHandle
|
||||
fetch_results_req.maxRows = 1
|
||||
fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
|
||||
HS2TestSuite.check_response(fetch_results_resp)
|
||||
|
||||
close_operation_req = TCLIService.TCloseOperationReq()
|
||||
close_operation_req.operationHandle = execute_statement_resp.operationHandle
|
||||
HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
|
||||
|
||||
get_profile_req = ImpalaHiveServer2Service.TGetRuntimeProfileReq()
|
||||
get_profile_req.operationHandle = execute_statement_resp.operationHandle
|
||||
get_profile_req.sessionHandle = self.session_handle
|
||||
get_profile_resp = self.hs2_client.GetRuntimeProfile(get_profile_req)
|
||||
HS2TestSuite.check_response(get_profile_resp)
|
||||
self.__check_query_options(get_profile_resp.profile, expected_options)
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args=impalad_admission_ctrl_config_args(),
|
||||
impalad_args=impalad_admission_ctrl_config_args(\
|
||||
"-default_query_options=mem_limit=200000000"),
|
||||
statestored_args=_STATESTORED_ARGS)
|
||||
def test_set_request_pool(self, vector):
|
||||
@needs_session(conf_overlay={'batch_size': '100'})
|
||||
def test_set_request_pool(self):
|
||||
"""Tests setting the REQUEST_POOL with the pool placement policy configured
|
||||
to require a specific pool (IMPALA-1050)."""
|
||||
to require a specific pool, and validates that the per-pool configurations were
|
||||
applied."""
|
||||
impalad = self.cluster.impalads[0]
|
||||
client = impalad.service.create_beeswax_client()
|
||||
# Expected default mem limit for queueA, used in several tests below
|
||||
queueA_mem_limit = "MEM_LIMIT=%s" % (128*1024*1024)
|
||||
try:
|
||||
for pool in ['', 'not_a_pool_name']:
|
||||
expected_error =\
|
||||
@@ -129,11 +185,63 @@ class TestAdmissionController(CustomClusterTestSuite):
|
||||
|
||||
# Also try setting a valid pool
|
||||
client.set_configuration({'request_pool': 'root.queueB'})
|
||||
client.execute("select 1") # Query should execute in queueB
|
||||
result = client.execute("select 1")
|
||||
# Query should execute in queueB which doesn't have a default mem limit set in the
|
||||
# llama-site.xml, so it should inherit the value from the default process query
|
||||
# options.
|
||||
self.__check_query_options(result.runtime_profile,\
|
||||
['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB'])
|
||||
|
||||
# Try setting the pool for a queue with a very low queue timeout.
|
||||
# queueA allows only 1 running query and has a queue timeout of 50ms, so the
|
||||
# second concurrent query should time out quickly.
|
||||
client.set_configuration({'request_pool': 'root.queueA'})
|
||||
handle = client.execute_async("select sleep(1000)")
|
||||
self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
|
||||
assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
|
||||
# queueA has default query options mem_limit=128m,query_timeout_s=5
|
||||
self.__check_query_options(client.get_runtime_profile(handle),\
|
||||
[queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
|
||||
client.close_query(handle)
|
||||
|
||||
# Should be able to set query options via the set command (overriding defaults if
|
||||
# applicable). mem_limit overrides the pool default. abort_on_error has no
|
||||
# proc/pool default.
|
||||
client.execute("set mem_limit=31337")
|
||||
client.execute("set abort_on_error=1")
|
||||
result = client.execute("select 1")
|
||||
self.__check_query_options(result.runtime_profile,\
|
||||
['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',\
|
||||
'REQUEST_POOL=root.queueA'])
|
||||
|
||||
# Should be able to set query options (overriding defaults if applicable) with the
|
||||
# config overlay sent with the query RPC. mem_limit is a pool-level override and
|
||||
# max_io_buffers has no proc/pool default.
|
||||
client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345',
|
||||
'max_io_buffers': '100'})
|
||||
result = client.execute("select 1")
|
||||
self.__check_query_options(result.runtime_profile,\
|
||||
['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',\
|
||||
'ABORT_ON_ERROR=1', 'MAX_IO_BUFFERS=100'])
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
class TestAdmissionControllerStress(TestAdmissionController):
|
||||
# HS2 tests:
|
||||
# batch_size is set in the HS2 OpenSession() call via the requires_session() test
|
||||
# decorator, so that is included in all test cases below.
|
||||
batch_size = "BATCH_SIZE=100"
|
||||
|
||||
# Check HS2 query in queueA gets the correct query options for the pool.
|
||||
self.__check_hs2_query_opts("root.queueA", None,\
|
||||
[queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
|
||||
# Check overriding the mem limit sent in the confOverlay with the query.
|
||||
self.__check_hs2_query_opts("root.queueA", '12345',\
|
||||
['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
|
||||
# Check HS2 query in queueB gets the process-wide default query options
|
||||
self.__check_hs2_query_opts("root.queueB", None,\
|
||||
['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB', batch_size])
|
||||
|
||||
class TestAdmissionControllerStress(TestAdmissionControllerBase):
|
||||
"""Submits a number of queries (parameterized) with some delay between submissions
|
||||
(parameterized) and the ability to submit to one impalad or many in a round-robin
|
||||
fashion. The queries are set with the WAIT debug action so that we have more control
|
||||
@@ -161,11 +269,6 @@ class TestAdmissionControllerStress(TestAdmissionController):
|
||||
submitting to a single impalad, we know exactly what the values should be,
|
||||
otherwise we just check that they are within reasonable bounds.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def get_workload(self):
|
||||
return 'functional-query'
|
||||
|
||||
@classmethod
|
||||
def add_test_dimensions(cls):
|
||||
super(TestAdmissionControllerStress, cls).add_test_dimensions()
|
||||
@@ -216,14 +319,14 @@ class TestAdmissionControllerStress(TestAdmissionController):
|
||||
Returns a map of the admission metrics, aggregated across all of the impalads.
|
||||
|
||||
The metrics names are shortened for brevity: 'admitted', 'queued', 'dequeued',
|
||||
'rejected', 'completed', and 'timed-out'.
|
||||
'rejected', 'released', and 'timed-out'.
|
||||
"""
|
||||
metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected' : 0,
|
||||
'completed': 0, 'timed-out': 0}
|
||||
'released': 0, 'timed-out': 0}
|
||||
for impalad in self.impalads:
|
||||
for short_name in metrics.keys():
|
||||
metrics[short_name] += impalad.service.get_metric_value(\
|
||||
metric_key(self.pool_name, 'local-%s' % short_name), 0)
|
||||
metric_key(self.pool_name, 'total-%s' % short_name), 0)
|
||||
return metrics
|
||||
|
||||
def wait_for_metric_changes(self, metric_names, initial, expected_delta, timeout=30):
|
||||
@@ -469,7 +572,7 @@ class TestAdmissionControllerStress(TestAdmissionController):
|
||||
num_to_cancel = len(self.executing_threads)
|
||||
LOG.debug("Main loop, will cancel %s queries", num_to_cancel)
|
||||
self.cancel_admitted_queries(num_to_cancel)
|
||||
self.wait_for_metric_changes(['completed'], curr_metrics, num_to_cancel)
|
||||
self.wait_for_metric_changes(['released'], curr_metrics, num_to_cancel)
|
||||
|
||||
num_queued_remaining =\
|
||||
curr_metrics['queued'] - curr_metrics['dequeued'] - curr_metrics['timed-out']
|
||||
|
||||
Reference in New Issue
Block a user