diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index 443f75706..932dabdab 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -61,6 +61,28 @@ Status& Status::operator=(const TStatus& status) {
   return *this;
 }
 
+Status::Status(const apache::hive::service::cli::thrift::TStatus& hs2_status)
+  : error_detail_(
+      hs2_status.statusCode
+          == apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS ? NULL
+          : new ErrorDetail(
+              static_cast<TStatusCode::type>(hs2_status.statusCode),
+              hs2_status.errorMessage)) {
+}
+
+Status& Status::operator=(
+    const apache::hive::service::cli::thrift::TStatus& hs2_status) {
+  delete error_detail_;
+  if (hs2_status.statusCode
+      == apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS) {
+    error_detail_ = NULL;
+  } else {
+    error_detail_ = new ErrorDetail(
+        static_cast<TStatusCode::type>(hs2_status.statusCode), hs2_status.errorMessage);
+  }
+  return *this;
+}
+
 void Status::AddErrorMsg(TStatusCode::type code, const std::string& msg) {
   if (error_detail_ == NULL) {
     error_detail_ = new ErrorDetail(code, msg);
diff --git a/be/src/common/status.h b/be/src/common/status.h
index c1be423cf..09ec8b4e4 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -22,6 +22,7 @@
 #include "common/logging.h"
 #include "common/compiler-util.h"
 #include "gen-cpp/Status_types.h"  // for TStatus
+#include "gen-cpp/cli_service_types.h"  // for HS2 TStatus
 
 namespace impala {
 
@@ -94,6 +95,12 @@ class Status {
   // same as previous c'tor
   Status& operator=(const TStatus& status);
 
+  // "Copy" c'tor from HS2 TStatus.
+  Status(const apache::hive::service::cli::thrift::TStatus& hs2_status);
+
+  // same as previous c'tor
+  Status& operator=(const apache::hive::service::cli::thrift::TStatus& hs2_status);
+
   // assign from stringstream
   Status& operator=(const std::stringstream& stream);
 
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index d2289c853..854d41f55 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -17,12 +17,14 @@
 #include "exec/catalog-op-executor.h"
 #include "common/status.h"
 #include "service/impala-server.h"
+#include "util/string-parser.h"
 
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
 
 using namespace std;
 using namespace impala;
+using namespace apache::hive::service::cli::thrift;
 
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
@@ -32,6 +34,8 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
       FLAGS_catalog_service_port, NULL, ThriftServer::ThreadPool);
   switch (request.op_type) {
     case TCatalogOpType::DDL: {
+      // Compute stats stmts must be executed via ExecComputeStats().
+      DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS);
       RETURN_IF_ERROR(client.Open());
       catalog_update_result_.reset(new TCatalogUpdateResult());
       exec_response_.reset(new TDdlExecResponse());
@@ -58,3 +62,88 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
     }
   }
 }
+
+Status CatalogOpExecutor::ExecComputeStats(
+    const TComputeStatsParams& compute_stats_params,
+    const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
+    const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
+  // Create a new DDL request to alter the table's statistics.
+  TCatalogOpRequest catalog_op_req;
+  catalog_op_req.__isset.ddl_params = true;
+  catalog_op_req.__set_op_type(TCatalogOpType::DDL);
+  TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
+  update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
+
+  TAlterTableUpdateStatsParams& update_stats_params =
+      update_stats_req.alter_table_params.update_stats_params;
+  update_stats_req.__isset.alter_table_params = true;
+  update_stats_req.alter_table_params.__set_alter_type(TAlterTableType::UPDATE_STATS);
+  update_stats_req.alter_table_params.__set_table_name(compute_stats_params.table_name);
+  update_stats_req.alter_table_params.__isset.update_stats_params = true;
+  update_stats_params.__set_table_name(compute_stats_params.table_name);
+
+  // Fill the alteration request based on the child-query results.
+  SetTableStats(tbl_stats_schema, tbl_stats_data, &update_stats_params);
+  SetColumnStats(col_stats_schema, col_stats_data, &update_stats_params);
+
+  // Execute the 'alter table update stats' request.
+  RETURN_IF_ERROR(Exec(catalog_op_req));
+  return Status::OK;
+}
+
+void CatalogOpExecutor::SetTableStats(const TTableSchema& tbl_stats_schema,
+    const TRowSet& tbl_stats_data, TAlterTableUpdateStatsParams* params) {
+  // Accumulate total number of rows in the table.
+  long total_num_rows = 0;
+  // Set per-partition stats.
+  BOOST_FOREACH(const TRow& row, tbl_stats_data.rows) {
+    DCHECK_GT(row.colVals.size(), 0);
+    // The first column is the COUNT(*) expr of the original query.
+    DCHECK(row.colVals[0].__isset.i64Val);
+    int64_t num_rows = row.colVals[0].i64Val.value;
+    // The remaining columns are partition columns that the results are grouped by.
+    vector<string> partition_key_vals;
+    partition_key_vals.reserve(row.colVals.size());
+    for (int j = 1; j < row.colVals.size(); ++j) {
+      // The partition-key values have been explicitly cast to string in the select list.
+      DCHECK(row.colVals[j].__isset.stringVal);
+      partition_key_vals.push_back(row.colVals[j].stringVal.value);
+    }
+    params->partition_stats[partition_key_vals].__set_num_rows(num_rows);
+    total_num_rows += num_rows;
+  }
+  params->__isset.partition_stats = true;
+
+  // Set per-table stats.
+  params->table_stats.__set_num_rows(total_num_rows);
+  params->__isset.table_stats = true;
+}
+
+void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema,
+    const TRowSet& col_stats_data, TAlterTableUpdateStatsParams* params) {
+  // Expect exactly one result row.
+  DCHECK_EQ(1, col_stats_data.rows.size());
+  const TRow& col_stats_row = col_stats_data.rows[0];
+
+  // Set per-column stats. For a column at position i in its source table,
+  // the NDVs and the number of NULLs are at position i and i + 1 of the
+  // col_stats_row, respectively.
+  for (int i = 0; i < col_stats_row.colVals.size(); i += 2) {
+    // The NDVs are written as a string column by the estimation function.
+    StringParser::ParseResult parse_result;
+    const string& ndvs_str = col_stats_row.colVals[i].stringVal.value;
+    int64_t ndvs = StringParser::StringToInt<int64_t>(ndvs_str.data(),
+        ndvs_str.size(), &parse_result);
+    DCHECK_EQ(StringParser::PARSE_SUCCESS, parse_result);
+
+    TColumnStats col_stats;
+    col_stats.__set_num_distinct_values(ndvs);
+    col_stats.__set_num_nulls(col_stats_row.colVals[i + 1].i64Val.value);
+    // TODO: Gather and set the maxColLen/avgColLen stats as well. The planner
+    // currently does not rely on them significantly.
+ col_stats.__set_avg_size(-1); + col_stats.__set_max_size(-1); + params->column_stats[col_stats_schema.columns[i].columnName] = col_stats; + } + params->__isset.column_stats = true; +} diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h index 2994e0f9d..e05634294 100644 --- a/be/src/exec/catalog-op-executor.h +++ b/be/src/exec/catalog-op-executor.h @@ -17,6 +17,7 @@ #define IMPALA_EXEC_CATALOG_OP_EXECUTOR_H #include +#include "gen-cpp/cli_service_types.h" #include "gen-cpp/Frontend_types.h" namespace impala { @@ -34,6 +35,15 @@ class CatalogOpExecutor { // Executes the given catalog operation against the catalog server. Status Exec(const TCatalogOpRequest& catalog_op); + // Translates the given compute stats params and its child-query results into + // a new table alteration request for updating the stats metadata, and executes + // the alteration via Exec(); + Status ExecComputeStats(const TComputeStatsParams& compute_stats_params, + const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema, + const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data, + const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema, + const apache::hive::service::cli::thrift::TRowSet& col_stats_data); + // Set in Exec(), returns a pointer to the TDdlExecResponse of the DDL execution. // If called before Exec(), this will return NULL. Only set if the // TCatalogOpType is DDL. @@ -49,6 +59,17 @@ class CatalogOpExecutor { } private: + // Helper functions used in ExecComputeStats() for setting the thrift structs in params + // for the table/column stats based on the results of the corresponding child query. + static void SetTableStats( + const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema, + const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data, + TAlterTableUpdateStatsParams* params); + static void SetColumnStats( + const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema, + const apache::hive::service::cli::thrift::TRowSet& col_stats_data, + TAlterTableUpdateStatsParams* params); + // Response from executing the DDL request, see ddl_exec_response(). boost::scoped_ptr exec_response_; diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 96af2c197..78c770422 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -165,6 +165,9 @@ class Coordinator { const ProgressUpdater& progress() { return progress_; } + // Returns query_status_. + Status GetStatus(); + private: class BackendExecState; @@ -421,9 +424,6 @@ class Coordinator { // reached. void CancelRemoteFragments(); - // Returns query_status_. - Status GetStatus(); - // Acquires lock_ and updates query_status_ with 'status' if it's not already // an error status, and returns the current query_status_. // Calls CancelInternal() when switching to an error status. diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 90529c0a7..c3b012311 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(Service impala-hs2-server.cc impala-beeswax-server.cc query-exec-state.cc + child-query.cc ) # this shared library provides Impala executor functionality to FE test. diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc new file mode 100644 index 000000000..5228eeb65 --- /dev/null +++ b/be/src/service/child-query.cc @@ -0,0 +1,122 @@ +// Copyright 2012 Cloudera Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "service/child-query.h" +#include "service/query-exec-state.h" +#include "util/debug-util.h" +#include "gen-cpp/cli_service_types.h" + +using namespace std; +using namespace impala; +using namespace boost; +using namespace apache::hive::service::cli::thrift; + +namespace impala { + +// To detect cancellation of the parent query, this function acquires the parent query's +// lock_ and checks the parent's parent_status before any HS2 "RPC" into the impala +// server. It is important not to hold any locks (in particular the parent query's +// lock_) while invoking HS2 functions to avoid deadlock. +Status ChildQuery::ExecAndFetch() { + const TUniqueId& session_id = parent_exec_state_->session_id(); + VLOG_QUERY << "Executing child query: " << query_ << " in session " + << PrintId(session_id); + + // Create HS2 request and response structs. + Status status; + TExecuteStatementResp exec_stmt_resp; + TExecuteStatementReq exec_stmt_req; + ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_id, + &exec_stmt_req.sessionHandle.sessionId); + exec_stmt_req.__set_statement(query_); + + // Starting executing of the child query and setting is_running are not made atomic + // because holding a lock while calling into the parent_server_ may result in deadlock. + // Cancellation is checked immediately after setting is_running_ below. + // The order of the following three steps is important: + // 1. Start query execution before setting is_running_ to ensure that + // a concurrent Cancel() initiated by the parent is a no-op. + // 2. Set the hs2_handle_ before is_running_ to ensure there is a proper handle + // for Cancel() to use. + // 3. Set is_running_ to true. Once is_running_ is set, the child query + // can be cancelled via Cancel(). + RETURN_IF_ERROR(CheckParentStatus()); + parent_server_->ExecuteStatement(exec_stmt_resp, exec_stmt_req); + hs2_handle_ = exec_stmt_resp.operationHandle; + { + lock_guard l(lock_); + is_running_ = true; + } + status = exec_stmt_resp.status; + RETURN_IF_ERROR(status); + + TGetResultSetMetadataReq meta_req; + meta_req.operationHandle = exec_stmt_resp.operationHandle; + RETURN_IF_ERROR(CheckParentStatus()); + parent_server_->GetResultSetMetadata(meta_resp_, meta_req); + status = meta_resp_.status; + RETURN_IF_ERROR(status); + + // Fetch all results. 
+ TFetchResultsReq fetch_req; + fetch_req.operationHandle = exec_stmt_resp.operationHandle; + fetch_req.maxRows = 1024; + do { + RETURN_IF_ERROR(CheckParentStatus()); + parent_server_->FetchResults(fetch_resp_, fetch_req); + status = fetch_resp_.status; + } while (status.ok() && fetch_resp_.hasMoreRows); + RETURN_IF_ERROR(CheckParentStatus()); + + TCloseOperationResp close_resp; + TCloseOperationReq close_req; + close_req.operationHandle = exec_stmt_resp.operationHandle; + parent_server_->CloseOperation(close_resp, close_req); + { + lock_guard l(lock_); + is_running_ = false; + } + status = close_resp.status; + return status; +} + +void ChildQuery::Cancel() { + lock_guard l(lock_); + if (!is_running_) return; + VLOG_QUERY << "Cancelling and closing child query with operation id: " + << hs2_handle_.operationId.guid; + // Ignore return statuses because they are not actionable. + TCancelOperationResp cancel_resp; + TCancelOperationReq cancel_req; + cancel_req.operationHandle = hs2_handle_; + parent_server_->CancelOperation(cancel_resp, cancel_req); + TCloseOperationResp close_resp; + TCloseOperationReq close_req; + close_req.operationHandle = hs2_handle_; + parent_server_->CloseOperation(close_resp, close_req); + is_running_ = false; +} + +Status ChildQuery::CheckParentStatus() { + Status parent_status; + { + lock_guard l(*parent_exec_state_->lock()); + parent_status = parent_exec_state_->query_status(); + } + // Cancel this child query if the parent was cancelled or has failed. + if (!parent_status.ok()) Cancel(); + return parent_status; +} + +} diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h new file mode 100644 index 000000000..0bd9014ff --- /dev/null +++ b/be/src/service/child-query.h @@ -0,0 +1,130 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IMPALA_SERVICE_CHILD_QUERY_H +#define IMPALA_SERVICE_CHILD_QUERY_H + +#include +#include + +#include "common/status.h" +#include "impala-server.h" +#include "gen-cpp/cli_service_types.h" + +namespace impala { + +class ImpalaServer; + +// Child queries are used for implementing statements that consist of one or several +// query statements (e.g., compute stats) that require independent query handles for +// fetching results. Such queries are 'children' of a parent exec state in the sense +// that they are executed in the same session and that child queries are cancelled if +// the parent is cancelled (but not necessarily vice versa). +// For simplicity and consistency, child queries are always executed via HiveServer2, +// regardless of whether the parent session is Beeswax or HiveServer2. +// +// Parent queries are expected to call ExecAndWait() of a child query in a +// separate thread, and then join that thread to wait for child-query completion. +// The parent QueryExecState is independent of the child query's QueryExecState, +// with the exception that the child query selectively checks the parent's status +// for failure/cancellation detection. 
Child queries should never call into their +// parent's QueryExecState to avoid deadlock. +// +// TODO: Compute stats is the only stmt that requires child queries. Once the +// CatalogService performs background stats gathering the concept of child queries +// will likely become obsolete. Remove this class and all child-query related code. +class ChildQuery { + public: + ChildQuery(const std::string& query, ImpalaServer::QueryExecState* parent_exec_state, + ImpalaServer* parent_server) + : query_(query), + parent_exec_state_(parent_exec_state), + parent_server_(parent_server), + is_running_(false) { + DCHECK(!query_.empty()); + DCHECK(parent_exec_state_ != NULL); + DCHECK(parent_server_ != NULL); + } + + // Allow child queries to be added to std collections. + // (boost::mutex's operator= and copy c'tor are private) + ChildQuery(const ChildQuery& other) + : query_(other.query_), + parent_exec_state_(other.parent_exec_state_), + parent_server_(other.parent_server_), + is_running_(other.is_running_) {} + + // Allow child queries to be added to std collections. + // (boost::mutex's operator= and copy c'tor are private) + ChildQuery& operator=(const ChildQuery& other) { + query_ = other.query_; + parent_exec_state_ = other.parent_exec_state_; + parent_server_ = other.parent_server_; + is_running_ = other.is_running_; + return *this; + } + + // Executes this child query through HiveServer2 and fetches all its results. + Status ExecAndFetch(); + + // Cancels and closes the given child query if it is running. No-op if the query is + // not running. Child queries can be cancelled by the parent query through + // QueryExecState::Cancel() or inside this child query's own ExecAndFetch(). + // Child queries should never cancel their parent to avoid deadlock (but the parent + // query may decide to cancel itself based on a non-OK status from a child query). + // Note that child queries have a different QueryExecState than their parent query, + // so cancellation of a child query does not call into the parent's QueryExecState. + void Cancel(); + + const apache::hive::service::cli::thrift::TTableSchema& result_schema() { + return meta_resp_.schema; + } + + const apache::hive::service::cli::thrift::TRowSet& result_data() { + return fetch_resp_.results; + } + + private: + // Checks whether the parent query has failed or been cancelled. If so, cancels this + // child query (but not the parent query). Returns the status of the parent query. + Status CheckParentStatus(); + + // SQL string to be executed. + std::string query_; + + // Execution state of parent query. Used to synchronize and propagate parent + // cancellations/failures to this child query. Not owned. + ImpalaServer::QueryExecState* parent_exec_state_; + + // Parent Impala server used for executing this child query. Not owned. + ImpalaServer* parent_server_; + + // Result metadata and result rows of query. + apache::hive::service::cli::thrift::TGetResultSetMetadataResp meta_resp_; + apache::hive::service::cli::thrift::TFetchResultsResp fetch_resp_; + + // HS2 query handle. Set in ExecChildQuery(). + apache::hive::service::cli::thrift::TOperationHandle hs2_handle_; + + // Protects is_running_ to ensure idempotent cancellations. + boost::mutex lock_; + + // Indicates whether this query is running. False if the query has not started yet + // or if the query has finished either successfully or because of an error. 
+ bool is_running_; +}; + +} + +#endif diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 9d375cf33..20dc682ed 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -240,6 +240,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, private: class FragmentExecState; + friend class ChildQuery; // Query result set stores converted rows returned by QueryExecState.fetchRows(). It // provides an interface to convert Impala rows to external API rows. diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 38110da20..f4dea90e6 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -109,49 +109,8 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) { return Status::OK; } case TStmtType::DDL: { - string op_type = catalog_op_type() == TCatalogOpType::DDL ? - PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type()); - summary_profile_.AddInfoString("DDL Type", op_type); - - if (catalog_op_type() != TCatalogOpType::DDL && - catalog_op_type() != TCatalogOpType::RESET_METADATA) { - Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request); - lock_guard l(lock_); - return UpdateQueryStatus(status); - } - - catalog_op_executor_.reset(new CatalogOpExecutor()); - Status status = catalog_op_executor_->Exec(exec_request->catalog_op_request); - { - lock_guard l(lock_); - RETURN_IF_ERROR(UpdateQueryStatus(status)); - } - - // If this is a CTAS request, there will usually be more work to do - // after executing the CREATE TABLE statement (the INSERT portion of the operation). - // The exception is if the user specified IF NOT EXISTS and the table already - // existed, in which case we do not execute the INSERT. - if (catalog_op_type() == TCatalogOpType::DDL && - ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) { - if (catalog_op_executor_->ddl_exec_response()->new_table_created) { - // At this point, the remainder of the CTAS request executes - // like a normal DML request. As with other DML requests, it will - // wait for another catalog update if any partitions were altered as a result - // of the operation. - DCHECK(exec_request_.__isset.query_exec_request); - RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request)); - } else { - DCHECK(exec_request_.catalog_op_request. - ddl_params.create_table_params.if_not_exists); - } - } else { - // CREATE TABLE AS SELECT performs its catalog update once the DML - // portion of the operation has completed. - RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( - *catalog_op_executor_->update_catalog_result(), - exec_request_.query_options.sync_ddl)); - } - return Status::OK; + DCHECK(exec_request_.__isset.catalog_op_request); + return ExecDdlRequest(); } case TStmtType::LOAD: { DCHECK(exec_request_.__isset.load_data_request); @@ -177,7 +136,7 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) { } default: stringstream errmsg; - errmsg << "Unknown exec request stmt type: " << exec_request->stmt_type; + errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type; return Status(errmsg.str()); } } @@ -301,6 +260,64 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( return Status::OK; } +Status ImpalaServer::QueryExecState::ExecDdlRequest() { + string op_type = catalog_op_type() == TCatalogOpType::DDL ? 
+ PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type()); + summary_profile_.AddInfoString("DDL Type", op_type); + + if (catalog_op_type() != TCatalogOpType::DDL && + catalog_op_type() != TCatalogOpType::RESET_METADATA) { + Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request); + lock_guard l(lock_); + return UpdateQueryStatus(status); + } + + if (ddl_type() == TDdlType::COMPUTE_STATS) { + TComputeStatsParams& compute_stats_params = + exec_request_.catalog_op_request.ddl_params.compute_stats_params; + // Add child queries for computing table and column stats. + child_queries_.push_back( + ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_)); + child_queries_.push_back( + ChildQuery(compute_stats_params.col_stats_query, this, parent_server_)); + ExecChildQueriesAsync(); + return Status::OK; + } + + catalog_op_executor_.reset(new CatalogOpExecutor()); + Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request); + { + lock_guard l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + + // If this is a CTAS request, there will usually be more work to do + // after executing the CREATE TABLE statement (the INSERT portion of the operation). + // The exception is if the user specified IF NOT EXISTS and the table already + // existed, in which case we do not execute the INSERT. + if (catalog_op_type() == TCatalogOpType::DDL && + ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) { + if (catalog_op_executor_->ddl_exec_response()->new_table_created) { + // At this point, the remainder of the CTAS request executes + // like a normal DML request. As with other DML requests, it will + // wait for another catalog update if any partitions were altered as a result + // of the operation. + DCHECK(exec_request_.__isset.query_exec_request); + RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request)); + } else { + DCHECK(exec_request_.catalog_op_request. + ddl_params.create_table_params.if_not_exists); + } + } else { + // CREATE TABLE AS SELECT performs its catalog update once the DML + // portion of the operation has completed. + RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( + *catalog_op_executor_->update_catalog_result(), + exec_request_.query_options.sync_ddl)); + } + return Status::OK; +} + void ImpalaServer::QueryExecState::Done() { unique_lock l(lock_); MarkActive(); @@ -310,7 +327,6 @@ void ImpalaServer::QueryExecState::Done() { query_events_->MarkEvent("Unregister query"); } - Status ImpalaServer::QueryExecState::Exec(const TMetadataOpRequest& exec_request) { TResultSet metadata_op_result; // Like the other Exec(), fill out as much profile information as we're able to. @@ -324,11 +340,16 @@ Status ImpalaServer::QueryExecState::Exec(const TMetadataOpRequest& exec_request } Status ImpalaServer::QueryExecState::Wait() { + RETURN_IF_ERROR(WaitForChildQueries()); if (coord_.get() != NULL) { RETURN_IF_ERROR(coord_->Wait()); RETURN_IF_ERROR(UpdateCatalog()); } + if (ddl_type() == TDdlType::COMPUTE_STATS) { + RETURN_IF_ERROR(UpdateTableAndColumnStats()); + } + if (!returns_result_set()) { // Queries that do not return a result are finished at this point. This includes // DML operations and a subset of the DDL operations. @@ -455,6 +476,11 @@ Status ImpalaServer::QueryExecState::GetRowValue(TupleRow* row, vector* r } void ImpalaServer::QueryExecState::Cancel(const Status* cause) { + // Cancel and close child queries before cancelling parent. 
+ BOOST_FOREACH(ChildQuery& child_query, child_queries_) { + child_query.Cancel(); + } + // If the query is completed, no need to cancel. if (eos_) return; // we don't want multiple concurrent cancel calls to end up executing @@ -580,4 +606,59 @@ void ImpalaServer::QueryExecState::MarkActive() { ++ref_count_; } +Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats() { + DCHECK(child_queries_.size() == 2); + catalog_op_executor_.reset(new CatalogOpExecutor()); + Status status = catalog_op_executor_->ExecComputeStats( + exec_request_.catalog_op_request.ddl_params.compute_stats_params, + child_queries_[0].result_schema(), + child_queries_[0].result_data(), + child_queries_[1].result_schema(), + child_queries_[1].result_data()); + { + lock_guard l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( + *catalog_op_executor_->update_catalog_result(), + exec_request_.query_options.sync_ddl)); + + // Set the results to be reported to the client. + const TDdlExecResponse* ddl_resp = catalog_op_executor_->ddl_exec_response(); + if (ddl_resp != NULL && ddl_resp->__isset.result_set) { + result_metadata_ = ddl_resp->result_set.schema; + request_result_set_.reset(new vector); + request_result_set_->assign( + ddl_resp->result_set.rows.begin(), ddl_resp->result_set.rows.end()); + } + + query_events_->MarkEvent("Metastore update finished"); + return Status::OK; +} + +void ImpalaServer::QueryExecState::ExecChildQueriesAsync() { + DCHECK(child_queries_thread_.get() == NULL); + child_queries_thread_.reset(new Thread("query-exec-state", "async child queries", + bind(&ImpalaServer::QueryExecState::ExecChildQueries, this))); +} + +void ImpalaServer::QueryExecState::ExecChildQueries() { + for (int i = 0; i < child_queries_.size(); ++i) { + if (!child_queries_status_.ok()) return; + child_queries_status_ = child_queries_[i].ExecAndFetch(); + } +} + +Status ImpalaServer::QueryExecState::WaitForChildQueries() { + if (child_queries_thread_.get() == NULL) return Status::OK; + child_queries_thread_->Join(); + { + lock_guard l(lock_); + RETURN_IF_ERROR(query_status_); + RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status_)); + } + query_events_->MarkEvent("Child queries finished"); + return Status::OK; +} + } diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h index 0887a24ab..bcc0acaaa 100644 --- a/be/src/service/query-exec-state.h +++ b/be/src/service/query-exec-state.h @@ -19,8 +19,9 @@ #include "exec/catalog-op-executor.h" #include "util/runtime-profile.h" #include "runtime/timestamp-value.h" -#include "gen-cpp/Frontend_types.h" +#include "service/child-query.h" #include "service/impala-server.h" +#include "gen-cpp/Frontend_types.h" #include #include @@ -46,6 +47,9 @@ class QueryExecStateCleaner; // To avoid deadlocks, the caller must *not* acquire query_exec_state_map_lock_ // while holding the exec state's lock. // TODO: Consider renaming to RequestExecState for consistency. +// TODO: Compute stats is the only stmt that requires child queries. Once the +// CatalogService performs background stats gathering the concept of child queries +// will likely become obsolete. Remove all child-query related code from this class. class ImpalaServer::QueryExecState { public: QueryExecState(ExecEnv* exec_env, Frontend* frontend, @@ -68,7 +72,7 @@ class ImpalaServer::QueryExecState { Status Exec(const TMetadataOpRequest& exec_request); // Call this to ensure that rows are ready when calling FetchRows(). 
-  // Must be preceded by call to Exec().
+  // Must be preceded by call to Exec(). Waits for all child queries to complete.
   Status Wait();
 
   // Return at most max_rows from the current batch. If the entire current batch has
@@ -180,8 +184,8 @@ class ImpalaServer::QueryExecState {
   // not set for ddl queries, or queries with "limit 0"
   boost::scoped_ptr<Coordinator> coord_;
 
-  // Runs statements that query or modify the catalog via the CatalogService.
-  boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
+  // Runs statements that query or modify the catalog via the CatalogService.
+  boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
 
   // Result set used for requests that return results and are not QUERY
   // statements. For example, EXPLAIN, LOAD, and SHOW use this.
@@ -235,6 +239,16 @@ class ImpalaServer::QueryExecState {
   // Start/end time of the query
   TimestampValue start_time_, end_time_;
 
+  // List of child queries to be executed on behalf of this query.
+  std::vector<ChildQuery> child_queries_;
+
+  // Thread to execute child_queries_ in and the resulting status. The status is OK iff
+  // all child queries complete successfully. Otherwise, status contains the error of the
+  // first child query that failed (child queries are executed serially and abort on the
+  // first error).
+  Status child_queries_status_;
+  boost::scoped_ptr<Thread> child_queries_thread_;
+
   // Executes a local catalog operation (an operation that does not need to execute
   // against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
   Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
@@ -254,6 +268,10 @@ class ImpalaServer::QueryExecState {
   // Non-blocking.
   Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request);
 
+  // Core logic of executing a ddl statement. May internally initiate execution of
+  // queries (e.g., compute stats) or dml (e.g., create table as select).
+  Status ExecDdlRequest();
+
   // Executes a LOAD DATA
   Status ExecLoadDataRequest();
 
@@ -283,6 +301,30 @@ class ImpalaServer::QueryExecState {
   // ready until all BEs complete execution. This can be called as part of Wait(),
   // at which point results will be avilable.
   void SetCreateTableAsSelectResultSet();
+
+  // Updates the metastore's table and column statistics based on the child-query results
+  // of a compute stats command.
+  // TODO: Unify the various ways that the Metastore is updated for DDL/DML.
+  // For example, INSERT queries update partition metadata in UpdateCatalog() using a
+  // TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar
+  // purposes. Perhaps INSERT should use a TCatalogOpRequest as well.
+  Status UpdateTableAndColumnStats();
+
+  // Asynchronously executes all child_queries_ one by one. Calls ExecChildQueries()
+  // in a new child_queries_thread_.
+  void ExecChildQueriesAsync();
+
+  // Serially executes the queries in child_queries_ by calling the child query's
+  // ExecAndFetch(). This function is blocking and is intended to be run in a separate
+  // thread to ensure that Exec() remains non-blocking. Sets child_queries_status_.
+  // Must not be called while holding lock_.
+  void ExecChildQueries();
+
+  // Waits for all child queries to complete successfully or with an error, by joining
+  // child_queries_thread_. Returns a non-OK status if a child query fails or if the
+  // parent query is cancelled (subsequent children will not be executed). Returns OK
+  // if child_queries_thread_ is not set or if all child queries finished successfully.
+ Status WaitForChildQueries(); }; } diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index cb3ea8dbe..d261c64ed 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -19,6 +19,7 @@ include "CatalogObjects.thrift" include "JniCatalog.thrift" include "Types.thrift" include "Status.thrift" +include "Data.thrift" // CatalogServer service API and related structs. @@ -81,6 +82,9 @@ struct TDdlExecRequest { // Parameters for DROP FUNCTION 12: optional JniCatalog.TDropFunctionParams drop_fn_params + + // Parameters for COMPUTE STATS + 13: optional JniCatalog.TComputeStatsParams compute_stats_params } // Response from executing a TDdlExecRequest @@ -93,6 +97,10 @@ struct TDdlExecResponse { // table or whether creation was skipped because the table already existed, in which // case this flag would be false 2: optional bool new_table_created; + + // Result of DDL operation to be returned to the client. Currently only set + // by COMPUTE STATS. + 3: optional Data.TResultSet result_set } // Updates the metastore with new partition information and returns a response diff --git a/common/thrift/Data.thrift b/common/thrift/Data.thrift index e6047b0ae..357aea8cd 100644 --- a/common/thrift/Data.thrift +++ b/common/thrift/Data.thrift @@ -16,6 +16,7 @@ namespace cpp impala namespace java com.cloudera.impala.thrift include "Types.thrift" +include "CatalogObjects.thrift" // Serialized, self-contained version of a RowBatch (in be/src/runtime/row-batch.h). struct TRowBatch { @@ -51,3 +52,14 @@ struct TColumnValue { struct TResultRow { 1: list colVals } + +struct TResultSetMetadata { + 1: required list columns +} + +// List of rows and metadata describing their columns. +struct TResultSet { + 1: required list rows + 2: required TResultSetMetadata schema +} + diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index 203816493..d37c202cf 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -54,16 +54,6 @@ struct TSessionState { 4: required Types.TNetworkAddress network_address } -struct TResultSetMetadata { - 1: required list columns -} - -// List of rows and metadata describing their columns. -struct TResultSet { - 1: required list rows - 2: required TResultSetMetadata schema -} - // Struct for HiveUdf expr to create the proper execution object in the FE // java side. See exprs/hive-udf-call.h for how hive Udfs are executed in general. // TODO: this could be the UdfID, collapsing the first 3 arguments but synchronizing @@ -304,7 +294,7 @@ struct TQueryExecRequest { per_node_scan_ranges // Metadata of the query result set (only for select) - 5: optional TResultSetMetadata result_set_metadata + 5: optional Data.TResultSetMetadata result_set_metadata // Set if the query needs finalization after it executes 6: optional TFinalizeParams finalize_params @@ -437,7 +427,7 @@ struct TExecRequest { 4: optional TCatalogOpRequest catalog_op_request // Metadata of the query result set (not set for DML) - 5: optional TResultSetMetadata result_set_metadata + 5: optional Data.TResultSetMetadata result_set_metadata // Result of EXPLAIN. 
Set iff stmt_type is EXPLAIN
   6: optional TExplainResult explain_result
 
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 54777140c..d0fccc471 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -18,6 +18,7 @@ namespace java com.cloudera.impala.thrift
 include "CatalogObjects.thrift"
 include "Types.thrift"
 include "Status.thrift"
+include "cli_service.thrift"
 
 // Structs used to execute DDL operations using the JniCatalog.
 
@@ -30,6 +31,7 @@ enum TDdlType {
   CREATE_TABLE_LIKE,
   CREATE_VIEW,
   CREATE_FUNCTION,
+  COMPUTE_STATS,
   DROP_DATABASE,
   DROP_TABLE,
   DROP_VIEW,
@@ -48,6 +50,8 @@ enum TAlterTableType {
   SET_FILE_FORMAT,
   SET_LOCATION,
   SET_TBL_PROPERTIES,
+  // Used internally by the COMPUTE STATS DDL command.
+  UPDATE_STATS
 }
 
 // Parameters of CREATE DATABASE commands
@@ -167,6 +171,23 @@ struct TAlterTableSetLocationParams {
   2: optional list partition_spec
 }
 
+// Parameters for updating the table and/or column statistics
+// of a table. Used internally by a COMPUTE STATS command.
+struct TAlterTableUpdateStatsParams {
+  // Fully qualified name of the table to be updated.
+  1: required CatalogObjects.TTableName table_name
+
+  // Table-level stats.
+  2: optional CatalogObjects.TTableStats table_stats
+
+  // Partition-level stats. Maps from a list of partition-key values
+  // to its partition stats.
+  3: optional map<list<string>, CatalogObjects.TTableStats> partition_stats
+
+  // Column-level stats. Maps from column name to column stats.
+  4: optional map<string, CatalogObjects.TColumnStats> column_stats
+}
+
 // Parameters for all ALTER TABLE commands.
 struct TAlterTableParams {
   1: required TAlterTableType alter_type
@@ -200,6 +221,9 @@ struct TAlterTableParams {
   // Parameters for ALTER TABLE SET TBLPROPERTIES
   11: optional TAlterTableSetTblPropertiesParams set_tbl_properties_params
+
+  // Parameters for updating table/column stats. Used internally by COMPUTE STATS
+  12: optional TAlterTableUpdateStatsParams update_stats_params
 }
 
 // Parameters of CREATE TABLE LIKE commands
@@ -296,6 +320,18 @@ struct TCreateOrAlterViewParams {
   7: optional bool if_not_exists
 }
 
+// Parameters of a COMPUTE STATS command
+struct TComputeStatsParams {
+  // Fully qualified name of the table to compute stats for.
+  1: required CatalogObjects.TTableName table_name
+
+  // Query for gathering per-partition row count.
+  2: required string tbl_stats_query
+
+  // Query for gathering per-column NDVs and number of NULLs.
+ 3: required string col_stats_query +} + // Parameters of DROP DATABASE commands struct TDropDbParams { // Name of the database to drop diff --git a/fe/src/main/cup/sql-parser.y b/fe/src/main/cup/sql-parser.y index 01a0b584b..016fbd261 100644 --- a/fe/src/main/cup/sql-parser.y +++ b/fe/src/main/cup/sql-parser.y @@ -201,7 +201,7 @@ parser code {: terminal KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_AS, KW_ASC, KW_AVG, KW_AVROFILE, KW_BETWEEN, KW_BIGINT, KW_BOOLEAN, KW_BY, KW_CASE, KW_CAST, - KW_CHANGE, KW_CHAR, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COUNT, KW_CREATE, + KW_CHANGE, KW_CHAR, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPUTE, KW_COUNT, KW_CREATE, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, @@ -311,6 +311,7 @@ nonterminal Qualifier union_op; nonterminal AlterTableStmt alter_tbl_stmt; nonterminal StatementBase alter_view_stmt; +nonterminal ComputeStatsStmt compute_stats_stmt; nonterminal DropDbStmt drop_db_stmt; nonterminal DropTableOrViewStmt drop_tbl_or_view_stmt; nonterminal CreateDbStmt create_db_stmt; @@ -412,6 +413,8 @@ stmt ::= {: RESULT = alter_tbl; :} | alter_view_stmt:alter_view {: RESULT = alter_view; :} + | compute_stats_stmt:compute_stats + {: RESULT = compute_stats; :} | create_tbl_as_select_stmt:create_tbl_as_select {: RESULT = create_tbl_as_select; :} | create_tbl_like_stmt:create_tbl_like @@ -821,6 +824,11 @@ alter_view_stmt ::= {: RESULT = new AlterTableOrViewRenameStmt(before_table, new_table, false); :} ; +compute_stats_stmt ::= + KW_COMPUTE KW_STATS table_name:table + {: RESULT = new ComputeStatsStmt(table); :} + ; + drop_db_stmt ::= KW_DROP db_or_schema_kw if_exists_val:if_exists IDENT:db_name {: RESULT = new DropDbStmt(db_name, if_exists); :} diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java index 275f44bbd..9d5c63362 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java @@ -55,6 +55,10 @@ public class AnalysisContext { return stmt instanceof AlterViewStmt; } + public boolean isComputeStatsStmt() { + return stmt instanceof ComputeStatsStmt; + } + public boolean isQueryStmt() { return stmt instanceof QueryStmt; } @@ -150,9 +154,9 @@ public class AnalysisContext { isShowCreateTableStmt() || isDescribeStmt() || isCreateTableLikeStmt() || isCreateTableStmt() || isCreateViewStmt() || isCreateDbStmt() || isDropDbStmt() || isDropTableOrViewStmt() || isResetMetadataStmt() || - isAlterTableStmt() || isAlterViewStmt() || isCreateUdfStmt() || - isCreateUdaStmt() || isShowFunctionsStmt() || isDropFunctionStmt() || - isCreateTableAsSelectStmt(); + isAlterTableStmt() || isAlterViewStmt() || isComputeStatsStmt() || + isCreateUdfStmt() || isCreateUdaStmt() || isShowFunctionsStmt() || + isDropFunctionStmt() || isCreateTableAsSelectStmt(); } public boolean isDmlStmt() { @@ -169,6 +173,11 @@ public class AnalysisContext { return (AlterViewStmt) stmt; } + public ComputeStatsStmt getComputeStatsStmt() { + Preconditions.checkState(isComputeStatsStmt()); + return (ComputeStatsStmt) stmt; + } + public CreateTableLikeStmt getCreateTableLikeStmt() { Preconditions.checkState(isCreateTableLikeStmt()); return (CreateTableLikeStmt) stmt; diff --git a/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java 
b/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java new file mode 100644 index 000000000..164b3eb7a --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/analysis/ComputeStatsStmt.java @@ -0,0 +1,129 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.analysis; + +import java.util.List; + +import org.apache.log4j.Logger; + +import com.cloudera.impala.authorization.Privilege; +import com.cloudera.impala.catalog.AuthorizationException; +import com.cloudera.impala.catalog.Column; +import com.cloudera.impala.catalog.HdfsTable; +import com.cloudera.impala.catalog.Table; +import com.cloudera.impala.catalog.View; +import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.thrift.TComputeStatsParams; +import com.cloudera.impala.thrift.TTableName; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Represents an COMPUTE STATS statement for statistics collection. The + * statement gathers all table and column stats for a given table and stores them in + * the Metastore via the CatalogService. All existing stats for that table are replaced + * and no existing stats are reused. + * + * TODO: Allow more coarse/fine grained (db, column) and/or incremental stats collection. + */ +public class ComputeStatsStmt extends StatementBase { + private static final Logger LOG = Logger.getLogger(ComputeStatsStmt.class); + + protected final TableName tableName_; + + // Set during analysis. + protected Table table_; + + // Query for getting the per-partition row count and the total row count. + // Set during analysis. + protected String tableStatsQueryStr_; + + // Query for getting the per-column NDVs and number of NULLs. + // Set during analysis. + protected String columnStatsQueryStr_; + + protected ComputeStatsStmt(TableName tableName) { + Preconditions.checkState(tableName != null && !tableName.isEmpty()); + this.tableName_ = tableName; + this.table_ = null; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, + AuthorizationException { + table_ = analyzer.getTable(tableName_, Privilege.ALTER); + if (table_ instanceof View) { + throw new AnalysisException(String.format( + "COMPUTE STATS not allowed on a view: %s", table_.getFullName())); + } + + // Query for getting the per-partition row count and the total row count. + StringBuilder tableStatsQueryBuilder = new StringBuilder("SELECT "); + List tableStatsSelectList = Lists.newArrayList(); + tableStatsSelectList.add("COUNT(*)"); + List groupByCols = Lists.newArrayList(); + // Only add group by clause for HdfsTables. + if (table_ instanceof HdfsTable) { + for (int i = 0; i < table_.getNumClusteringCols(); ++i) { + String colName = table_.getColumns().get(i).getName(); + groupByCols.add(colName); + // For the select list, wrap the group by columns in a cast to string because + // the Metastore stores them as strings. 
+ tableStatsSelectList.add("cast(" + colName + " as string)"); + } + } + tableStatsQueryBuilder.append(Joiner.on(", ").join(tableStatsSelectList)); + tableStatsQueryBuilder.append(" FROM " + table_.getFullName()); + if (!groupByCols.isEmpty()) { + tableStatsQueryBuilder.append(" GROUP BY "); + tableStatsQueryBuilder.append(Joiner.on(", ").join(groupByCols)); + } + tableStatsQueryStr_ = tableStatsQueryBuilder.toString(); + LOG.debug(tableStatsQueryStr_); + + // Query for getting the per-column NDVs and number of NULLs. + StringBuilder columnStatsQueryBuilder = new StringBuilder("SELECT "); + List columnStatsSelectList = Lists.newArrayList(); + // Exclude partition columns from stats gathering because Hive's framework cannot + // handle storing them as part of the non-partition column stats. + for (int i = table_.getNumClusteringCols(); i < table_.getColumns().size(); ++i) { + Column c = table_.getColumns().get(i); + // NDV approximation function. Add explicit alias for later identification when + // updating the Metastore. + columnStatsSelectList.add("NDV(" + c.getName() + ") AS " + c.getName()); + // Count the number of NULL values. + columnStatsSelectList.add("COUNT(IF(" + c.getName() + " IS NULL, 1, NULL))"); + } + columnStatsQueryBuilder.append(Joiner.on(", ").join(columnStatsSelectList)); + columnStatsQueryBuilder.append(" FROM " + table_.getFullName()); + columnStatsQueryStr_ = columnStatsQueryBuilder.toString(); + LOG.debug(columnStatsQueryStr_); + } + + public String getTblStatsQuery() { return tableStatsQueryStr_; } + public String getColStatsQuery() { return columnStatsQueryStr_; } + + @Override + public String toSql() { return "COMPUTE STATS " + tableName_.toString(); } + + public TComputeStatsParams toThrift() { + TComputeStatsParams params = new TComputeStatsParams(); + params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName())); + params.setTbl_stats_query(tableStatsQueryStr_); + params.setCol_stats_query(columnStatsQueryStr_); + return params; + } +} diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java b/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java index 9eb1a3899..7aab1199d 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/ColumnStats.java @@ -42,7 +42,8 @@ public class ColumnStats { private final static EnumSet SUPPORTED_COL_TYPES = EnumSet.of( PrimitiveType.BIGINT, PrimitiveType.BINARY, PrimitiveType.BOOLEAN, PrimitiveType.DOUBLE, PrimitiveType.FLOAT, PrimitiveType.INT, - PrimitiveType.SMALLINT, PrimitiveType.STRING, PrimitiveType.TINYINT); + PrimitiveType.SMALLINT, PrimitiveType.STRING, PrimitiveType.TIMESTAMP, + PrimitiveType.TINYINT); // in bytes: excludes serialization overhead private double avgSize; @@ -144,12 +145,14 @@ public class ColumnStats { if (isCompatible) { BooleanColumnStatsData boolStats = statsData.getBooleanStats(); numNulls = boolStats.getNumNulls(); + numDistinctValues = (numNulls > 0) ? 3 : 2; } break; case TINYINT: case SMALLINT: case INT: case BIGINT: + case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps. 
isCompatible = statsData.isSetLongStats(); if (isCompatible) { LongColumnStatsData longStats = statsData.getLongStats(); diff --git a/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java b/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java index 880f64d81..0556a55e3 100644 --- a/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java +++ b/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java @@ -26,13 +26,22 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.ql.stats.StatsSetupConst; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -40,10 +49,12 @@ import com.cloudera.impala.analysis.FunctionName; import com.cloudera.impala.analysis.TableName; import com.cloudera.impala.catalog.Catalog; import com.cloudera.impala.catalog.CatalogException; +import com.cloudera.impala.catalog.Column; import com.cloudera.impala.catalog.ColumnNotFoundException; import com.cloudera.impala.catalog.DatabaseNotFoundException; import com.cloudera.impala.catalog.Db; import com.cloudera.impala.catalog.Function; +import com.cloudera.impala.catalog.HBaseTable; import com.cloudera.impala.catalog.HdfsPartition; import com.cloudera.impala.catalog.HdfsTable; import com.cloudera.impala.catalog.HiveStorageDescriptorFactory; @@ -66,10 +77,13 @@ import com.cloudera.impala.thrift.TAlterTableParams; import com.cloudera.impala.thrift.TAlterTableSetFileFormatParams; import com.cloudera.impala.thrift.TAlterTableSetLocationParams; import com.cloudera.impala.thrift.TAlterTableSetTblPropertiesParams; +import com.cloudera.impala.thrift.TAlterTableUpdateStatsParams; import com.cloudera.impala.thrift.TCatalogObject; import com.cloudera.impala.thrift.TCatalogObjectType; import com.cloudera.impala.thrift.TCatalogUpdateResult; import com.cloudera.impala.thrift.TColumn; +import com.cloudera.impala.thrift.TColumnStats; +import com.cloudera.impala.thrift.TColumnValue; import com.cloudera.impala.thrift.TCreateDbParams; import com.cloudera.impala.thrift.TCreateFunctionParams; import com.cloudera.impala.thrift.TCreateOrAlterViewParams; @@ -84,10 +98,14 @@ import com.cloudera.impala.thrift.TDropTableOrViewParams; import com.cloudera.impala.thrift.THdfsFileFormat; import com.cloudera.impala.thrift.TPartitionKeyValue; import com.cloudera.impala.thrift.TPrimitiveType; +import com.cloudera.impala.thrift.TResultRow; +import com.cloudera.impala.thrift.TResultSet; +import 
com.cloudera.impala.thrift.TResultSetMetadata; import com.cloudera.impala.thrift.TStatus; import com.cloudera.impala.thrift.TStatusCode; import com.cloudera.impala.thrift.TTable; import com.cloudera.impala.thrift.TTableName; +import com.cloudera.impala.thrift.TTableStats; import com.cloudera.impala.thrift.TUpdateCatalogRequest; import com.cloudera.impala.thrift.TUpdateCatalogResponse; import com.google.common.base.Joiner; @@ -152,6 +170,9 @@ public class DdlExecutor { case CREATE_FUNCTION: createFunction(ddlRequest.getCreate_fn_params(), response); break; + case COMPUTE_STATS: + Preconditions.checkState(false, "Compute stats should trigger an ALTER TABLE."); + break; case DROP_DATABASE: dropDatabase(ddlRequest.getDrop_db_params(), response); break; @@ -239,6 +260,10 @@ public class DdlExecutor { alterTableSetTblProperties(TableName.fromThrift(params.getTable_name()), params.getSet_tbl_properties_params()); break; + case UPDATE_STATS: + Preconditions.checkState(params.isSetUpdate_stats_params()); + alterTableUpdateStats(params.getUpdate_stats_params(), response); + break; default: throw new UnsupportedOperationException( "Unknown ALTER TABLE operation type: " + params.getAlter_type()); @@ -282,17 +307,183 @@ public class DdlExecutor { resp.result.setVersion(resp.result.getUpdated_catalog_object().getCatalog_version()); } + /** + * Alters an existing table's table and column statistics. + */ + private void alterTableUpdateStats(TAlterTableUpdateStatsParams params, + TDdlExecResponse resp) throws NoSuchObjectException, MetaException, TException, + CatalogException { + Preconditions.checkState(params.isSetColumn_stats() && params.isSetPartition_stats() + && params.isSetTable_stats()); + + TableName tableName = TableName.fromThrift(params.getTable_name()); + Preconditions.checkState(tableName != null && tableName.isFullyQualified()); + LOG.info(String.format("Updating table stats for %s", tableName)); + + Table table = catalog.getTable(tableName.getDb(), tableName.getTbl()); + // Deep copy the msTbl to avoid updating our cache before successfully persisting + // the results to the metastore. + org.apache.hadoop.hive.metastore.api.Table msTbl = + table.getMetaStoreTable().deepCopy(); + List msPartitions = + Lists.newArrayList(); + if (table instanceof HdfsTable) { + // Fill the msPartitions from the the cached metadata. + HdfsTable hdfsTable = (HdfsTable) table; + for (HdfsPartition p: hdfsTable.getPartitions()) { + if (p.getMetaStorePartition() != null) { + msPartitions.add(p.getMetaStorePartition()); + } + } + } + + MetaStoreClient msClient = catalog.getMetaStoreClient(); + int numUpdatedPartitions; + int numUpdatedColumns; + try { + // Update the table and partition row counts based on the query results. + numUpdatedPartitions = updateTableStats(table, params, msTbl, msPartitions); + + // Create Hive column stats from the query results. + ColumnStatistics colStats = createHiveColStats(params.getColumn_stats(), table); + numUpdatedColumns = colStats.getStatsObjSize(); + + // Ensure updates are atomic with respect to conflicting DDL operations. + synchronized (metastoreDdlLock) { + // Alter all partitions in bulk. + msClient.getHiveClient().alter_partitions(tableName.getDb(), + tableName.getTbl(), msPartitions); + + // Update column stats. + msClient.getHiveClient().updateTableColumnStatistics(colStats); + + // Update the table stats. Apply the table alteration last to ensure the + // lastDdlTime is as accurate as possible. 
+ applyAlterTable(msTbl); + } + } finally { + msClient.release(); + } + + // Set the results to be reported to the client. + TResultSet resultSet = new TResultSet(); + resultSet.setSchema(new TResultSetMetadata(Lists.newArrayList( + new TColumn("summary", TPrimitiveType.STRING)))); + TColumnValue resultColVal = new TColumnValue(); + resultColVal.setStringVal("Updated " + numUpdatedPartitions + " partition(s) and " + + numUpdatedColumns + " column(s)."); + TResultRow resultRow = new TResultRow(); + resultRow.setColVals(Lists.newArrayList(resultColVal)); + resultSet.setRows(Lists.newArrayList(resultRow)); + resp.setResult_set(resultSet); + } + + /** + * Updates the row counts of the given Hive partitions and the total row count of the + * given Hive table based on the given update stats parameters. + * Missing or new partitions as a result of concurrent table alterations are ignored. + * Returns the number of successfully updated partitions. + */ + private int updateTableStats(Table table, TAlterTableUpdateStatsParams params, + org.apache.hadoop.hive.metastore.api.Table msTbl, + List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions) { + Preconditions.checkState(params.isSetPartition_stats()); + Preconditions.checkState(params.isSetTable_stats()); + + // Update the partitions' ROW_COUNT parameter. + int numUpdatedPartitions = 0; + for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { + TTableStats partitionStats = params.partition_stats.get(msPartition.getValues()); + if (partitionStats == null) continue; + LOG.debug(String.format("Updating stats for partition %s: numRows=%s", + Joiner.on(",").join(msPartition.getValues()), partitionStats.num_rows)); + msPartition.putToParameters(StatsSetupConst.ROW_COUNT, + String.valueOf(partitionStats.num_rows)); + ++numUpdatedPartitions; + } + // For unpartitioned tables and HBase tables report a single updated partition. + if (table.getNumClusteringCols() == 0 || table instanceof HBaseTable) { + numUpdatedPartitions = 1; + } + + // Update the table's ROW_COUNT parameter. + msTbl.putToParameters(StatsSetupConst.ROW_COUNT, + String.valueOf(params.getTable_stats().num_rows)); + return numUpdatedPartitions; + } + + /** + * Creates Hive column statistics for the given table based on the given map from column + * name to column stats. Missing or new columns as a result of concurrent table + * alterations are ignored. + */ + private static ColumnStatistics createHiveColStats( + Map<String, TColumnStats> columnStats, Table table) { + // Collection of column statistics objects to be returned. + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc( + new ColumnStatisticsDesc(true, table.getDb().getName(), table.getName())); + // Generate Hive column stats objects from the update stats params. + for (Map.Entry<String, TColumnStats> entry: columnStats.entrySet()) { + String colName = entry.getKey(); + Column tableCol = table.getColumn(entry.getKey()); + // Ignore columns that were dropped in the meantime.
+ if (tableCol == null) continue; + ColumnStatisticsData colStatsData = + createHiveColStatsData(entry.getValue(), tableCol.getType()); + if (colStatsData == null) continue; + LOG.debug(String.format("Updating column stats for %s: numDVs=%s numNulls=%s", + colName, entry.getValue().getNum_distinct_values(), + entry.getValue().getNum_nulls())); + ColumnStatisticsObj colStatsObj = new ColumnStatisticsObj(colName, + tableCol.getType().toString(), colStatsData); + colStats.addToStatsObj(colStatsObj); + } + return colStats; + } + + private static ColumnStatisticsData createHiveColStatsData(TColumnStats colStats, + PrimitiveType colType) { + ColumnStatisticsData colStatsData = new ColumnStatisticsData(); + long ndvs = colStats.getNum_distinct_values(); + long numNulls = colStats.getNum_nulls(); + switch(colType) { + case BOOLEAN: + // TODO: Gather and set the numTrues and numFalse stats as well. The planner + // currently does not rely on them. + colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls)); + break; + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps. + // TODO: Gather and set the min/max values stats as well. The planner + // currently does not rely on them. + colStatsData.setLongStats(new LongColumnStatsData(-1, -1, numNulls, ndvs)); + break; + case FLOAT: + case DOUBLE: + // TODO: Gather and set the min/max values stats as well. The planner + // currently does not rely on them. + colStatsData.setDoubleStats(new DoubleColumnStatsData(-1, -1, numNulls, ndvs)); + break; + case STRING: + // TODO: Gather and set the maxColLen/avgColLen stats as well. The planner + // currently does not rely on them significantly. + colStatsData.setStringStats(new StringColumnStatsData(-1, -1, numNulls, ndvs)); + break; + default: + return null; + } + return colStatsData; + } + /** * Creates a new database in the metastore and adds the db name to the internal * metadata cache, marking its metadata to be lazily loaded on the next access. * Re-throws any Hive Meta Store exceptions encountered during the create, these * may vary depending on the Meta Store connection type (thrift vs direct db). - * - * @param dbName - The name of the new database. - * @param comment - Comment to attach to the database, or null for no comment. - * @param location - Hdfs path to use as the default location for new table data or - * null to use default location. - * @param ifNotExists - If true, no errors are thrown if the database already exists */ private void createDatabase(TCreateDbParams params, TDdlExecResponse resp) throws MetaException, AlreadyExistsException, InvalidObjectException, @@ -357,9 +548,6 @@ public class DdlExecutor { * Drops a database from the metastore and removes the database's metadata from the * internal cache. The database must be empty (contain no tables) for the drop operation * to succeed. Re-throws any Hive Meta Store exceptions encountered during the drop. - * - * @param dbName - The name of the database to drop - * @param ifExists - If true, no errors will be thrown if the database does not exist. */ private void dropDatabase(TDropDbParams params, TDdlExecResponse resp) throws MetaException, NoSuchObjectException, InvalidOperationException, @@ -451,21 +639,6 @@ public class DdlExecutor { * Creates a new table in the metastore and adds an entry to the metadata cache to * lazily load the new metadata on the next access. Re-throws any Hive Meta Store * exceptions encountered during the create. 
- * - * @param tableName - Fully qualified name of the new table. - * @param column - List of column definitions for the new table. - * @param partitionColumn - List of partition column definitions for the new table. - * @param owner - Owner of this table. - * @param isExternal - * If true, table is created as external which means the data will not be deleted - * if dropped. External tables can also be created on top of existing data. - * @param comment - Optional comment to attach to the table (null for no comment). - * @param location - Hdfs path to use as the location for table data or null to use - * default location. - * @param ifNotExists - If true, no errors are thrown if the table already exists - * @return Returns true if a new table was created in the metastore as a result of this - * call. Returns false if creation was skipped - this indicates the table already - * existed and the caller specified IF NOT EXISTS. */ private boolean createTable(TCreateTableParams params, TDdlExecResponse response) throws MetaException, NoSuchObjectException, AlreadyExistsException, @@ -523,20 +696,6 @@ public class DdlExecutor { * No data is copied as part of this process, it is a metadata only operation. If the * creation succeeds, an entry is added to the metadata cache to lazily load the new * table's metadata on the next access. - * - * @param tableName - Fully qualified name of the new table. - * @param srcTableName - Fully qualified name of the old table. - * @param owner - Owner of this table. - * @param isExternal - * If true, table is created as external which means the data will not be deleted - * if dropped. External tables can also be created on top of existing data. - * @param comment - Optional comment to attach to the table or an empty string for no - comment. Null to copy comment from the source table. - * @param fileFormat - The file format for the new table or null to copy file format - * from source table. - * @param location - Hdfs path to use as the location for table data or null to use - * default location. - * @param ifNotExists - If true, no errors are thrown if the table already exists */ private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response) throws MetaException, NoSuchObjectException, AlreadyExistsException, @@ -544,7 +703,8 @@ public class DdlExecutor { ImpalaException, TableLoadingException, TableNotFoundException { Preconditions.checkNotNull(params); - THdfsFileFormat fileFormat = params.isSetFile_format() ? params.getFile_format() : null; + THdfsFileFormat fileFormat = + params.isSetFile_format() ? params.getFile_format() : null; String comment = params.isSetComment() ? params.getComment() : null; TableName tblName = TableName.fromThrift(params.getTable_name()); TableName srcTblName = TableName.fromThrift(params.getSrc_table_name()); @@ -587,6 +747,8 @@ public class DdlExecutor { if (fileFormat != null) { setStorageDescriptorFileFormat(tbl.getSd(), fileFormat); } + // Set the row count of this table to unknown. 
+ tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1"); LOG.debug(String.format("Creating table %s LIKE %s", tblName, srcTblName)); createTable(tbl, params.if_not_exists, response); } diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java index 7f398bfaa..e32fd8f6a 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java @@ -243,6 +243,13 @@ public class Frontend { req.setCreate_fn_params(stmt.toThrift()); ddl.setDdl_params(req); metadata.setColumns(Collections.emptyList()); + } else if (analysis.isComputeStatsStmt()) { + ddl.op_type = TCatalogOpType.DDL; + TDdlExecRequest req = new TDdlExecRequest(); + req.setDdl_type(TDdlType.COMPUTE_STATS); + req.setCompute_stats_params(analysis.getComputeStatsStmt().toThrift()); + ddl.setDdl_params(req); + metadata.setColumns(Collections.emptyList()); } else if (analysis.isDropDbStmt()) { ddl.op_type = TCatalogOpType.DDL; TDdlExecRequest req = new TDdlExecRequest(); diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index 8bab83da9..af7aaec74 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -69,6 +69,7 @@ import com.cloudera.impala.analysis.SqlParserSymbols; keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN)); keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS)); keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT)); + keywordMap.put("compute", new Integer(SqlParserSymbols.KW_COMPUTE)); keywordMap.put("count", new Integer(SqlParserSymbols.KW_COUNT)); keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE)); keywordMap.put("data", new Integer(SqlParserSymbols.KW_DATA)); diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java index c4ade2f5e..b2b32764b 100644 --- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java @@ -14,6 +14,8 @@ package com.cloudera.impala.analysis; +import static org.junit.Assert.assertTrue; + import java.util.ArrayList; import junit.framework.Assert; @@ -449,6 +451,29 @@ public class AnalyzeDDLTest extends AnalyzerTest { "ALTER VIEW not allowed on a table: functional.alltypes"); } + @Test + public void TestComputeStats() throws AnalysisException { + // Analyze the stmt itself as well as the generated child queries. + ParseNode parseNode = AnalyzesOk("compute stats functional.alltypes"); + assertTrue(parseNode instanceof ComputeStatsStmt); + ComputeStatsStmt stmt = (ComputeStatsStmt) parseNode; + AnalyzesOk(stmt.getTblStatsQuery()); + AnalyzesOk(stmt.getColStatsQuery()); + + parseNode = AnalyzesOk("compute stats functional_hbase.alltypes"); + assertTrue(parseNode instanceof ComputeStatsStmt); + stmt = (ComputeStatsStmt) parseNode; + AnalyzesOk(stmt.getTblStatsQuery()); + AnalyzesOk(stmt.getColStatsQuery()); + + // Cannot compute stats on a table that does not exist. + AnalysisError("compute stats tbl_does_not_exist", + "Table does not exist: default.tbl_does_not_exist"); + // Cannot compute stats on a view.
+ AnalysisError("compute stats functional.alltypes_view", + "COMPUTE STATS not allowed on a view: functional.alltypes_view"); + } + @Test public void TestDrop() throws AnalysisException { AnalyzesOk("drop database functional"); diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AuditingTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AuditingTest.java index 2dba780d7..e4f5ea386 100644 --- a/fe/src/test/java/com/cloudera/impala/analysis/AuditingTest.java +++ b/fe/src/test/java/com/cloudera/impala/analysis/AuditingTest.java @@ -219,6 +219,15 @@ public class AuditingTest extends AnalyzerTest { new TAccessEvent("functional_seq_snap.v1", TCatalogObjectType.VIEW, "CREATE"))); } + @Test + public void TestComputeStats() throws AnalysisException, AuthorizationException { + List accessEvents = AnalyzeAccessEvents( + "COMPUTE STATS functional_seq_snap.alltypes"); + Assert.assertEquals(accessEvents, Lists.newArrayList( + new TAccessEvent( + "functional_seq_snap.alltypes", TCatalogObjectType.TABLE, "ALTER"))); + } + @Test public void TestDescribe() throws AuthorizationException, AnalysisException { List accessEvents = diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java index d3ae8c48f..dfe25119f 100644 --- a/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java +++ b/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java @@ -697,6 +697,17 @@ public class AuthorizationTest { "functional.alltypes_view"); } + @Test + public void TestComputeStatsTable() throws AnalysisException, AuthorizationException { + AuthzOk("compute stats functional_seq_snap.alltypes"); + + AuthzError("compute stats functional.alltypes", + "User '%s' does not have privileges to execute 'ALTER' on: functional.alltypes"); + AuthzError("compute stats functional.alltypesagg", + "User '%s' does not have privileges to execute 'ALTER' on: " + + "functional.alltypesagg"); + } + @Test public void TestDescribe() throws AuthorizationException, AnalysisException { AuthzOk("describe functional.alltypesagg"); diff --git a/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java b/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java index 24cf98e4f..c48d2a4f5 100644 --- a/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java +++ b/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java @@ -1879,6 +1879,23 @@ public class ParserTest { ParserError("refresh"); } + @Test + public void TestComputeStats() { + ParsesOk("compute stats bar"); + ParsesOk("compute stats `bar`"); + ParsesOk("compute stats foo.bar"); + ParsesOk("compute stats `foo`.`bar`"); + + // Missing table name. + ParserError("compute stats"); + // Missing 'stats' keyword. + ParserError("compute foo"); + // Cannot use string literal as table name. + ParserError("compute stats 'foo'"); + // Cannot analyze multiple tables in one stmt. 
+ ParserError("compute stats foo bar"); + } + @Test public void TestGetErrorMsg() { @@ -1888,8 +1905,8 @@ public class ParserTest { "c, b, c from t\n" + "^\n" + "Encountered: IDENTIFIER\n" + - "Expected: ALTER, CREATE, DESCRIBE, DROP, EXPLAIN, INSERT, INVALIDATE, LOAD, " + - "REFRESH, SELECT, SHOW, USE, VALUES, WITH\n"); + "Expected: ALTER, COMPUTE, CREATE, DESCRIBE, DROP, EXPLAIN, INSERT, INVALIDATE, " + + "LOAD, REFRESH, SELECT, SHOW, USE, VALUES, WITH\n"); // missing select list ParserError("select from t", diff --git a/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java b/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java index 73a3f4bc7..3c0b88792 100644 --- a/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java +++ b/fe/src/test/java/com/cloudera/impala/catalog/CatalogTest.java @@ -418,14 +418,6 @@ public class CatalogTest { // Now try to apply a matching column stats data and ensure it succeeds. assertTrue(table.getColumn("string_col").updateStats(stringColStatsData)); assertEquals(3, table.getColumn("string_col").getStats().getMaxSize()); - - // Finally, try to update stats for a column type that doesn't support stats - try { - table.getColumn("timestamp_col").updateStats(stringColStatsData); - assertTrue("Expected update to fail for unsupported stat column type.", false); - } catch (IllegalStateException e) { - // Ignore - } } finally { // Make sure to invalidate the metadata so the next test isn't using bad col stats catalog.getDb("functional").invalidateTable("functional"); diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test new file mode 100644 index 000000000..d2c0e999f --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats.test @@ -0,0 +1,310 @@ +==== +---- QUERY +# test computing stats on a partitioned text table with all types +create table compute_stats_db.alltypes like functional.alltypes; +insert into compute_stats_db.alltypes partition(year, month) +select * from functional.alltypes; +==== +---- QUERY +compute stats compute_stats_db.alltypes +---- RESULTS +'Updated 24 partition(s) and 11 column(s).' 
+---- TYPES +STRING +==== +---- QUERY +show table stats compute_stats_db.alltypes +---- LABELS +YEAR, MONTH, #ROWS, #FILES, SIZE, FORMAT +---- RESULTS +2009,1,310,1,'24.56KB','TEXT' +2009,2,280,1,'22.27KB','TEXT' +2009,3,310,1,'24.67KB','TEXT' +2009,4,300,1,'24.06KB','TEXT' +2009,5,310,1,'24.97KB','TEXT' +2009,6,300,1,'24.16KB','TEXT' +2009,7,310,1,'24.97KB','TEXT' +2009,8,310,1,'24.97KB','TEXT' +2009,9,300,1,'24.16KB','TEXT' +2009,10,310,1,'24.97KB','TEXT' +2009,11,300,1,'24.16KB','TEXT' +2009,12,310,1,'24.97KB','TEXT' +2010,1,310,1,'24.97KB','TEXT' +2010,2,280,1,'22.54KB','TEXT' +2010,3,310,1,'24.97KB','TEXT' +2010,4,300,1,'24.16KB','TEXT' +2010,5,310,1,'24.97KB','TEXT' +2010,6,300,1,'24.16KB','TEXT' +2010,7,310,1,'24.97KB','TEXT' +2010,8,310,1,'24.97KB','TEXT' +2010,9,300,1,'24.16KB','TEXT' +2010,10,310,1,'24.97KB','TEXT' +2010,11,300,1,'24.16KB','TEXT' +2010,12,310,1,'24.97KB','TEXT' +Total,,7300,24,'586.84KB','' +---- TYPES +INT, INT, BIGINT, BIGINT, STRING, STRING +==== +---- QUERY +show column stats compute_stats_db.alltypes +---- LABELS +COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE +---- RESULTS +'id','INT',8161,0,4,4 +'bool_col','BOOLEAN',2,0,1,1 +'tinyint_col','TINYINT',10,0,1,1 +'smallint_col','SMALLINT',10,0,2,2 +'int_col','INT',10,0,4,4 +'bigint_col','BIGINT',10,0,8,8 +'float_col','FLOAT',10,0,4,4 +'double_col','DOUBLE',10,0,8,8 +'date_string_col','STRING',666,0,-1,-1 +'string_col','STRING',10,0,-1,-1 +'timestamp_col','TIMESTAMP',5678,0,16,16 +'year','INT',2,0,4,4 +'month','INT',12,0,4,4 +---- TYPES +STRING, STRING, BIGINT, BIGINT, DOUBLE, DOUBLE +==== +---- QUERY +# test computing stats on an partitioned text table with all types +create table compute_stats_db.alltypesnopart like functional.alltypesnopart; +insert into compute_stats_db.alltypesnopart +select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, +double_col, date_string_col, string_col, timestamp_col +from functional.alltypessmall; +==== +---- QUERY +compute stats compute_stats_db.alltypesnopart +---- RESULTS +'Updated 1 partition(s) and 11 column(s).' +---- TYPES +STRING +==== +---- QUERY +show table stats compute_stats_db.alltypesnopart +---- LABELS +#ROWS, #FILES, SIZE, FORMAT +---- RESULTS +100,3,'7.73KB','TEXT' +---- TYPES +BIGINT, BIGINT, STRING, STRING +==== +---- QUERY +show column stats compute_stats_db.alltypesnopart +---- LABELS +COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE +---- RESULTS +'id','INT',105,0,4,4 +'bool_col','BOOLEAN',2,0,1,1 +'tinyint_col','TINYINT',10,0,1,1 +'smallint_col','SMALLINT',10,0,2,2 +'int_col','INT',10,0,4,4 +'bigint_col','BIGINT',10,0,8,8 +'float_col','FLOAT',10,0,4,4 +'double_col','DOUBLE',10,0,8,8 +'date_string_col','STRING',12,0,-1,-1 +'string_col','STRING',10,0,-1,-1 +'timestamp_col','TIMESTAMP',101,0,16,16 +---- TYPES +STRING, STRING, BIGINT, BIGINT, DOUBLE, DOUBLE +==== +---- QUERY +# test computing stats on a partitioned parquet table with all types +create table compute_stats_db.alltypes_parquet +like functional_parquet.alltypes; +insert into compute_stats_db.alltypes_parquet partition(year, month) +select * from functional.alltypes; +==== +---- QUERY +compute stats compute_stats_db.alltypes_parquet +---- RESULTS +'Updated 24 partition(s) and 11 column(s).' 
+---- TYPES +STRING +==== +---- QUERY +show table stats compute_stats_db.alltypes_parquet +---- LABELS +YEAR, MONTH, #ROWS, #FILES, SIZE, FORMAT +---- RESULTS +2009,1,310,1,'6.60KB','PARQUET' +2009,2,280,1,'6.14KB','PARQUET' +2009,3,310,1,'6.60KB','PARQUET' +2009,4,300,1,'6.46KB','PARQUET' +2009,5,310,1,'6.60KB','PARQUET' +2009,6,300,1,'6.46KB','PARQUET' +2009,7,310,1,'6.60KB','PARQUET' +2009,8,310,1,'6.60KB','PARQUET' +2009,9,300,1,'6.46KB','PARQUET' +2009,10,310,1,'6.60KB','PARQUET' +2009,11,300,1,'6.46KB','PARQUET' +2009,12,310,1,'6.60KB','PARQUET' +2010,1,310,1,'6.60KB','PARQUET' +2010,2,280,1,'6.14KB','PARQUET' +2010,3,310,1,'6.62KB','PARQUET' +2010,4,300,1,'6.47KB','PARQUET' +2010,5,310,1,'6.60KB','PARQUET' +2010,6,300,1,'6.46KB','PARQUET' +2010,7,310,1,'6.60KB','PARQUET' +2010,8,310,1,'6.60KB','PARQUET' +2010,9,300,1,'6.46KB','PARQUET' +2010,10,310,1,'6.60KB','PARQUET' +2010,11,300,1,'6.46KB','PARQUET' +2010,12,310,1,'6.60KB','PARQUET' +Total,,7300,24,'156.36KB','' +---- TYPES +INT, INT, BIGINT, BIGINT, STRING, STRING +==== +---- QUERY +show column stats compute_stats_db.alltypes_parquet +---- LABELS +COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE +---- RESULTS +'id','INT',8161,0,4,4 +'bool_col','BOOLEAN',2,0,1,1 +'tinyint_col','TINYINT',10,0,1,1 +'smallint_col','SMALLINT',10,0,2,2 +'int_col','INT',10,0,4,4 +'bigint_col','BIGINT',10,0,8,8 +'float_col','FLOAT',10,0,4,4 +'double_col','DOUBLE',10,0,8,8 +'date_string_col','STRING',666,0,-1,-1 +'string_col','STRING',10,0,-1,-1 +'timestamp_col','TIMESTAMP',5678,0,16,16 +'year','INT',2,0,4,4 +'month','INT',12,0,4,4 +---- TYPES +STRING, STRING, BIGINT, BIGINT, DOUBLE, DOUBLE +==== +---- QUERY +# test computing stats on an HBase table +create table compute_stats_db.alltypessmall_hbase +like functional_hbase.alltypessmall; +==== +---- QUERY +compute stats compute_stats_db.alltypessmall_hbase +---- RESULTS +'Updated 1 partition(s) and 12 column(s).' +---- TYPES +STRING +==== +---- QUERY +show table stats compute_stats_db.alltypessmall_hbase +---- LABELS +REGION LOCATION, START ROWKEY, EST. #ROWS, SIZE +---- RESULTS: VERIFY_IS_EQUAL +regex:.+,'',5,'1.37KB' +regex:.+,'1',42,'10.73KB' +regex:.+,'3',42,'10.77KB' +regex:.+,'5',42,'10.76KB' +regex:.+,'7',42,'10.73KB' +regex:.+,'9',23,'5.86KB' +'Total','',196,'50.22KB' +---- TYPES +STRING, STRING, BIGINT, STRING +==== +---- QUERY +show column stats compute_stats_db.alltypessmall_hbase +---- LABELS +COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE +---- RESULTS +'id','INT',-1,-1,4,4 +'bigint_col','BIGINT',10,0,8,8 +'bool_col','BOOLEAN',2,0,1,1 +'date_string_col','STRING',12,0,-1,-1 +'double_col','DOUBLE',10,0,8,8 +'float_col','FLOAT',10,0,4,4 +'int_col','INT',10,0,4,4 +'month','INT',4,0,4,4 +'smallint_col','SMALLINT',10,0,2,2 +'string_col','STRING',10,0,-1,-1 +'timestamp_col','TIMESTAMP',101,0,16,16 +'tinyint_col','TINYINT',10,0,1,1 +'year','INT',1,0,4,4 +---- TYPES +STRING, STRING, BIGINT, BIGINT, DOUBLE, DOUBLE +==== +---- QUERY +# test computing stats on an binary HBase table +create table compute_stats_db.alltypessmall_hbase_bin +like functional_hbase.alltypessmallbinary; +==== +---- QUERY +compute stats compute_stats_db.alltypessmall_hbase_bin +---- RESULTS +'Updated 1 partition(s) and 12 column(s).' +---- TYPES +STRING +==== +---- QUERY: VERIFY_IS_EQUAL +show table stats compute_stats_db.alltypessmall_hbase_bin +---- LABELS +REGION LOCATION, START ROWKEY, EST. 
#ROWS, SIZE +---- RESULTS +regex:.+,'',1,'315B' +---- TYPES +STRING, STRING, BIGINT, STRING +==== +---- QUERY +show column stats compute_stats_db.alltypessmall_hbase_bin +---- LABELS +COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE +---- RESULTS +'id','INT',-1,-1,4,4 +'bigint_col','BIGINT',10,0,8,8 +'bool_col','BOOLEAN',2,0,1,1 +'date_string_col','STRING',12,0,-1,-1 +'double_col','DOUBLE',10,0,8,8 +'float_col','FLOAT',10,0,4,4 +'int_col','INT',10,0,4,4 +'month','INT',4,0,4,4 +'smallint_col','SMALLINT',10,0,2,2 +'string_col','STRING',10,0,-1,-1 +'timestamp_col','TIMESTAMP',101,0,16,16 +'tinyint_col','TINYINT',10,0,1,1 +'year','INT',1,0,4,4 +---- TYPES +STRING, STRING, BIGINT, BIGINT, DOUBLE, DOUBLE +==== +---- QUERY +# test computing stats on an empty table +create table compute_stats_db.alltypes_empty like functional_rc_snap.alltypes +==== +---- QUERY +compute stats compute_stats_db.alltypes_empty +---- RESULTS +'Updated 0 partition(s) and 11 column(s).' +---- TYPES +STRING +==== +---- QUERY +show table stats compute_stats_db.alltypes_empty +---- LABELS +YEAR, MONTH, #ROWS, #FILES, SIZE, FORMAT +---- RESULTS +Total,,0,0,'0B','' +---- TYPES +INT, INT, BIGINT, BIGINT, STRING, STRING +==== +---- QUERY +show column stats compute_stats_db.alltypes_empty +---- LABELS +COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE +---- RESULTS +'id','INT',0,0,4,4 +'bool_col','BOOLEAN',2,0,1,1 +'tinyint_col','TINYINT',0,0,1,1 +'smallint_col','SMALLINT',0,0,2,2 +'int_col','INT',0,0,4,4 +'bigint_col','BIGINT',0,0,8,8 +'float_col','FLOAT',0,0,4,4 +'double_col','DOUBLE',0,0,8,8 +'date_string_col','STRING',0,0,-1,-1 +'string_col','STRING',0,0,-1,-1 +'timestamp_col','TIMESTAMP',0,0,16,16 +'year','INT',0,0,4,4 +'month','INT',0,0,4,4 +---- TYPES +STRING, STRING, BIGINT, BIGINT, DOUBLE, DOUBLE +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/show.test b/testdata/workloads/functional-query/queries/QueryTest/show.test index e9b4f5aa9..b7a5198dd 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/show.test +++ b/testdata/workloads/functional-query/queries/QueryTest/show.test @@ -217,6 +217,8 @@ STRING ---- QUERY # Stats on a partitioned Hdfs table stored as text show table stats alltypes +---- LABELS +YEAR, MONTH, #ROWS, #FILES, SIZE, FORMAT ---- RESULTS 2009,1,310,1,'19.95KB','TEXT' 2009,2,280,1,'18.12KB','TEXT' @@ -326,7 +328,7 @@ show column stats alltypes COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE ---- RESULTS 'id','INT',9299,0,4,4 -'bool_col','BOOLEAN',-1,0,1,1 +'bool_col','BOOLEAN',2,0,1,1 'tinyint_col','TINYINT',11,0,1,1 'smallint_col','SMALLINT',11,0,2,2 'int_col','INT',11,0,4,4 @@ -335,7 +337,7 @@ COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE 'double_col','DOUBLE',11,0,8,8 'date_string_col','STRING',822,0,8,8 'string_col','STRING',9,0,1,1 -'timestamp_col','TIMESTAMP',-1,-1,16,16 +'timestamp_col','TIMESTAMP',7171,0,16,16 'year','INT',2,0,4,4 'month','INT',12,0,4,4 ---- TYPES @@ -349,7 +351,7 @@ COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE ---- RESULTS 'id','INT',63,0,4,4 'bigint_col','BIGINT',7,0,8,8 -'bool_col','BOOLEAN',-1,0,1,1 +'bool_col','BOOLEAN',2,0,1,1 'date_string_col','STRING',12,0,8,8 'double_col','DOUBLE',11,0,8,8 'float_col','FLOAT',15,0,4,4 @@ -357,7 +359,7 @@ COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE 'month','INT',5,0,4,4 'smallint_col','SMALLINT',11,0,2,2 'string_col','STRING',9,0,1,1 -'timestamp_col','TIMESTAMP',-1,-1,16,16 +'timestamp_col','TIMESTAMP',107,0,16,16 
'tinyint_col','TINYINT',11,0,1,1 'year','INT',1,0,4,4 ---- TYPES diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index 8ed928e6d..d74035714 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -17,7 +17,7 @@ from tests.verifiers.metric_verifier import MetricVerifier QUERIES = ['select l_returnflag from lineitem', 'select count(l_returnflag) from lineitem', 'select * from lineitem limit 50', - ] + 'compute stats lineitem'] QUERY_TYPE = ["SELECT", "CTAS"] @@ -47,6 +47,14 @@ class TestCancellation(ImpalaTestSuite): v.get_value('table_format').file_format in ['text', 'parquet'] and\ v.get_value('table_format').compression_codec == 'none')) cls.TestMatrix.add_constraint(lambda v: v.get_value('exec_option')['batch_size'] == 0) + # Ignore 'compute stats' queries for the CTAS query type. + cls.TestMatrix.add_constraint(lambda v: not (v.get_value('query_type') == 'CTAS' and + v.get_value('query').startswith('compute stats'))) + # Ignore debug actions for 'compute stats' because cancellation of 'compute stats' + # relies on child queries eventually making forward progress, but debug actions + # will cause child queries to hang indefinitely. + cls.TestMatrix.add_constraint(lambda v: not (v.get_value('action') == 'WAIT' and + v.get_value('query').startswith('compute stats'))) # tpch tables are not generated for hbase as the data loading takes a very long time. # TODO: Add cancellation tests for hbase. cls.TestMatrix.add_constraint(lambda v:\ @@ -132,7 +140,8 @@ class TestCancellationSerial(TestCancellation): @classmethod def add_test_dimensions(cls): super(TestCancellationSerial, cls).add_test_dimensions() - cls.TestMatrix.add_constraint(lambda v: v.get_value('query_type') == 'CTAS') + cls.TestMatrix.add_constraint(lambda v: v.get_value('query_type') == 'CTAS' or + v.get_value('query').startswith('compute stats')) cls.TestMatrix.add_constraint(lambda v: v.get_value('cancel_delay') != 0) cls.TestMatrix.add_constraint(lambda v: v.get_value('action') is None) # Don't run across all cancel delay options unless running in exhaustive mode diff --git a/tests/query_test/test_compute_stats.py b/tests/query_test/test_compute_stats.py new file mode 100644 index 000000000..51861999e --- /dev/null +++ b/tests/query_test/test_compute_stats.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# Copyright (c) 2012 Cloudera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from tests.common.test_vector import * +from tests.common.impala_test_suite import * + +# Tests the COMPUTE STATS command for gathering table and column stats. 
+# TODO: Merge this test file with test_col_stats.py +class TestComputeStats(ImpalaTestSuite): + TEST_DB_NAME = "compute_stats_db" + + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestComputeStats, cls).add_test_dimensions() + cls.TestMatrix.add_dimension(create_single_exec_option_dimension()) + # Do not run these tests using all dimensions because the expected results + # are different for different file formats. + cls.TestMatrix.add_constraint(lambda v:\ + v.get_value('table_format').file_format == 'text' and\ + v.get_value('table_format').compression_codec == 'none') + + def setup_method(self, method): + # cleanup and create a fresh test database + self.cleanup_db(self.TEST_DB_NAME) + self.client.refresh() + self.execute_query("create database %s" % (self.TEST_DB_NAME)) + + def teardown_method(self, method): + self.cleanup_db(self.TEST_DB_NAME) + + def test_compute_stats(self, vector): + self.run_test_case('QueryTest/compute-stats', vector)
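The per-partition row counts carried in TAlterTableUpdateStatsParams are keyed by the list of partition-key values, and updateTableStats() above resolves them with params.partition_stats.get(msPartition.getValues()). The sketch below is not part of the patch and simplifies the TTableStats values to plain Longs; it only illustrates why that lookup works: Java lists hash and compare by element equality, so a freshly built key with equal values finds the stored entry.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionStatsLookupSketch {
  public static void main(String[] args) {
    // Row counts keyed by the stringified partition-key values, e.g. (year=2009, month=1).
    Map<List<String>, Long> partitionRowCounts = new HashMap<List<String>, Long>();
    partitionRowCounts.put(Arrays.asList("2009", "1"), 310L);
    partitionRowCounts.put(Arrays.asList("2009", "2"), 280L);

    // A different List instance with equal elements still hits the entry, which is
    // what makes msPartition.getValues() usable directly as the lookup key.
    List<String> keyFromMetastore = Arrays.asList("2009", "1");
    System.out.println(partitionRowCounts.get(keyFromMetastore));  // prints 310
  }
}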
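For the column statistics, createHiveColStats() and createHiveColStatsData() above assemble a Hive metastore ColumnStatistics object: one table-level ColumnStatisticsDesc plus one ColumnStatisticsObj per column, each wrapping a type-specific stats struct. The following is a minimal, self-contained sketch of that assembly for a single hypothetical INT column; it uses Thrift setters rather than the positional constructors in the patch, and the column name and counts are illustrative only.

import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

public class ColumnStatsSketch {
  public static ColumnStatistics buildIntColumnStats(String dbName, String tblName) {
    // Type-specific payload: NDVs and NULL count; min/max are left unset, mirroring
    // the TODOs in the patch.
    LongColumnStatsData longStats = new LongColumnStatsData();
    longStats.setNumDVs(10);
    longStats.setNumNulls(0);

    ColumnStatisticsData data = new ColumnStatisticsData();
    data.setLongStats(longStats);

    ColumnStatistics colStats = new ColumnStatistics();
    // 'true' marks these as table-level (as opposed to partition-level) column stats.
    colStats.setStatsDesc(new ColumnStatisticsDesc(true, dbName, tblName));
    colStats.addToStatsObj(new ColumnStatisticsObj("int_col", "INT", data));
    return colStats;
  }
}

An object built this way is what the patch hands to msClient.getHiveClient().updateTableColumnStatistics(colStats) while holding metastoreDdlLock.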
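Row counts themselves travel as string-valued parameters on the Hive table and partition objects under StatsSetupConst.ROW_COUNT; CREATE TABLE LIKE seeds the value "-1" so a new table reads as "row count unknown" until COMPUTE STATS fills it in. A small sketch of reading that parameter back, under the assumption that a missing or unparsable value is treated the same as -1 (the reader shown here is illustrative and not part of the patch):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;

public class RowCountParamSketch {
  // Returns the stored row count, or -1 when it is absent or malformed.
  public static long readRowCount(Map<String, String> tblParameters) {
    String value = tblParameters.get(StatsSetupConst.ROW_COUNT);
    if (value == null) return -1;
    try {
      return Long.parseLong(value);
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<String, String>();
    params.put(StatsSetupConst.ROW_COUNT, "7300");
    System.out.println(readRowCount(params));  // prints 7300
  }
}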