Added user() utility function.

This commit is contained in:
Alex Behm
2013-07-10 11:09:17 -07:00
committed by Henry Robinson
parent a7f6bacb6d
commit 3696e6a4b4
17 changed files with 77 additions and 56 deletions

View File

@@ -618,7 +618,7 @@ void TestSingleLiteralConstruction(PrimitiveType type, void* value,
const string& string_val) {
ObjectPool pool;
RowDescriptor desc;
RuntimeState state(TUniqueId(), TQueryOptions(), "", NULL);
RuntimeState state(TUniqueId(), TQueryOptions(), "", "", NULL);
Expr* expr = Expr::CreateLiteral(&pool, type, value);
EXPECT_TRUE(expr != NULL);
@@ -629,7 +629,7 @@ void TestSingleLiteralConstruction(PrimitiveType type, void* value,
TEST_F(ExprTest, NullLiteral) {
for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
NullLiteral expr(static_cast<PrimitiveType>(type));
RuntimeState state(TUniqueId(), TQueryOptions(), "", NULL);
RuntimeState state(TUniqueId(), TQueryOptions(), "", "", NULL);
Status status = Expr::Prepare(&expr, &state, RowDescriptor(), disable_codegen_);
EXPECT_TRUE(status.ok());
EXPECT_TRUE(expr.GetValue(NULL) == NULL);
@@ -1352,10 +1352,6 @@ TEST_F(ExprTest, StringFunctions) {
TestIsNull("find_in_set(NULL, 'abc,ad,,ade,cde')", TYPE_INT);
TestIsNull("find_in_set('abc,def', NULL)", TYPE_INT);
TestIsNull("find_in_set(NULL, NULL)", TYPE_INT);
TestStringValue("version()", GetVersionString());
TestValue("sleep(100)", TYPE_BOOLEAN, true);
TestIsNull("sleep(NULL)", TYPE_BOOLEAN);
}
TEST_F(ExprTest, StringRegexpFunctions) {
@@ -1681,6 +1677,13 @@ TEST_F(ExprTest, StringParseUrlFunction) {
"index.html?test=true&name=networking&op=true', 'XYZ', 'name')", TYPE_STRING);
}
TEST_F(ExprTest, UtilityFunctions) {
TestStringValue("user()", "impala_test_user");
TestStringValue("version()", GetVersionString());
TestValue("sleep(100)", TYPE_BOOLEAN, true);
TestIsNull("sleep(NULL)", TYPE_BOOLEAN);
}
TEST_F(ExprTest, MathTrigonometricFunctions) {
// It is important to calculate the expected values
// using math functions, and not simply use constants.

View File

@@ -37,6 +37,10 @@ Status FunctionCall::Prepare(RuntimeState* state, const RowDescriptor& row_desc)
DCHECK(state != NULL);
DCHECK(!state->now()->NotADateTime());
result_.timestamp_val = *(state->now());
} else if (opcode_ == TExprOpcode::UTILITY_USER) {
// Set username from runtime state.
DCHECK(state != NULL);
result_.SetStringVal(state->user());
}
return Status::OK;
}

View File

@@ -22,6 +22,13 @@ using namespace std;
namespace impala {
void* UtilityFunctions::User(Expr* e, TupleRow* row) {
DCHECK_EQ(e->GetNumChildren(), 0);
// An empty string indicates the user wasn't set in the session
// or in the query request.
return (e->result_.string_val.len > 0) ? &e->result_.string_val : NULL;
}
void* UtilityFunctions::Version(Expr* e, TupleRow* row) {
DCHECK_EQ(e->GetNumChildren(), 0);
e->result_.SetStringVal(GetVersionString());

View File

@@ -24,6 +24,10 @@ class TupleRow;
class UtilityFunctions {
public:
// Implementation of the user() function. Returns the username of the user who executed
// this function.
static void* User(Expr* e, TupleRow* row);
// Implementation of the version() function. Returns the version string.
static void* Version(Expr* e, TupleRow* row);

View File

@@ -84,7 +84,7 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
class DataStreamTest : public testing::Test {
protected:
DataStreamTest()
: runtime_state_(TUniqueId(), TQueryOptions(), "", &exec_env_),
: runtime_state_(TUniqueId(), TQueryOptions(), "", "", &exec_env_),
next_val_(0) {}
virtual void SetUp() {

View File

@@ -77,9 +77,12 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
<< " instance_id=" << PrintId(params.fragment_instance_id);
VLOG(2) << "params:\n" << ThriftDebugString(params);
// The empty string is an illegal username indicating that the user was not set.
const string& user =
(request.query_globals.__isset.user) ? request.query_globals.user : "";
runtime_state_.reset(
new RuntimeState(params.fragment_instance_id, request.query_options,
request.query_globals.now_string, exec_env_));
request.query_globals.now_string, user, exec_env_));
// Reserve one main thread from the pool
runtime_state_->resource_pool()->AcquireThreadToken();

View File

@@ -46,21 +46,22 @@ namespace impala {
RuntimeState::RuntimeState(
const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
const string& now, ExecEnv* exec_env)
const string& now, const string& user, ExecEnv* exec_env)
: obj_pool_(new ObjectPool()),
data_stream_recvrs_pool_(new ObjectPool()),
unreported_error_idx_(0),
profile_(obj_pool_.get(), "Fragment " + PrintId(fragment_instance_id)),
query_mem_limit_(NULL),
is_cancelled_(false) {
Status status = Init(fragment_instance_id, query_options, now, exec_env);
Status status = Init(fragment_instance_id, query_options, now, user, exec_env);
DCHECK(status.ok());
}
RuntimeState::RuntimeState(const std::string& now)
RuntimeState::RuntimeState(const string& now, const string& user)
: obj_pool_(new ObjectPool()),
data_stream_recvrs_pool_(new ObjectPool()),
unreported_error_idx_(0),
user_(user),
profile_(obj_pool_.get(), "<unnamed>"),
query_mem_limit_(NULL) {
query_options_.batch_size = DEFAULT_BATCH_SIZE;
@@ -72,10 +73,11 @@ RuntimeState::~RuntimeState() {
Status RuntimeState::Init(
const TUniqueId& fragment_instance_id, const TQueryOptions& query_options,
const string& now, ExecEnv* exec_env) {
const string& now, const std::string& user, ExecEnv* exec_env) {
fragment_instance_id_ = fragment_instance_id;
query_options_ = query_options;
now_.reset(new TimestampValue(now.c_str(), now.size()));
user_ = user;
exec_env_ = exec_env;
if (!query_options.disable_codegen) {
RETURN_IF_ERROR(CreateCodegen());

View File

@@ -63,19 +63,19 @@ typedef std::map<std::string, std::string> FileMoveMap;
class RuntimeState {
public:
RuntimeState(const TUniqueId& fragment_instance_id,
const TQueryOptions& query_options,
const std::string& now, ExecEnv* exec_env);
const TQueryOptions& query_options, const std::string& now,
const std::string& user, ExecEnv* exec_env);
// RuntimeState for executing expr in fe-support.
RuntimeState(const std::string& now);
RuntimeState(const std::string& now, const std::string& user);
// Empty d'tor to avoid issues with scoped_ptr.
~RuntimeState();
// Set per-query state.
Status Init(const TUniqueId& fragment_instance_id,
const TQueryOptions& query_options,
const std::string& now, ExecEnv* exec_env);
const TQueryOptions& query_options, const std::string& now,
const std::string& user, ExecEnv* exec_env);
ObjectPool* obj_pool() const { return obj_pool_.get(); }
const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
@@ -90,6 +90,7 @@ class RuntimeState {
int max_io_buffers() const { return query_options_.max_io_buffers; }
const TimestampValue* now() const { return now_.get(); }
void set_now(const TimestampValue* now);
const std::string& user() const { return user_; }
const std::vector<std::string>& error_log() const { return error_log_; }
const std::vector<std::pair<std::string, int> >& file_errors() const {
return file_errors_;
@@ -190,6 +191,9 @@ class RuntimeState {
// Stores the number of parse errors per file.
std::vector<std::pair<std::string, int> > file_errors_;
// Username of user that is executing the query to which this RuntimeState belongs.
std::string user_;
// Query-global timestamp, e.g., for implementing now().
// Use pointer to avoid inclusion of timestampvalue.h and avoid clang issues.
boost::scoped_ptr<TimestampValue> now_;

View File

@@ -57,7 +57,7 @@ Java_com_cloudera_impala_service_FeSupport_NativeEvalConstExpr(
DeserializeThriftMsg(env, thrift_predicate_bytes, &thrift_predicate);
TQueryGlobals query_globals;
DeserializeThriftMsg(env, thrift_query_globals_bytes, &query_globals);
RuntimeState state(query_globals.now_string);
RuntimeState state(query_globals.now_string, query_globals.user);
jbyteArray result_bytes = NULL;
JniLocalFrame jni_frame;
Expr* e;

View File

@@ -84,6 +84,7 @@ Status ImpaladQueryExecutor::Exec(
Query query;
query.query = query_string;
query.configuration = exec_options_;
query.hadoop_user = "impala_test_user";
query_results_.data.clear();
// TODO: catch exception and return error code

View File

@@ -148,6 +148,7 @@ functions = [
'StringFunctions::ParseUrl', ['parse_url']],
['String_Parse_Url', 'STRING', ['STRING', 'STRING', 'STRING'], \
'StringFunctions::ParseUrlKey', ['parse_url']],
['Utility_User', 'STRING', [], 'UtilityFunctions::User', ['user']],
['Utility_Sleep', 'BOOLEAN', ['INT'], 'UtilityFunctions::Sleep', ['sleep']],
['Utility_Version', 'STRING', [], 'UtilityFunctions::Version', ['version']],

View File

@@ -111,6 +111,9 @@ struct TPlanFragmentExecParams {
struct TQueryGlobals {
// String containing a timestamp set as the current time.
1: required string now_string
// Name of the user executing this query.
2: optional string user
}

View File

@@ -20,7 +20,6 @@ import com.cloudera.impala.authorization.User;
import com.cloudera.impala.catalog.AuthorizationException;
import com.cloudera.impala.catalog.Catalog;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.thrift.TQueryGlobals;
import com.google.common.base.Preconditions;
/**
@@ -36,13 +35,10 @@ public class AnalysisContext {
// The user who initiated the request.
private final User user;
private final TQueryGlobals queryGlobals;
public AnalysisContext(Catalog catalog, String defaultDb, User user) {
this.catalog = catalog;
this.defaultDatabase = defaultDb;
this.user = user;
this.queryGlobals = Analyzer.createQueryGlobals();
}
static public class AnalysisResult {
@@ -233,7 +229,7 @@ public class AnalysisContext {
if (result.stmt == null) {
return null;
}
result.analyzer = new Analyzer(catalog, defaultDatabase, user, queryGlobals);
result.analyzer = new Analyzer(catalog, defaultDatabase, user);
result.stmt.analyze(result.analyzer);
return result;
} catch (AnalysisException e) {
@@ -244,7 +240,4 @@ public class AnalysisContext {
throw new AnalysisException(parser.getErrorMsg(stmt), e);
}
}
public TQueryGlobals getQueryGlobals() { return queryGlobals; }
}

View File

@@ -125,24 +125,18 @@ public class Analyzer {
// all conjuncts of the Where clause
private final Set<ExprId> whereClauseConjuncts = Sets.newHashSet();
/**
* Analyzer constructor for AnalyzerTest.
* @param catalog
*/
public Analyzer(Catalog catalog) {
this(catalog, Catalog.DEFAULT_DB, new User(System.getProperty("user.name")),
createQueryGlobals());
}
public Analyzer(Catalog catalog, String defaultDb, User user,
TQueryGlobals queryGlobals) {
public Analyzer(Catalog catalog, String defaultDb, User user) {
this.parentAnalyzer = null;
this.catalog = catalog;
this.descTbl = new DescriptorTable();
this.defaultDb = defaultDb;
this.user = user;
this.conjunctIdGenerator = new IdGenerator<ExprId>();
this.queryGlobals = queryGlobals;
// Create query global parameters to be set in each TPlanExecRequest.
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
queryGlobals = new TQueryGlobals();
queryGlobals.setNow_string(formatter.format(Calendar.getInstance().getTime()));
queryGlobals.setUser(user.getName());
}
/**
@@ -746,19 +740,6 @@ public class Analyzer {
}
}
/**
* Create query global parameters to be set in each TPlanExecRequest.
*/
public static TQueryGlobals createQueryGlobals() {
SimpleDateFormat formatter =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
TQueryGlobals queryGlobals = new TQueryGlobals();
Calendar currentDate = Calendar.getInstance();
String nowStr = formatter.format(currentDate.getTime());
queryGlobals.setNow_string(nowStr);
return queryGlobals;
}
/**
* If the table name is fully qualified, the database from the TableName object will
* be returned. Otherwise the default analyzer database will be returned.

View File

@@ -405,7 +405,7 @@ public class Frontend {
}
// Global query parameters to be set in each TPlanExecRequest.
queryExecRequest.query_globals = analysisCtxt.getQueryGlobals();
queryExecRequest.query_globals = analysisResult.getAnalyzer().getQueryGlobals();
if (analysisResult.isQueryStmt()) {
// fill in the metadata

View File

@@ -49,8 +49,7 @@ public class AnalyzerTest {
}
protected Analyzer createAnalyzer(String defaultDb) {
return new Analyzer(catalog, defaultDb, new User(System.getProperty("user.name")),
Analyzer.createQueryGlobals());
return new Analyzer(catalog, defaultDb, new User(System.getProperty("user.name")));
}
@BeforeClass
@@ -126,7 +125,7 @@ public class AnalyzerTest {
* Analyze 'stmt', expecting it to pass. Asserts in case of analysis error.
*/
public ParseNode AnalyzesOk(String stmt) {
return AnalyzesOk(stmt, new Analyzer(catalog));
return AnalyzesOk(stmt, createAnalyzer(Catalog.DEFAULT_DB));
}
/**
@@ -160,7 +159,7 @@ public class AnalyzerTest {
* is non-null.
*/
public void AnalysisError(String stmt, String expectedErrorString) {
AnalysisError(stmt, new Analyzer(catalog), expectedErrorString);
AnalysisError(stmt, createAnalyzer(Catalog.DEFAULT_DB), expectedErrorString);
}
/**

View File

@@ -5,6 +5,7 @@ package com.cloudera.impala.service;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
@@ -215,4 +216,19 @@ public class JdbcTest {
assertFalse(rs.next());
rs.close();
}
@Test
public void testUtilityFunctions() throws SQLException {
ResultSet rs = con.createStatement().executeQuery("select user()");
try {
// We expect exactly one result row with a NULL inside the first column.
// The user() function returns NULL because we currently cannot set the user
// when establishing the Jdbc connection.
assertTrue(rs.next());
assertNull(rs.getString(1));
assertFalse(rs.next());
} finally {
rs.close();
}
}
}