IMPALA-565: Support user impersonation for HS2 authorization requests

This change adds support for user impersonation for HS2 authorization
requests. It adds a new flag (--authorized_proxy_user_config) that, if
set, allows proxy users (e.g. hue) to impersonate another user. The user they
wish to impersonate is passed using the HS2 configuration property
'impala.doas.user'.
The configuration also specifies the list of users each proxy user is
allowed to impersonate, or '*' to allow the proxy user to impersonate
any user. Entries for different proxy users are separated by semicolons.
For example: hue=user1,user2;admin=*

Change-Id: I2a13e31e5bde2e6df47134458c803168415d0437
Reviewed-on: http://gerrit.ent.cloudera.com:8080/574
Reviewed-by: Lenni Kuff <lskuff@cloudera.com>
Tested-by: Lenni Kuff <lskuff@cloudera.com>
This commit is contained in:
Lenni Kuff
2013-09-30 11:52:04 -07:00
committed by Henry Robinson
parent 601f24a198
commit af6d381401
10 changed files with 195 additions and 88 deletions

View File

@@ -32,12 +32,19 @@ DEFINE_bool(load_catalog_at_startup, false, "if true, load all catalog data at s
// Authorization related flags. Must be set to valid values to properly configure
// authorization.
DEFINE_string(server_name, "", "The name to use for securing this impalad "
"server during authorization. If set, authorization will be enabled.");
"server during authorization. If set, authorization will be enabled.");
DEFINE_string(authorization_policy_file, "", "HDFS path to the authorization policy "
"file. If set, authorization will be enabled.");
"file. If set, authorization will be enabled.");
DEFINE_string(authorization_policy_provider_class,
"org.apache.sentry.provider.file.HadoopGroupResourceAuthorizationProvider",
"Advanced: The authorization policy provider class name.");
DEFINE_string(authorized_proxy_user_config, "",
"Specifies the set of authorized proxy users (users who can impersonate other "
"users during authorization) and whom they are allowed to impersonate. "
"Input is a semicolon-separated list of key=value pairs of authorized proxy "
"users to the user(s) they can impersonate. These users are specified as a comma "
"separated list of short usernames, or '*' to indicate all users. For example: "
"hue=user1,user2;admin=*");
// Describes one method to look up in a Frontend object
struct Frontend::MethodDescriptor {

View File

@@ -274,6 +274,15 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
if (request.__isset.configuration) {
map<string, string>::const_iterator conf_itr = request.configuration.begin();
for (; conf_itr != request.configuration.end(); ++conf_itr) {
// If the current user is a valid proxy user, he/she can optionally perform
// authorization requests on behalf of another user. This is done by setting the
// 'impala.doas.user' Hive Server 2 configuration property.
if (conf_itr->first == "impala.doas.user") {
state->do_as_user = conf_itr->second;
Status status = AuthorizeProxyUser(state->user, state->do_as_user);
HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
continue;
}
Status status = SetQueryOptions(conf_itr->first, conf_itr->second,
&state->default_query_options);
HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);

View File

@@ -93,6 +93,7 @@ DECLARE_string(nn);
DECLARE_int32(nn_port);
DECLARE_bool(enable_process_lifetime_heap_profiling);
DECLARE_string(heap_profile_dir);
DECLARE_string(authorized_proxy_user_config);
DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served");
DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served");
@@ -402,6 +403,33 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
exit(1);
}
if (!FLAGS_authorized_proxy_user_config.empty()) {
// Parse the proxy user configuration using the format:
// <proxy user>=<comma separated list of users they are allowed to impersonate>
// See FLAGS_authorized_proxy_user_config for more details.
vector<string> proxy_user_config;
split(proxy_user_config, FLAGS_authorized_proxy_user_config, is_any_of(";"),
token_compress_on);
if (proxy_user_config.size() > 0) {
BOOST_FOREACH(const string& config, proxy_user_config) {
size_t pos = config.find("=");
if (pos == string::npos) {
LOG(ERROR) << "Invalid proxy user configuration. No mapping value specified "
<< "for the proxy user. For more information review usage of the "
<< "--authorized_proxy_user_config flag: " << config;
exit(1);
}
string proxy_user = config.substr(0, pos);
string config_str = config.substr(pos + 1);
vector<string> parsed_allowed_users;
split(parsed_allowed_users, config_str, is_any_of(","), token_compress_on);
unordered_set<string> allowed_users(parsed_allowed_users.begin(),
parsed_allowed_users.end());
authorized_proxy_user_config_.insert(make_pair(proxy_user, allowed_users));
}
}
}
Webserver::PathHandlerCallback varz_callback =
bind<void>(mem_fn(&ImpalaServer::RenderHadoopConfigs), this, _1, _2);
exec_env->webserver()->RegisterPathHandler("/varz", varz_callback);
@@ -495,9 +523,12 @@ Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_sta
writer.String(exec_state.query_status().GetErrorMsg().c_str());
writer.String("user");
writer.String(exec_state.user().c_str());
// Impala does not support impersonation so always mark this field as null.
writer.String("impersonator");
writer.Null();
if (exec_state.do_as_user().empty()) {
writer.Null();
} else {
writer.String(exec_state.do_as_user().c_str());
}
writer.String("statement_type");
if (request.stmt_type == TStmtType::DDL) {
if (request.catalog_op_request.op_type == TCatalogOpType::DDL) {
@@ -1261,7 +1292,6 @@ Status ImpalaServer::ParseQueryOptions(const string& options,
<< ": bad format (expected key=value)";
return Status(ss.str());
}
RETURN_IF_ERROR(SetQueryOptions(key_value[0], key_value[1], query_options));
}
return Status::OK;
@@ -1633,7 +1663,9 @@ void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
state->session_id = session_id;
state->session_type = session_type;
state->database = database;
state->user = user;
// The do_as_user will only be set if impersonation is enabled and the
// proxy user is authorized to impersonate as this user.
state->user = do_as_user.empty() ? user : do_as_user;
state->network_address = network_address;
}
@@ -1647,6 +1679,41 @@ void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
}
}
// Checks whether 'user' (the proxy user that opened the session) is allowed to
// impersonate 'do_as_user'.
//
// Returns Status::OK when the short name of 'user' has an entry in
// authorized_proxy_user_config_ whose allowed set contains either "*" or
// 'do_as_user'. Returns an error Status when:
//  - 'user' or 'do_as_user' is empty,
//  - authorized_proxy_user_config_ is empty (impersonation is disabled), or
//  - the proxy user is unknown or is not allowed to impersonate 'do_as_user'.
Status ImpalaServer::AuthorizeProxyUser(const string& user, const string& do_as_user) {
  if (user.empty()) {
    return Status("Unable to impersonate using empty proxy username.");
  } else if (do_as_user.empty()) {
    // Reject an empty impersonation target explicitly rather than falling through to
    // the generic "not authorized" error below.
    return Status("Unable to impersonate using empty doAs username.");
  }

  stringstream error_msg;
  error_msg << "User '" << user << "' is not authorized to impersonate '"
            << do_as_user << "'.";
  if (authorized_proxy_user_config_.size() == 0) {
    error_msg << " User impersonation is disabled.";
    return Status(error_msg.str());
  }

  // Get the short version of the user name (the user name up to the first '/' or '@')
  // from the full principal name. string::npos is the largest size_t value, so min()
  // yields the position of whichever separator occurs first, or npos if neither occurs.
  size_t end_idx = min(user.find("/"), user.find("@"));
  // If neither are found (or are found at the beginning of the user name),
  // use the full username. Otherwise, use the username up to the matching character.
  string short_user(
      end_idx == string::npos || end_idx == 0 ? user : user.substr(0, end_idx));

  // Check if the proxy user exists. If he/she does, then check if they are allowed
  // to impersonate the do_as_user, either through the "*" wildcard or by name.
  ProxyUserMap::const_iterator proxy_user =
      authorized_proxy_user_config_.find(short_user);
  if (proxy_user != authorized_proxy_user_config_.end()) {
    if (proxy_user->second.count("*") > 0 || proxy_user->second.count(do_as_user) > 0) {
      return Status::OK;
    }
  }
  return Status(error_msg.str());
}
void ImpalaServer::CatalogUpdateCallback(
const StateStoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
vector<TTopicDelta>* subscriber_topic_updates) {

View File

@@ -439,6 +439,11 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
// Must be called with query_exec_state_map_lock_ held
void ArchiveQuery(const QueryExecState& query);
// Checks whether the given user is allowed to impersonate the specified do_as_user.
// Returns OK if the authorization succeeds, otherwise returns a status with details
// on why the failure occurred.
Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user);
// Snapshot of a query's state, archived in the query log.
struct QueryStateRecord {
// Pretty-printed runtime profile. TODO: Copy actual profile object
@@ -640,6 +645,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
// User for this session
std::string user;
// The user to impersonate. Empty for no impersonation.
std::string do_as_user;
// Client network address
TNetworkAddress network_address;
@@ -735,6 +743,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
// and the CatalogService ID that this version was from.
int64_t current_catalog_version_;
TUniqueId current_catalog_service_id_;
// Map of short usernames of authorized proxy users to the set of user(s) they are
// allowed to impersonate. Populated by parsing the --authorized_proxy_user_config
// flag.
typedef boost::unordered_map<std::string, boost::unordered_set<std::string> >
ProxyUserMap;
ProxyUserMap authorized_proxy_user_config_;
};
// Create an ImpalaServer and Thrift servers.

View File

@@ -103,6 +103,7 @@ class ImpalaServer::QueryExecState {
ImpalaServer::SessionState* parent_session() const { return parent_session_.get(); }
const std::string& user() const { return parent_session_->user; }
const std::string& do_as_user() const { return parent_session_->do_as_user; }
TSessionType::type session_type() const { return query_session_state_.session_type; }
const TUniqueId& session_id() const { return query_session_state_.session_id; }
const std::string& default_db() const { return query_session_state_.database; }

View File

@@ -43,9 +43,10 @@ do
done
LOG_DIR=${IMPALA_TEST_CLUSTER_LOG_DIR}/query_tests
FE_LOG_DIR=${IMPALA_TEST_CLUSTER_LOG_DIR}/fe_tests
AUTHORIZATION_LOG_DIR=${IMPALA_TEST_CLUSTER_LOG_DIR}/authorization_tests
mkdir -p ${LOG_DIR}
mkdir -p ${FE_LOG_DIR}
mkdir -p ${AUTHORIZATION_LOG_DIR}
# Enable core dumps
ulimit -c unlimited
@@ -65,7 +66,19 @@ do
# Run backend tests.
${IMPALA_HOME}/bin/run-backend-tests.sh
# Run the remaining tests against an external Impala test cluster.
# Start up a cluster with authorization enabled.
${IMPALA_HOME}/bin/start-impala-cluster.py --log_dir=${AUTHORIZATION_LOG_DIR} \
--cluster_size=3 --impalad_args="\
--authorization_policy_file='/test-warehouse/authz-policy.ini'\
--server_name=server1\
--authorized_proxy_user_config=hue=$USER"
# Run authorization tests
pushd ${IMPALA_HOME}/tests
py.test authorization/test_authorization.py -k test_impersonation
popd
# Run the remaining tests against a cluster with authorization disabled.
${IMPALA_HOME}/bin/start-impala-cluster.py --log_dir=${LOG_DIR} --cluster_size=3
# Run some queries using run-workload to verify run-workload has not been broken.
@@ -91,4 +104,6 @@ do
# this requires adjusting the timeout values and making changes to the ImpalaService()
# class. Disable them for now.
#${IMPALA_HOME}/tests/run-process-failure-tests.sh
done

View File

@@ -1,77 +0,0 @@
#!/usr/bin/env bash
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
# Builds the common code, the backend and the frontend (Debug build), optionally runs
# the frontend and backend tests, and regenerates the Cscope file list.
# run rebuild.sh -help to see options
# Resolve the absolute path of the directory containing this script and use it as
# IMPALA_HOME.
root=`dirname "$0"`
root=`cd "$root"; pwd`
export IMPALA_HOME=$root
# Derive the metastore DB name from the directory name, replacing '.' and '-' with '_'.
export METASTORE_DB=`basename $root | sed -e "s/\\./_/g" | sed -e "s/[.-]/_/g"`
. "$root"/bin/impala-config.sh
# Exit on non-true return value
set -e
# Exit on reference to uninitialized variable
set -u
# Set to 1 by -runtests; gates both test steps below.
tests_action=0
# parse command line options
for ARG in $*
do
case "$ARG" in
-runtests)
tests_action=1
;;
-help)
echo "buildall.sh [-runtests]"
echo "[-runtests] : run fe and be tests"
exit
;;
esac
done
# cleanup FE process
$IMPALA_HOME/bin/clean-fe-processes.py
# build common and backend
cd $IMPALA_HOME
cmake -DCMAKE_BUILD_TYPE=Debug .
cd $IMPALA_HOME/common/function-registry
make
cd $IMPALA_HOME/common/thrift
make
cd $IMPALA_BE_DIR
make
# build frontend
# skip tests since any failures will prevent the
# package phase from completing.
cd $IMPALA_FE_DIR
mvn package -DskipTests=true
# run frontend tests
if [ $tests_action -eq 1 ]
then
mvn test
fi
# run backend tests. For some reason this does not work on Jenkins.
# The PlanService is started in the background first and killed once 'make test'
# has finished.
if [ $tests_action -eq 1 ]
then
cd $IMPALA_FE_DIR
mvn exec:java -Dexec.mainClass=com.cloudera.impala.testutil.PlanService \
-Dexec.classpathScope=test &
PID=$!
# Wait for planner to startup TODO: can we do something better than wait arbitrarily for
# 3 seconds. Not a huge deal if it's not long enough, BE tests will just wait a bit
sleep 3
cd $IMPALA_BE_DIR
make test
kill $PID
fi
# Generate list of files for Cscope to index
$IMPALA_HOME/bin/gen-cscope.sh

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Client tests for SQL statement authorization
import os
import pytest
import json
from tests.hs2.test_hs2 import *
from time import sleep
from getpass import getuser
from cli_service import TCLIService
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TTransportException
from thrift.protocol import TBinaryProtocol
from thrift.Thrift import TApplicationException
from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
class TestAuthorization(TestHS2):
  """HiveServer2 client tests that exercise authorization together with impersonation."""

  def test_impersonation(self):
    """End-to-end impersonation + authorization test.

    Opens a session as the proxy user 'hue' impersonating the current OS user, checks
    that statements are authorized as the impersonated user, and checks that
    impersonating a user 'hue' is not mapped to is rejected. Expects authorization to be
    configured before running this test.
    """
    current_user = getuser()

    proxy_session_req = TCLIService.TOpenSessionReq()
    proxy_session_req.username = 'hue'
    proxy_session_req.configuration = {'impala.doas.user': current_user}
    open_resp = self.hs2_client.OpenSession(proxy_session_req)
    TestHS2.check_response(open_resp)
    self.session_handle = open_resp.sessionHandle

    # A table the impersonated user has no privileges on: the error must name the
    # impersonated user.
    denied_req = TCLIService.TExecuteStatementReq()
    denied_req.sessionHandle = self.session_handle
    denied_req.statement = "describe tpch_seq.lineitem"
    denied_resp = self.hs2_client.ExecuteStatement(denied_req)
    expected_error = 'User \'%s\' does not have privileges to access' % current_user
    assert expected_error in str(denied_resp)

    # The same operation on a table the impersonated user is authorized to access.
    allowed_req = TCLIService.TExecuteStatementReq()
    allowed_req.sessionHandle = self.session_handle
    allowed_req.statement = "describe tpch.lineitem"
    allowed_resp = self.hs2_client.ExecuteStatement(allowed_req)
    TestHS2.check_response(allowed_resp)

    # Impersonating a user that 'hue' is not authorized to impersonate must fail.
    proxy_session_req.configuration['impala.doas.user'] = 'some_user'
    rejected_resp = self.hs2_client.OpenSession(proxy_session_req)
    assert 'User \'hue\' is not authorized to impersonate \'some_user\'' in \
        str(rejected_resp)

    self.socket.close()
    self.socket = None

1
tests/hs2/__init__.py Normal file
View File

@@ -0,0 +1 @@
# This file is needed to make the files in this directory a python module

View File

@@ -15,9 +15,12 @@
#
# Client tests for Impala's HiveServer2 interface
from cli_service import TCLIService
import os
import pytest
import json
from time import sleep
from getpass import getuser
from cli_service import TCLIService
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TTransportException
from thrift.protocol import TBinaryProtocol
@@ -30,6 +33,8 @@ def needs_session(fn):
"""
def add_session(self):
open_session_req = TCLIService.TOpenSessionReq()
open_session_req.username = getuser()
open_session_req.configuration = dict()
resp = self.hs2_client.OpenSession(open_session_req)
TestHS2.check_response(resp)
self.session_handle = resp.sessionHandle