IMPALA-387 Add refresh/invalidate SQL

This commit is contained in:
Alan Choi
2013-05-06 18:24:26 -07:00
committed by Henry Robinson
parent e7c6d57f9c
commit ecee109e68
25 changed files with 264 additions and 167 deletions

View File

@@ -30,6 +30,8 @@ const Status Status::OK;
const Status Status::CANCELLED(TStatusCode::CANCELLED, "Cancelled", true);
const Status Status::MEM_LIMIT_EXCEEDED(
TStatusCode::MEM_LIMIT_EXCEEDED, "Memory limit exceeded", true);
const Status Status::DEPRECATED_RPC(TStatusCode::NOT_IMPLEMENTED_ERROR,
"Deprecated RPC; please update your client", true);
Status::ErrorDetail::ErrorDetail(const TStatus& status)
: error_code(status.status_code),

View File

@@ -49,6 +49,7 @@ class Status {
static const Status OK;
static const Status CANCELLED;
static const Status MEM_LIMIT_EXCEEDED;
static const Status DEPRECATED_RPC;
// copy c'tor makes copy of error detail so Status can be returned by value
Status(const Status& status)

View File

@@ -91,6 +91,8 @@ Status DdlExecutor::Exec(const TDdlExecRequest& exec_request,
return frontend_->DropDatabase(exec_request.drop_db_params);
case TDdlType::DROP_TABLE:
return frontend_->DropTable(exec_request.drop_table_params);
case TDdlType::RESET_METADATA:
return frontend_->ResetMetadata(exec_request.reset_metadata_params);
default: {
stringstream ss;
ss << "Unknown DDL exec request type: " << exec_request.ddl_type;

View File

@@ -50,8 +50,6 @@ Frontend::Frontend() {
{"<init>", "(ZLjava/lang/String;Ljava/lang/String;)V", &fe_ctor_},
{"createExecRequest", "([B)[B", &create_exec_request_id_},
{"getExplainPlan", "([B)Ljava/lang/String;", &get_explain_plan_id_},
{"resetCatalog", "()V", &reset_catalog_id_},
{"resetTable", "([B)V", &reset_table_id_},
{"getHadoopConfig", "(Z)Ljava/lang/String;", &get_hadoop_config_id_},
{"getHadoopConfigValue", "(Ljava/lang/String;)Ljava/lang/String;",
&get_hadoop_config_value_id_},
@@ -67,6 +65,7 @@ Frontend::Frontend() {
{"createDatabase", "([B)V", &create_database_id_},
{"dropTable", "([B)V", &drop_table_id_},
{"dropDatabase", "([B)V", &drop_database_id_},
{"resetMetadata", "([B)V", &reset_metadata_id_},
{"loadTableData", "([B)[B", &load_table_data_id_}};
JNIEnv* jni_env = getJNIEnv();
@@ -176,6 +175,10 @@ Status Frontend::DropTable(const TDropTableParams& params) {
return CallJniMethodWithThriftArgs(drop_table_id_, params);
}
// Forwards a REFRESH/INVALIDATE METADATA request to the Java frontend by
// invoking JniFrontend.resetMetadata (reset_metadata_id_) with the serialized
// TResetMetadataParams.
Status Frontend::ResetMetadata(const TResetMetadataParams& params) {
return CallJniMethodWithThriftArgs(reset_metadata_id_, params);
}
Status Frontend::DescribeTable(const TDescribeTableParams& params,
TDescribeTableResult* response) {
return CallJniMethodWithThriftArgs(describe_table_id_, params, response);
@@ -220,19 +223,6 @@ Status Frontend::GetExplainPlan(
get_explain_plan_id_, query_request, explain_string);
}
Status Frontend::ResetCatalog() {
JNIEnv* jni_env = getJNIEnv();
jni_env->CallObjectMethod(fe_, reset_catalog_id_);
RETURN_ERROR_IF_EXC(jni_env);
return Status::OK;
}
Status Frontend::ResetTable(const TResetTableReq& reset_table_request) {
LOG(INFO) << "Resetting table: "
<< reset_table_request.db_name << "." << reset_table_request.table_name;
return CallJniMethodWithThriftArgs(reset_table_id_, reset_table_request);
}
Status Frontend::ValidateSettings() {
// Use FE to check Hadoop config setting
// TODO: check OS setting

View File

@@ -45,13 +45,6 @@ class Frontend {
// Call FE to get TClientRequestResult.
Status GetExecRequest(const TClientRequest& request, TExecRequest* result);
// Performs a full catalog metadata reset, invalidating all table and database metadata.
Status ResetCatalog();
// Resets the specified table's catalog metadata, forcing a reload on the next access.
// Returns an error if the table or database was not found in the catalog.
Status ResetTable(const TResetTableReq& reset_table_request);
// Returns all matching table names, per Hive's "SHOW TABLES <pattern>". Each
// table name returned is unqualified.
// If pattern is NULL, match all tables otherwise match only those tables that
@@ -118,6 +111,9 @@ class Frontend {
// successful, otherwise CANCELLED is returned.
Status DropTable(const TDropTableParams& drop_table_params);
// Reset the metadata
Status ResetMetadata(const TResetMetadataParams& reset_metadata_params);
// Validate Hadoop config; requires FE
Status ValidateSettings();
@@ -149,8 +145,6 @@ class Frontend {
jmethodID get_hadoop_config_id_; // JniFrontend.getHadoopConfig()
jmethodID get_hadoop_config_value_id_; // JniFrontend.getHadoopConfigValue
jmethodID check_hadoop_config_id_; // JniFrontend.checkHadoopConfig()
jmethodID reset_catalog_id_; // JniFrontend.resetCatalog()
jmethodID reset_table_id_; // JniFrontend.resetTable
jmethodID update_metastore_id_; // JniFrontend.updateMetastore()
jmethodID get_table_names_id_; // JniFrontend.getTableNames
jmethodID describe_table_id_; // JniFrontend.describeTable
@@ -162,6 +156,7 @@ class Frontend {
jmethodID create_table_like_id_; // JniFrontend.createTableLike
jmethodID drop_database_id_; // JniFrontend.dropDatabase
jmethodID drop_table_id_; // JniFrontend.dropTable
jmethodID reset_metadata_id_; // JniFrontend.resetMetadata
jmethodID load_table_data_id_; // JniFrontend.loadTableData
jmethodID fe_ctor_;

View File

@@ -409,11 +409,11 @@ void ImpalaServer::PingImpalaService(TPingImpalaServiceResp& return_val) {
}
void ImpalaServer::ResetCatalog(impala::TStatus& status) {
ResetCatalogInternal().ToThrift(&status);
Status::DEPRECATED_RPC.ToThrift(&status);
}
void ImpalaServer::ResetTable(impala::TStatus& status, const TResetTableReq& request) {
frontend_->ResetTable(request).ToThrift(&status);
Status::DEPRECATED_RPC.ToThrift(&status);
}
void ImpalaServer::SessionStart(const ThriftServer::SessionContext& session_context) {

View File

@@ -634,19 +634,6 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
}
}
void ImpalaServer::ResetCatalog(TResetCatalogResp& return_val) {
VLOG_RPC << "ResetCatalog()";
ResetCatalogInternal().ToThrift(&return_val.status);
VLOG_RPC << "ResetCatalog(): return_val=" << ThriftDebugString(return_val);
}
void ImpalaServer::ResetTable(TResetTableResp& return_val,
const TResetTableReq& request) {
VLOG_RPC << "ResetTable(): request=" << ThriftDebugString(request);
frontend_->ResetTable(request).ToThrift(&return_val.status);
VLOG_RPC << "ResetTable(): return_val=" << ThriftDebugString(return_val);
}
inline void ImpalaServer::THandleIdentifierToTUniqueId(
const apache::hive::service::cli::thrift::THandleIdentifier &handle,
TUniqueId* unique_id, TUniqueId* secret) {

View File

@@ -1029,21 +1029,6 @@ Status ImpalaServer::UpdateCatalogMetrics() {
return Status::OK;
}
Status ImpalaServer::ResetCatalogInternal() {
LOG(INFO) << "Refreshing catalog";
RETURN_IF_ERROR(frontend_->ResetCatalog());
ImpaladMetrics::IMPALA_SERVER_LAST_REFRESH_TIME->Update(
TimestampValue::local_time().DebugString());
Status status = UpdateCatalogMetrics();
if (!status.ok()) {
VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetErrorMsg();
}
return Status::OK;
}
Status ImpalaServer::CancelInternal(const TUniqueId& query_id) {
VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);

View File

@@ -188,8 +188,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
const apache::hive::service::cli::thrift::TFetchResultsReq& request);
virtual void GetLog(apache::hive::service::cli::thrift::TGetLogResp& return_val,
const apache::hive::service::cli::thrift::TGetLogReq& request);
virtual void ResetCatalog(TResetCatalogResp& return_val);
virtual void ResetTable(TResetTableResp& return_val, const TResetTableReq& request);
// ImpalaService common extensions (implemented in impala-server.cc)
// ImpalaInternalService rpcs
@@ -335,9 +333,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
// Returns true if it found a registered exec_state, otherwise false.
bool UnregisterQuery(const TUniqueId& query_id);
// Non-thrift callable version of ResetCatalog
Status ResetCatalogInternal();
// Initiates query cancellation. Returns OK unless query_id is not found.
// Queries still need to be unregistered, usually via Close, after cancellation.
// Caller should not hold any locks when calling this function.

View File

@@ -82,7 +82,7 @@ def exec_hive_query_from_file(file_name):
sys.exit(ret_val)
def exec_impala_query_from_file(file_name):
impala_refresh_cmd = "%s --impalad=%s -q 'invalidate'" %\
impala_refresh_cmd = "%s --impalad=%s -q 'invalidate metadata'" %\
(IMPALA_SHELL_CMD, options.impala_shell_args)
impala_cmd = "%s --impalad=%s -f %s" %\
(IMPALA_SHELL_CMD, options.impala_shell_args, file_name)

View File

@@ -371,6 +371,17 @@ struct TDropTableParams {
2: required bool if_exists
}
// Parameters of REFRESH/INVALIDATE METADATA commands
// NOTE: This struct should only be used for intra-process communication.
struct TResetMetadataParams {
// If true, refresh. Otherwise, invalidate metadata
1: required bool is_refresh
// Fully qualified name of the table to refresh or invalidate; not set if invalidating
// the entire catalog
2: optional TTableName table_name
}
struct TClientRequest {
// select stmt to be executed
1: required string stmt
@@ -515,6 +526,7 @@ enum TDdlType {
CREATE_TABLE_LIKE,
DROP_DATABASE,
DROP_TABLE,
RESET_METADATA
}
struct TDdlExecRequest {
@@ -549,6 +561,9 @@ struct TDdlExecRequest {
// Parameters for DROP TABLE
11: optional TDropTableParams drop_table_params
// Parameters for REFRESH/INVALIDATE METADATA
12: optional TResetMetadataParams reset_metadata_params
}
// HiveServer2 Metadata operations (JniFrontend.hiveServer2MetadataOperation)

View File

@@ -147,29 +147,14 @@ struct TPingImpalaServiceResp {
1: string version
}
// Parameters for a ResetTable request which will reset a table's metadata.
// If is_refresh is true, the metadata will be refreshed immediately.
// Otherwise, the metadata will be invalidated and then loaded on the next
// access.
// Parameters for a ResetTable request which will invalidate a table's metadata.
// DEPRECATED.
struct TResetTableReq {
// Name of the table's parent database.
1: required string db_name
// Name of the table.
2: required string table_name
// If set to true, refresh the metadata immediately. Otherwise, invalidate the metadata
3: optional bool is_refresh
}
// Response from call to ResetCatalog
struct TResetCatalogResp {
1: required Status.TStatus status
}
// Response from call to ResetTable
struct TResetTableResp {
1: required Status.TStatus status
}
// For all rpc that return a TStatus as part of their result type,
@@ -186,9 +171,11 @@ service ImpalaService extends beeswax.BeeswaxService {
throws(1:beeswax.BeeswaxException error);
// Invalidates all catalog metadata, forcing a reload
// DEPRECATED; execute query "invalidate metadata" to refresh metadata
Status.TStatus ResetCatalog();
// Invalidates a specific table's catalog metadata, forcing a reload on the next access
// DEPRECATED; execute query "refresh <table>" to refresh metadata
Status.TStatus ResetTable(1:TResetTableReq request)
// Returns the runtime profile string for the given query handle.
@@ -206,9 +193,4 @@ service ImpalaService extends beeswax.BeeswaxService {
// Impala HiveServer2 service
service ImpalaHiveServer2Service extends cli_service.TCLIService {
// Invalidates all catalog metadata, forcing a reload
TResetCatalogResp ResetCatalog();
// Invalidates a specific table's catalog metadata, forcing a reload on the next access
TResetTableResp ResetTable(1:TResetTableReq request);
}

View File

@@ -174,14 +174,15 @@ terminal KW_ADD, KW_AND, KW_ALL, KW_ALTER, KW_AS, KW_ASC, KW_AVG, KW_BETWEEN, KW
KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DIV,
KW_DOUBLE, KW_DROP, KW_ELSE, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, KW_EXTERNAL,
KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FLOAT, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL,
KW_GROUP, KW_HAVING, KW_IF, KW_IN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,KW_INTERVAL,
KW_INTO, KW_IS, KW_JOIN, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION,
KW_MIN, KW_MAX, KW_NOT, KW_NULL, KW_ON, KW_OR, KW_ORDER, KW_OUTER, KW_OVERWRITE,
KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED, KW_RCFILE, KW_REGEXP, KW_RENAME,
KW_REPLACE, KW_RIGHT, KW_RLIKE, KW_ROW, KW_SCHEMA, KW_SCHEMAS, KW_SELECT,
KW_SEQUENCEFILE, KW_SET, KW_SHOW, KW_SEMI, KW_SMALLINT, KW_STORED, KW_STRING, KW_SUM,
KW_TABLE, KW_TABLES, KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT,
KW_TO, KW_TRUE, KW_UNION, KW_USE, KW_USING, KW_VALUES, KW_WHEN, KW_WHERE, KW_WITH;
KW_GROUP, KW_HAVING, KW_IF, KW_IN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT, KW_INTERVAL,
KW_INTO, KW_IS, KW_INVALIDATE, KW_JOIN, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD,
KW_LOCATION, KW_MIN, KW_MAX, KW_METADATA, KW_NOT, KW_NULL, KW_ON, KW_OR, KW_ORDER,
KW_OUTER, KW_OVERWRITE, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED, KW_RCFILE,
KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE, KW_RIGHT, KW_RLIKE, KW_ROW, KW_SCHEMA,
KW_SCHEMAS, KW_SELECT, KW_SEQUENCEFILE, KW_SET, KW_SHOW, KW_SEMI, KW_SMALLINT,
KW_STORED, KW_STRING, KW_SUM, KW_TABLE, KW_TABLES, KW_TERMINATED, KW_TEXTFILE, KW_THEN,
KW_TIMESTAMP, KW_TINYINT, KW_TO, KW_TRUE, KW_UNION, KW_USE, KW_USING, KW_VALUES,
KW_WHEN, KW_WHERE, KW_WITH;
terminal COMMA, DOT, STAR, LPAREN, RPAREN, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -215,6 +216,7 @@ nonterminal String show_pattern;
nonterminal DescribeStmt describe_stmt;
nonterminal TDescribeTableOutputStyle describe_output_style;
nonterminal LoadDataStmt load_stmt;
nonterminal ResetMetadataStmt reset_metadata_stmt;
// List of select blocks connected by UNION operators, with order by or limit.
nonterminal QueryStmt union_with_order_by_or_limit;
nonterminal SelectList select_clause;
@@ -342,6 +344,8 @@ stmt ::=
{: RESULT = explain; :}
| load_stmt: load
{: RESULT = load; :}
| reset_metadata_stmt: reset_metadata
{: RESULT = reset_metadata; :}
;
load_stmt ::=
@@ -357,6 +361,15 @@ overwrite_val ::=
{: RESULT = Boolean.FALSE; :}
;
reset_metadata_stmt ::=
KW_INVALIDATE KW_METADATA
{: RESULT = new ResetMetadataStmt(null, false); :}
| KW_INVALIDATE KW_METADATA table_name:table
{: RESULT = new ResetMetadataStmt(table, false); :}
| KW_REFRESH table_name:table
{: RESULT = new ResetMetadataStmt(table, true); :}
;
explain_stmt ::=
KW_EXPLAIN query_stmt:query
{:

View File

@@ -101,6 +101,10 @@ public class AnalysisContext {
return stmt instanceof DescribeStmt;
}
// Returns true if the analyzed statement is a REFRESH/INVALIDATE METADATA
// statement (ResetMetadataStmt).
public boolean isResetMetadataStmt() {
return stmt instanceof ResetMetadataStmt;
}
public boolean isExplainStmt() {
if (isQueryStmt()) return ((QueryStmt)stmt).isExplain();
if (isInsertStmt()) return ((InsertStmt)stmt).isExplain();
@@ -110,7 +114,8 @@ public class AnalysisContext {
public boolean isDdlStmt() {
return isUseStmt() || isShowTablesStmt() || isShowDbsStmt() || isDescribeStmt() ||
isCreateTableLikeStmt() || isCreateTableStmt() || isCreateDbStmt() ||
isDropDbStmt() || isDropTableStmt() || isAlterTableStmt();
isDropDbStmt() || isDropTableStmt() || isAlterTableStmt() ||
isResetMetadataStmt();
}
public boolean isDmlStmt() {

View File

@@ -0,0 +1,80 @@
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.cloudera.impala.analysis;
import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.authorization.PrivilegeRequest;
import com.cloudera.impala.catalog.AuthorizationException;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.thrift.TResetMetadataParams;
import com.cloudera.impala.thrift.TTableName;
import com.google.common.base.Preconditions;
/**
* Representation of a REFRESH/INVALIDATE METADATA statement.
*/
/**
 * Representation of a REFRESH/INVALIDATE METADATA statement.
 * REFRESH always targets a single table; INVALIDATE METADATA may target a
 * single table or, when no table is given, the entire catalog.
 */
public class ResetMetadataStmt extends StatementBase {
  // Updated during analysis (fully qualified with the target db).
  // Null if invalidating the entire catalog.
  private TableName tableName;

  // true for REFRESH, false for INVALIDATE METADATA.
  private final boolean isRefresh;

  public ResetMetadataStmt(TableName name, boolean isRefresh) {
    // REFRESH requires a table name; only INVALIDATE METADATA may be
    // catalog-wide (name == null).
    Preconditions.checkArgument(!isRefresh || name != null);
    this.tableName = name;
    this.isRefresh = isRefresh;
  }

  public TableName getTableName() { return tableName; }
  public boolean isRefresh() { return isRefresh; }

  @Override
  public void analyze(Analyzer analyzer) throws AnalysisException,
      AuthorizationException {
    if (tableName != null) {
      // Qualify the table name against the target database and verify the
      // table exists (ANY privilege is sufficient for a table-level reset).
      String dbName = analyzer.getTargetDbName(tableName);
      tableName = new TableName(dbName, tableName.getTbl());
      if (!analyzer.dbContainsTable(dbName, tableName.getTbl(), Privilege.ANY)) {
        throw new AnalysisException(Analyzer.TBL_DOES_NOT_EXIST_ERROR_MSG + tableName);
      }
    } else {
      // Catalog-wide invalidation requires server-level (ALL) privileges:
      // a PrivilegeRequest with no Authorizeable targets the server.
      PrivilegeRequest privilegeRequest = new PrivilegeRequest(Privilege.ALL);
      analyzer.getCatalog().checkAccess(analyzer.getUser(), privilegeRequest);
    }
  }

  @Override
  public String toSql() {
    // BUG FIX: the branches were inverted — isRefresh == true previously
    // produced "INVALIDATE METADATA" and false produced "REFRESH", the
    // opposite of the grammar (KW_REFRESH table -> isRefresh == true).
    StringBuilder result = new StringBuilder();
    if (isRefresh) {
      result.append("REFRESH");
    } else {
      result.append("INVALIDATE METADATA");
    }
    if (tableName != null) result.append(" ").append(tableName);
    return result.toString();
  }

  /**
   * Converts this statement to its thrift form for intra-process transport.
   * table_name is set only for table-level resets.
   */
  public TResetMetadataParams toThrift() {
    TResetMetadataParams params = new TResetMetadataParams();
    params.setIs_refresh(isRefresh);
    if (tableName != null) {
      params.setTable_name(new TTableName(tableName.getDb(), tableName.getTbl()));
    }
    return params;
  }
}

View File

@@ -74,7 +74,10 @@ public class AuthorizationChecker {
List<Authorizable> authorizeables = Lists.newArrayList();
authorizeables.add(new org.apache.access.core.Server(config.getServerName()));
authorizeables.addAll(request.getAuthorizeable().getHiveAuthorizeableHierarchy());
// If request.getAuthorizeable() is null, the request is for server-level permission.
if (request.getAuthorizeable() != null) {
authorizeables.addAll(request.getAuthorizeable().getHiveAuthorizeableHierarchy());
}
// The Hive Access API does not currently provide a way to check if the user
// has any privileges on a given resource.

View File

@@ -17,7 +17,8 @@ package com.cloudera.impala.authorization;
import com.google.common.base.Preconditions;
/*
* Represents a privilege request in the context of an Authorizeable object.
* Represents a privilege request in the context of an Authorizeable object. If no
* Authorizeable object is provided, it represents a privilege request on the server.
* For example, SELECT on table Foo in database Bar.
*/
public class PrivilegeRequest {
@@ -31,18 +32,24 @@ public class PrivilegeRequest {
this.privilege = privilege;
}
/*
 * Creates a server-level privilege request: no Authorizeable object is
 * associated with the request (authorizeable stays null).
 */
public PrivilegeRequest(Privilege privilege) {
Preconditions.checkNotNull(privilege);
this.authorizeable = null;
this.privilege = privilege;
}
/*
* Returns Authorizeable object.
* Returns Authorizeable object. Null if the request is for server-level permission.
*/
public Authorizeable getAuthorizeable() {
return authorizeable;
}
/*
* Name of the Authorizeable.
* Name of the Authorizeable. Authorizeable refers to the server if it's null.
*/
public String getName() {
return authorizeable.getName();
return (authorizeable != null) ? authorizeable.getName() : "server";
}
/*

View File

@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import com.cloudera.impala.analysis.AnalysisContext;
import com.cloudera.impala.analysis.InsertStmt;
import com.cloudera.impala.analysis.QueryStmt;
import com.cloudera.impala.analysis.ResetMetadataStmt;
import com.cloudera.impala.analysis.TableName;
import com.cloudera.impala.authorization.AuthorizationConfig;
import com.cloudera.impala.authorization.ImpalaInternalAdminUser;
@@ -45,6 +46,7 @@ import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.authorization.User;
import com.cloudera.impala.catalog.AuthorizationException;
import com.cloudera.impala.catalog.Catalog;
import com.cloudera.impala.catalog.CatalogException;
import com.cloudera.impala.catalog.DatabaseNotFoundException;
import com.cloudera.impala.catalog.Db;
import com.cloudera.impala.catalog.FileFormat;
@@ -83,6 +85,7 @@ import com.cloudera.impala.thrift.TMetadataOpResponse;
import com.cloudera.impala.thrift.TPlanFragment;
import com.cloudera.impala.thrift.TPrimitiveType;
import com.cloudera.impala.thrift.TQueryExecRequest;
import com.cloudera.impala.thrift.TResetMetadataParams;
import com.cloudera.impala.thrift.TResultRow;
import com.cloudera.impala.thrift.TResultSetMetadata;
import com.cloudera.impala.thrift.TStmtType;
@@ -111,7 +114,7 @@ public class Frontend {
/**
* Invalidates all catalog metadata, forcing a reload.
*/
public void resetCatalog() {
private void resetCatalog() {
catalog.close();
catalog = new Catalog(lazyCatalog, true, authzConfig);
}
@@ -120,14 +123,9 @@ public class Frontend {
* If isRefresh is false, invalidates a specific table's metadata, forcing the
* metadata to be reloaded on the next access.
* If isRefresh is true, performs an immediate incremental refresh.
* @throws DatabaseNotFoundException - If the specified database does not exist.
* @throws TableNotFoundException - If the specified table does not exist.
*/
public void resetTable(String dbName, String tableName, boolean isRefresh)
throws DatabaseNotFoundException, TableNotFoundException, AuthorizationException {
// TODO: Currently refresh commands are not authorized. Once REFRESH becomes a SQL
// statement (IMPALA-339) the authorization code can be added in.
// For now just use the internal Admin user.
private void resetTable(String dbName, String tableName, boolean isRefresh)
throws CatalogException {
Db db = catalog.getDb(dbName, ImpalaInternalAdminUser.getInstance(), Privilege.ANY);
if (db == null) {
throw new DatabaseNotFoundException("Database not found: " + dbName);
@@ -202,6 +200,11 @@ public class Frontend {
ddl.ddl_type = TDdlType.DROP_TABLE;
ddl.setDrop_table_params(analysis.getDropTableStmt().toThrift());
metadata.setColumnDescs(Collections.<TColumnDesc>emptyList());
} else if (analysis.isResetMetadataStmt()) {
ddl.ddl_type = TDdlType.RESET_METADATA;
ResetMetadataStmt resetMetadataStmt = (ResetMetadataStmt) analysis.getStmt();
ddl.setReset_metadata_params(resetMetadataStmt.toThrift());
metadata.setColumnDescs(Collections.<TColumnDesc>emptyList());
}
result.setResult_set_metadata(metadata);
@@ -604,4 +607,19 @@ public class Frontend {
// Refresh the table metadata.
resetTable(dbName, tblName, true);
}
/**
 * Execute a reset metadata statement: REFRESH &lt;table&gt; or
 * INVALIDATE METADATA [&lt;table&gt;]. Dispatches to a table-level reset when
 * a table name is set, otherwise to a full catalog invalidation.
 */
public void execResetMetadata(TResetMetadataParams params)
throws CatalogException {
if (params.isSetTable_name()) {
// Table-level: refresh (is_refresh) or invalidate just this table.
resetTable(params.getTable_name().getDb_name(),
params.getTable_name().getTable_name(), params.isIs_refresh());
} else {
// Invalidate the catalog if no table name is provided.
// A catalog-wide request can only be INVALIDATE METADATA; REFRESH
// always carries a table name, so is_refresh must be false here.
Preconditions.checkArgument(!params.isIs_refresh());
resetCatalog();
}
}
}

View File

@@ -74,9 +74,8 @@ import com.cloudera.impala.thrift.TLoadDataReq;
import com.cloudera.impala.thrift.TLoadDataResp;
import com.cloudera.impala.thrift.TMetadataOpRequest;
import com.cloudera.impala.thrift.TMetadataOpResponse;
import com.cloudera.impala.thrift.TPartitionKeyValue;
import com.cloudera.impala.thrift.TResetMetadataParams;
import com.google.common.base.Preconditions;
import com.cloudera.impala.thrift.TResetTableReq;
/**
* JNI-callable interface onto a wrapped Frontend instance. The main point is to serialise
@@ -700,15 +699,10 @@ public class JniFrontend {
return "";
}
public void resetTable(byte[] thriftResetTableRequest)
public void resetMetadata(byte[] thriftResetMetadataRequest)
throws ImpalaException {
TResetTableReq request = new TResetTableReq();
deserializeThrift(request, thriftResetTableRequest);
frontend.resetTable(request.getDb_name(), request.getTable_name(),
request.isSetIs_refresh() && request.isIs_refresh());
}
public void resetCatalog() {
frontend.resetCatalog();
TResetMetadataParams request = new TResetMetadataParams();
deserializeThrift(request, thriftResetMetadataRequest);
frontend.execResetMetadata(request);
}
}

View File

@@ -108,6 +108,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols;
keywordMap.put("into", new Integer(SqlParserSymbols.KW_INTO));
keywordMap.put("int", new Integer(SqlParserSymbols.KW_INT));
keywordMap.put("integer", new Integer(SqlParserSymbols.KW_INT));
keywordMap.put("invalidate", new Integer(SqlParserSymbols.KW_INVALIDATE));
keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
keywordMap.put("left", new Integer(SqlParserSymbols.KW_LEFT));
keywordMap.put("like", new Integer(SqlParserSymbols.KW_LIKE));
@@ -115,8 +116,9 @@ import com.cloudera.impala.analysis.SqlParserSymbols;
keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
keywordMap.put("load", new Integer(SqlParserSymbols.KW_LOAD));
keywordMap.put("location", new Integer(SqlParserSymbols.KW_LOCATION));
keywordMap.put("min", new Integer(SqlParserSymbols.KW_MIN));
keywordMap.put("max", new Integer(SqlParserSymbols.KW_MAX));
keywordMap.put("metadata", new Integer(SqlParserSymbols.KW_METADATA));
keywordMap.put("min", new Integer(SqlParserSymbols.KW_MIN));
keywordMap.put("not", new Integer(SqlParserSymbols.KW_NOT));
keywordMap.put("null", new Integer(SqlParserSymbols.KW_NULL));
keywordMap.put("on", new Integer(SqlParserSymbols.KW_ON));
@@ -130,6 +132,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols;
keywordMap.put("partitioned", new Integer(SqlParserSymbols.KW_PARTITIONED));
keywordMap.put("rcfile", new Integer(SqlParserSymbols.KW_RCFILE));
keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));
keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP));
keywordMap.put("rename", new Integer(SqlParserSymbols.KW_RENAME));
keywordMap.put("replace", new Integer(SqlParserSymbols.KW_REPLACE));

View File

@@ -345,6 +345,24 @@ public class AnalyzerTest {
"Failed to load metadata for table: functional.bad_serde");
}
@Test
public void TestResetMetadata() {
AnalyzesOk("invalidate metadata");
AnalyzesOk("invalidate metadata functional.alltypessmall");
AnalyzesOk("invalidate metadata functional.bad_serde");
AnalyzesOk("refresh functional.alltypessmall");
AnalyzesOk("refresh functional.bad_serde");
AnalysisError("invalidate metadata functional.unknown_table",
"Table does not exist: functional.unknown_table");
AnalysisError("invalidate metadata unknown_db.unknown_table",
"Database does not exist: unknown_db");
AnalysisError("refresh functional.unknown_table",
"Table does not exist: functional.unknown_table");
AnalysisError("refresh unknown_db.unknown_table",
"Database does not exist: unknown_db");
}
@Test
public void TestExplain() {
// Analysis error from explain insert: too many partitioning columns.

View File

@@ -28,6 +28,7 @@ import org.apache.hive.service.cli.thrift.TGetTablesReq;
import org.junit.Test;
import com.cloudera.impala.authorization.AuthorizationConfig;
import com.cloudera.impala.authorization.ImpalaInternalAdminUser;
import com.cloudera.impala.authorization.User;
import com.cloudera.impala.catalog.AuthorizationException;
import com.cloudera.impala.catalog.Catalog;
@@ -54,6 +55,7 @@ public class AuthorizationTest {
private final static String AUTHZ_POLICY_FILE = "/test-warehouse/authz-policy.ini";
private final static User USER = new User(System.getProperty("user.name"));
private final AnalysisContext analysisContext;
private final AnalysisContext adminUserAnalysisContext;
private final Frontend fe;
public AuthorizationTest() throws IOException {
@@ -61,6 +63,8 @@ public class AuthorizationTest {
new AuthorizationConfig("server1", AUTHZ_POLICY_FILE);
Catalog catalog = new Catalog(true, false, authzConfig);
analysisContext = new AnalysisContext(catalog, Catalog.DEFAULT_DB, USER);
adminUserAnalysisContext = new AnalysisContext(catalog,
Catalog.DEFAULT_DB, ImpalaInternalAdminUser.getInstance());
fe = new Frontend(true, authzConfig);
}
@@ -209,6 +213,30 @@ public class AuthorizationTest {
}
}
@Test
public void TestResetMetadata() throws AnalysisException, AuthorizationException {
// Positive cases (user has privileges on these tables).
AuthzOk("invalidate metadata functional.alltypesagg");
AuthzOk("refresh functional.alltypesagg");
// TODO: Use a real user to run this positive test case once
// AuthorizationChecker supports LocalGroupAuthorizationProvider.
adminUserAnalysisContext.analyze("invalidate metadata");
AuthzError("invalidate metadata",
"User '%s' does not have privileges to access: server");
AuthzError("invalidate metadata unknown_db.alltypessmall",
"User '%s' does not have privileges to access: unknown_db.alltypessmall");
AuthzError("invalidate metadata functional_seq.alltypessmall",
"User '%s' does not have privileges to access: functional_seq.alltypessmall");
AuthzError("invalidate metadata functional.unknown_table",
"User '%s' does not have privileges to access: functional.unknown_table");
AuthzError("invalidate metadata functional.alltypessmall",
"User '%s' does not have privileges to access: functional.alltypessmall");
AuthzError("refresh functional.alltypessmall",
"User '%s' does not have privileges to access: functional.alltypessmall");
}
@Test
public void TestCreateTable() throws AnalysisException, AuthorizationException {
AuthzOk("create table tpch.new_table (i int)");

View File

@@ -1418,6 +1418,20 @@ public class ParserTest {
ParsesOk("SELECT CAST(a as REAL) from tbl");
}
@Test
public void TestResetMetadata() {
ParsesOk("invalidate metadata");
ParsesOk("invalidate metadata Foo");
ParsesOk("invalidate metadata Foo.S");
ParsesOk("refresh Foo");
ParsesOk("refresh Foo.S");
ParserError("invalidate");
ParserError("invalidate metadata Foo.S.S");
ParserError("REFRESH Foo.S.S");
ParserError("refresh");
}
@Test
public void TestGetErrorMsg() {
@@ -1427,8 +1441,8 @@ public class ParserTest {
"c, b, c from t\n" +
"^\n" +
"Encountered: IDENTIFIER\n" +
"Expected: ALTER, CREATE, DESCRIBE, DROP, EXPLAIN, INSERT, LOAD, SELECT, SHOW, " +
"USE, VALUES, WITH\n");
"Expected: ALTER, CREATE, DESCRIBE, DROP, EXPLAIN, INSERT, INVALIDATE, LOAD, " +
"REFRESH, SELECT, SHOW, USE, VALUES, WITH\n");
// missing select list
ParserError("select from t",

View File

@@ -346,7 +346,7 @@ class ImpalaShell(cmd.Cmd):
self.__print_if_verbose('Server version: %s' % self.server_version)
self.prompt = "[%s:%s] > " % self.impalad
if self.refresh_after_connect:
self.cmdqueue.append('refresh' + ImpalaShell.CMD_DELIM)
self.cmdqueue.append('invalidate metadata' + ImpalaShell.CMD_DELIM)
if self.current_db:
self.cmdqueue.append('use %s' % self.current_db + ImpalaShell.CMD_DELIM)
self.__build_default_query_options_dict()
@@ -815,47 +815,6 @@ class ImpalaShell(cmd.Cmd):
print_to_stderr(explanation.textual)
return True
def do_refresh(self, args):
"""Fast incremental refresh an Impala table metadata"""
status = RpcStatus.ERROR
if not args:
print 'Usage: refresh [databaseName.][tableName]'
return False
else:
db_table_name = self.__parse_table_name_arg(args)
if db_table_name is None:
print 'Usage: refresh [databaseName.][tableName]'
return False
reset_table_req = TResetTableReq(*db_table_name)
reset_table_req.is_refresh = True
start = time.time()
(_, status) = self.__do_rpc(
lambda: self.imp_service.ResetTable(reset_table_req))
end = time.time()
if status == RpcStatus.OK:
self.__print_if_verbose("Successfully refreshed table: %s in %2.2fs" % \
('.'.join(db_table_name), (end-start)))
return status == RpcStatus.OK
def do_invalidate(self, args):
"""Refresh the Impalad catalog"""
status = RpcStatus.ERROR
if not args:
(_, status) = self.__do_rpc(lambda: self.imp_service.ResetCatalog())
if status == RpcStatus.OK:
self.__print_if_verbose("Successfully refreshed catalog")
else:
db_table_name = self.__parse_table_name_arg(args)
if db_table_name is None:
print_to_stderr('Usage: refresh [databaseName.][tableName]')
return False
(_, status) = self.__do_rpc(
lambda: self.imp_service.ResetTable(TResetTableReq(*db_table_name)))
if status == RpcStatus.OK:
self.__print_if_verbose(
"Successfully invalidated table: %s" % '.'.join(db_table_name))
return status == RpcStatus.OK
def do_history(self, args):
"""Display command history"""
# Deal with readline peculiarity. When history does not exists,
@@ -885,8 +844,10 @@ class ImpalaShell(cmd.Cmd):
print_to_stderr('Unable to save history: %s' % i)
def default(self, args):
print_to_stderr("Unrecognized command")
return True
query = self.__create_beeswax_query_handle()
query.query = args
query.configuration = self.__options_to_string_list()
return self.__execute_query(query)
def emptyline(self):
"""If an empty line is entered, do nothing"""

View File

@@ -208,13 +208,12 @@ class ImpalaBeeswaxClient(object):
return self.__do_rpc(lambda: self.imp_service.get_state(query_handle))
def refresh(self):
"""Reload the Impalad catalog"""
return self.__do_rpc(lambda: self.imp_service.ResetCatalog()) == 0
"""Invalidate the Impalad catalog"""
return self.__execute_query("invalidate metadata")
def refresh_table(self, db_name, table_name):
"""Reload a specific table from the catalog"""
return self.__do_rpc(lambda:\
self.imp_service.ResetTable(TResetTableReq(db_name, table_name))) == 0
"""Refresh a specific table from the catalog"""
return self.__execute_query("refresh %s.%s" % (db_name, table_name))
def fetch_results(self, query_string, query_handle):
"""Fetches query results given a handle and query type (insert, use, other)"""