IMPALA-13005: Create Query Live table in HMS

Creates the 'sys.impala_query_live' table in HMS using a 'CREATE TABLE'
command similar to the one used for 'sys.impala_query_log'. Updates the
frontend to identify a System Table based on the '__IMPALA_SYSTEM_TABLE'
table property. Tables improperly marked with '__IMPALA_SYSTEM_TABLE'
will produce an error when scanned because no matching scanner is
available.
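
A minimal, self-contained sketch (simplified stand-in types, not the
actual Impala classes) of the scanner dispatch this relies on: only
known system table names get a scanner, anything else fails, which is
the path a table incorrectly tagged with '__IMPALA_SYSTEM_TABLE' hits
when scanned.

  #include <iostream>
  #include <memory>
  #include <string>
  #include <utility>

  // Simplified stand-ins for the thrift enum and Status type.
  enum class TSystemTableName { IMPALA_QUERY_LIVE };

  struct Status {
    bool ok;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
  };

  struct SystemTableScanner { virtual ~SystemTableScanner() = default; };
  struct QueryScanner : SystemTableScanner {};

  // Mirrors the shape of SystemTableScanner::CreateScanner: known names
  // get a scanner, unknown names return an error status.
  Status CreateScanner(TSystemTableName name,
                       std::unique_ptr<SystemTableScanner>* scanner) {
    switch (name) {
      case TSystemTableName::IMPALA_QUERY_LIVE:
        *scanner = std::make_unique<QueryScanner>();
        return Status::OK();
      default:
        // A table tagged as a system table but not listed here ends up on
        // this path, so the scan fails.
        return Status::Error("Unknown system table");
    }
  }

  int main() {
    std::unique_ptr<SystemTableScanner> scanner;
    std::cout << CreateScanner(TSystemTableName::IMPALA_QUERY_LIVE, &scanner).ok
              << "\n";  // prints 1
    std::cout << CreateScanner(static_cast<TSystemTableName>(42), &scanner).ok
              << "\n";  // prints 0
  }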

Creating the table in HMS simplifies supporting 'SHOW CREATE TABLE' and
'DESCRIBE EXTENDED', so these statements are now allowed for parity with
the Query Log table. Explicitly disables 'COMPUTE STATS' on system
tables because it does not work correctly on them.
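
For illustration only (abbreviated column list and a made-up helper
name, not the actual SetupTable code), the DDL is assembled roughly like
this: for the live table the system-table marker property takes the
place of the Iceberg partitioning, storage, and location clauses used
for the query log table.

  #include <cstddef>
  #include <iostream>
  #include <string>
  #include <utility>
  #include <vector>

  // Hypothetical, abbreviated column list; the real table has one column
  // per TQueryTableColumn entry in SystemTables.thrift.
  const std::vector<std::pair<std::string, std::string>> kColumns = {
      {"cluster_id", "STRING"}, {"query_id", "STRING"},
      {"start_time_utc", "TIMESTAMP"}, {"total_time_ms", "DECIMAL(18,3)"},
  };

  // Sketch of the branching added to SetupTable: system tables get the
  // '__IMPALA_SYSTEM_TABLE' property instead of Iceberg clauses.
  std::string BuildCreateTableSql(const std::string& table,
                                  bool is_system_table) {
    std::string sql = "CREATE TABLE IF NOT EXISTS " + table + " (";
    for (std::size_t i = 0; i < kColumns.size(); ++i) {
      if (i > 0) sql += ", ";
      sql += kColumns[i].first + " " + kColumns[i].second;
    }
    sql += ") ";
    if (!is_system_table) {
      sql += "PARTITIONED BY SPEC(identity(cluster_id), HOUR(start_time_utc)) "
             "STORED AS iceberg ";
    }
    sql += "TBLPROPERTIES ('schema_version'='1.0.0','format-version'='2'";
    if (is_system_table) sql += ",'__IMPALA_SYSTEM_TABLE'='true'";
    sql += ")";
    return sql;
  }

  int main() {
    std::cout << BuildCreateTableSql("sys.impala_query_live", true) << "\n";
    std::cout << BuildCreateTableSql("sys.impala_query_log", false) << "\n";
  }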

Makes System Tables work with local catalog mode, fixing

  LocalCatalogException: Unknown table type for table sys.impala_query_live

Updates the workload management implementation to rely more on the
SystemTables.thrift definition, and adds DCHECKs to verify the
completeness and ordering of the column definitions.
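
A self-contained sketch (plain asserts and a simplified enum in place of
Impala's DCHECKs and generated thrift types) of the kind of verification
this adds: there must be one field definition per TQueryTableColumn
value, and each definition must sit at the array index matching its
column.

  #include <array>
  #include <cassert>
  #include <cstddef>

  // Simplified stand-in for the generated TQueryTableColumn thrift enum.
  enum TQueryTableColumn { CLUSTER_ID = 0, QUERY_ID, SESSION_ID, NUM_COLUMNS };

  struct FieldDefinition {
    TQueryTableColumn db_column;
    // Column type and parser omitted in this sketch.
  };

  // Array indexed by column, analogous to FIELD_DEFINITIONS in
  // workload-management-fields.cc.
  constexpr std::array<FieldDefinition, NUM_COLUMNS> FIELD_DEFINITIONS{{
      {CLUSTER_ID}, {QUERY_ID}, {SESSION_ID},
  }};

  int main() {
    // Completeness: one definition per enum value.
    assert(FIELD_DEFINITIONS.size() == static_cast<std::size_t>(NUM_COLUMNS));
    // Ordering: each definition is stored at its own column position.
    for (const auto& field : FIELD_DEFINITIONS) {
      assert(FIELD_DEFINITIONS[field.db_column].db_column == field.db_column);
    }
    return 0;
  }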

Testing:
- adds test cases covering the changes to introspection commands
- passes existing test_query_live and test_query_log suites

Change-Id: Idf302ee54a819fdee2db0ae582a5eeddffe4a5b4
Reviewed-on: http://gerrit.cloudera.org:8080/21302
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Michael Smith
Date:   2024-04-15 15:10:43 -07:00
parent 29e4186793
commit 73a9ef9c4c
26 changed files with 435 additions and 294 deletions

View File

@@ -75,6 +75,7 @@ set(SRC_FILES
StatestoreService_types.cpp
StatestoreSubscriber.cpp
Status_types.cpp
SystemTables_types.cpp
Types_types.cpp
Zip_types.cpp
)

View File

@@ -54,7 +54,7 @@ static constexpr double NANOS_TO_MILLIS = 1000000;
Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile* profile,
TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner) {
switch (table_name) {
case TSystemTableName::QUERY_LIVE:
case TSystemTableName::IMPALA_QUERY_LIVE:
*scanner = make_unique<QueryScanner>(state, profile);
break;
default:
@@ -174,6 +174,8 @@ Status QueryScanner::MaterializeNextTuple(
const QueryStateExpanded& query = *query_records_.front();
const QueryStateRecord& record = *query.base_state;
ExecEnv* exec_env = ExecEnv::GetInstance();
// Verify there are no clustering columns (partitions) to offset col_pos.
DCHECK_EQ(0, tuple_desc->table_desc()->num_clustering_cols());
for (const SlotDescriptor* slot_desc : tuple_desc->slots()) {
void* slot = tuple->GetSlot(slot_desc->tuple_offset());

View File

@@ -92,35 +92,35 @@ static void _write_event(FieldParserContext& ctx, QueryEvent target_event) {
}
/// List of query table columns. Must be kept in-sync with SystemTables.thrift
const list<FieldDefinition> FIELD_DEFINITIONS = {
const array<FieldDefinition, NumQueryTableColumns> FIELD_DEFINITIONS{{
// Cluster Id
// Required
FieldDefinition("cluster_id", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::CLUSTER_ID, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.cluster_id << "'";
}),
// Query Id
FieldDefinition("query_id", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::QUERY_ID, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << PrintId(ctx.record->base_state->id) << "'";
}),
// Session Id
FieldDefinition("session_id", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::SESSION_ID, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << PrintId(ctx.record->session_id) << "'";
}),
// Session Type
FieldDefinition("session_type", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::SESSION_TYPE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->session_type << "'";
}),
// Hiveserver2 Protocol Version
FieldDefinition("hiveserver2_protocol_version", TPrimitiveType::STRING,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION,
TPrimitiveType::STRING, [](FieldParserContext& ctx){
ctx.sql << "'";
if (ctx.record->session_type == TSessionType::HIVESERVER2) {
ctx.sql << ctx.record->hiveserver2_protocol_version;
@@ -129,32 +129,32 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Effective User
FieldDefinition("db_user", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::DB_USER, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->effective_user << "'";
}),
// DB User
FieldDefinition("db_user_connection", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::DB_USER_CONNECTION, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->db_user_connection << "'";
}),
// Default DB
FieldDefinition("db_name", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::DB_NAME, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->default_db << "'";
}),
// Impala Coordinator
FieldDefinition("impala_coordinator", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::IMPALA_COORDINATOR, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" <<TNetworkAddressToString(
ExecEnv::GetInstance()->configured_backend_address()) << "'";
}),
// Query Status
FieldDefinition("query_status", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::QUERY_STATUS, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'";
if (ctx.record->base_state->query_status.ok()) {
@@ -166,72 +166,72 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Query State
FieldDefinition("query_state", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::QUERY_STATE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->query_state << "'";
}),
// Impala Query End State
FieldDefinition("impala_query_end_state",
TPrimitiveType::STRING, [](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::IMPALA_QUERY_END_STATE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->impala_query_end_state << "'";
}),
// Query Type
FieldDefinition("query_type", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::QUERY_TYPE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->stmt_type << "'";
}),
// Client Network Address
FieldDefinition("network_address", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::NETWORK_ADDRESS, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << TNetworkAddressToString(ctx.record->client_address) << "'";
}),
// Query Start Time in UTC
// Required
FieldDefinition("start_time_utc", TPrimitiveType::TIMESTAMP,
FieldDefinition(TQueryTableColumn::START_TIME_UTC, TPrimitiveType::TIMESTAMP,
[](FieldParserContext& ctx){
ctx.sql << "UNIX_MICROS_TO_UTC_TIMESTAMP(" <<
ctx.record->base_state->start_time_us << ")";
}),
// Query Duration
FieldDefinition("total_time_ms", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::TOTAL_TIME_MS, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_decimal(ctx, (ctx.record->base_state->end_time_us -
ctx.record->base_state->start_time_us), MICROS_TO_MILLIS);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Query Options set by Configuration
FieldDefinition("query_opts_config", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::QUERY_OPTS_CONFIG, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
const string opts_str = DebugQueryOptions(ctx.record->query_options);
ctx.sql << "'" << EscapeSql(opts_str) << "'";
}),
// Resource Pool
FieldDefinition("resource_pool", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::RESOURCE_POOL, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << EscapeSql(ctx.record->base_state->resource_pool) << "'";
}),
// Per-host Memory Estimate
FieldDefinition("per_host_mem_estimate", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::PER_HOST_MEM_ESTIMATE, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->per_host_mem_estimate;
}),
// Dedicated Coordinator Memory Estimate
FieldDefinition("dedicated_coord_mem_estimate", TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE,
TPrimitiveType::BIGINT, [](FieldParserContext& ctx){
ctx.sql << ctx.record->dedicated_coord_mem_estimate;
}),
// Per-Host Fragment Instances
FieldDefinition("per_host_fragment_instances", TPrimitiveType::STRING,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES,
TPrimitiveType::STRING, [](FieldParserContext& ctx){
ctx.sql << "'";
if (!ctx.record->per_host_state.empty()) {
@@ -246,7 +246,7 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Backends Count
FieldDefinition("backends_count", TPrimitiveType::INT,
FieldDefinition(TQueryTableColumn::BACKENDS_COUNT, TPrimitiveType::INT,
[](FieldParserContext& ctx){
if (ctx.record->per_host_state.empty()) {
ctx.sql << 0;
@@ -256,133 +256,133 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Admission Result
FieldDefinition("admission_result", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::ADMISSION_RESULT, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->admission_result << "'";
}),
// Cluster Memory Admitted
FieldDefinition("cluster_memory_admitted", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::CLUSTER_MEMORY_ADMITTED, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->base_state->cluster_mem_est;
}),
// Executor Group
FieldDefinition("executor_group", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::EXECUTOR_GROUP, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->executor_group << "'";
}),
// Executor Groups
FieldDefinition("executor_groups", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::EXECUTOR_GROUPS, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << EscapeSql(ctx.record->executor_groups) << "'";
}),
// Exec Summary (also known as the operator summary)
FieldDefinition("exec_summary", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::EXEC_SUMMARY, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << EscapeSql(ctx.record->exec_summary) << "'";
}),
// Number of rows fetched
FieldDefinition("num_rows_fetched", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::NUM_ROWS_FETCHED, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->base_state->num_rows_fetched;
}),
// Row Materialization Rate
FieldDefinition("row_materialization_rows_per_sec", TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC,
TPrimitiveType::BIGINT, [](FieldParserContext& ctx){
ctx.sql << ctx.record->row_materialization_rate;
}),
// Row Materialization Time
FieldDefinition("row_materialization_time_ms", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_decimal(ctx, ctx.record->row_materialization_time, NANOS_TO_MILLIS);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Compressed Bytes Spilled to Disk
FieldDefinition("compressed_bytes_spilled", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::COMPRESSED_BYTES_SPILLED, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->compressed_bytes_spilled;
}),
// Events Timeline Planning Finished
FieldDefinition("event_planning_finished", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::EVENT_PLANNING_FINISHED, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, PLANNING_FINISHED);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Submit for Admission
FieldDefinition("event_submit_for_admission", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_event(ctx, SUBMIT_FOR_ADMISSION);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Completed Admission
FieldDefinition("event_completed_admission", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::EVENT_COMPLETED_ADMISSION,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_event(ctx, COMPLETED_ADMISSION);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline All Execution Backends Started
FieldDefinition("event_all_backends_started", TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
FieldDefinition(TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_event(ctx, ALL_BACKENDS_STARTED);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Rows Available
FieldDefinition("event_rows_available", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::EVENT_ROWS_AVAILABLE, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, ROWS_AVAILABLE);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline First Row Fetched
FieldDefinition("event_first_row_fetched", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::EVENT_FIRST_ROW_FETCHED, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, FIRST_ROW_FETCHED);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Last Row Fetched
FieldDefinition("event_last_row_fetched", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::EVENT_LAST_ROW_FETCHED, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, LAST_ROW_FETCHED);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Unregister Query
FieldDefinition("event_unregister_query", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::EVENT_UNREGISTER_QUERY, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, UNREGISTER_QUERY);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Read IO Wait Time Total
FieldDefinition("read_io_wait_total_ms", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::READ_IO_WAIT_TOTAL_MS, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_decimal(ctx, ctx.record->read_io_wait_time_total, NANOS_TO_MILLIS);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Read IO Wait Time Mean
FieldDefinition("read_io_wait_mean_ms", TPrimitiveType::DECIMAL,
FieldDefinition(TQueryTableColumn::READ_IO_WAIT_MEAN_MS, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_decimal(ctx, ctx.record->read_io_wait_time_mean, NANOS_TO_MILLIS);
}, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Bytes Read from the Data Cache Total
FieldDefinition("bytes_read_cache_total", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::BYTES_READ_CACHE_TOTAL, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->bytes_read_cache_total;
}),
// Bytes Read Total
FieldDefinition("bytes_read_total", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::BYTES_READ_TOTAL, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->bytes_read_total;
}),
// Per-Node Peak Memory Usage Min
FieldDefinition("pernode_peak_mem_min", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MIN, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
auto min_elem = min_element(ctx.record->per_host_state.cbegin(),
ctx.record->per_host_state.cend(), PerHostPeakMemoryComparator);
@@ -395,7 +395,7 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Per-Node Peak Memory Usage Max
FieldDefinition("pernode_peak_mem_max", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MAX, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
auto max_elem = max_element(ctx.record->per_host_state.cbegin(),
ctx.record->per_host_state.cend(), PerHostPeakMemoryComparator);
@@ -408,7 +408,7 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Per-Node Peak Memory Usage Mean
FieldDefinition("pernode_peak_mem_mean", TPrimitiveType::BIGINT,
FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MEAN, TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
int64_t calc_mean = 0;
@@ -424,14 +424,14 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// SQL Statement
FieldDefinition("sql", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::SQL, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" <<
EscapeSql(ctx.record->redacted_sql, FLAGS_query_log_max_sql_length) << "'";
}),
// Query Plan
FieldDefinition("plan", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::PLAN, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'"
<< EscapeSql(ctx.record->base_state->plan, FLAGS_query_log_max_plan_length)
@@ -439,12 +439,12 @@ const list<FieldDefinition> FIELD_DEFINITIONS = {
}),
// Tables Queried
FieldDefinition("tables_queried", TPrimitiveType::STRING,
FieldDefinition(TQueryTableColumn::TABLES_QUERIED, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'";
}),
}; // FIELDS_PARSERS constant list
}}; // FIELDS_PARSERS const array
} //namespace workload_management

View File

@@ -26,6 +26,7 @@
#include <thread>
#include <utility>
#include <boost/algorithm/string/case_conv.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gutil/strings/strcat.h>
@@ -33,8 +34,10 @@
#include "common/compiler-util.h"
#include "common/logging.h"
#include "common/status.h"
#include "gen-cpp/CatalogObjects_constants.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Query_types.h"
#include "gen-cpp/SystemTables_types.h"
#include "gen-cpp/Types_types.h"
#include "runtime/query-driver.h"
#include "service/client-request-state.h"
@@ -93,20 +96,33 @@ static inline bool MaxRecordsExceeded(size_t record_count) noexcept {
return FLAGS_query_log_max_queued > 0 && record_count > FLAGS_query_log_max_queued;
} // function MaxRecordsExceeded
/// Sets up the completed queries database and table by generating and executing the
/// necessary DML statements.
static const Status SetupDbTable(InternalServer* server, const string& table_name) {
/// Sets up the sys database generating and executing the necessary DML statements.
static const Status SetupDb(InternalServer* server) {
insert_query_opts.__set_sync_ddl(true);
RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
StrCat("create database if not exists ", DB, " comment "
StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
"'System database for Impala introspection'"), insert_query_opts, false));
insert_query_opts.__set_sync_ddl(false);
return Status::OK();
} // function SetupDb
/// Returns column name as lower-case to match common SQL style.
static string GetColumnName(const FieldDefinition& field) {
std::string column_name = to_string(field.db_column);
boost::algorithm::to_lower(column_name);
return column_name;
}
/// Sets up the query table by generating and executing the necessary DML statements.
static const Status SetupTable(InternalServer* server, const string& table_name,
bool is_system_table = false) {
insert_query_opts.__set_sync_ddl(true);
StringStreamPop create_table_sql;
create_table_sql << "CREATE TABLE IF NOT EXISTS " << table_name << "(";
for (const auto& field : FIELD_DEFINITIONS) {
create_table_sql << field.db_column_name << " " << field.db_column_type;
create_table_sql << GetColumnName(field) << " " << field.db_column_type;
if (field.db_column_type == TPrimitiveType::DECIMAL) {
create_table_sql << "(" << field.precision << "," << field.scale << ")";
@@ -116,16 +132,23 @@ static const Status SetupDbTable(InternalServer* server, const string& table_nam
}
create_table_sql.move_back();
create_table_sql << ") PARTITIONED BY SPEC(identity(cluster_id), HOUR(start_time_utc)) "
<< "STORED AS iceberg ";
create_table_sql << ") ";
if (!FLAGS_query_log_table_location.empty()) {
create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "' ";
if (!is_system_table) {
create_table_sql << "PARTITIONED BY SPEC(identity(cluster_id), HOUR(start_time_utc)) "
<< "STORED AS iceberg ";
if (!FLAGS_query_log_table_location.empty()) {
create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "' ";
}
}
create_table_sql << "TBLPROPERTIES ('schema_version'='1.0.0','format-version'='2'";
if (!FLAGS_query_log_table_props.empty()) {
if (is_system_table) {
create_table_sql << ",'"
<< g_CatalogObjects_constants.TBL_PROP_SYSTEM_TABLE <<"'='true'";
} else if (!FLAGS_query_log_table_props.empty()) {
create_table_sql << "," << FLAGS_query_log_table_props;
}
@@ -136,12 +159,11 @@ static const Status SetupDbTable(InternalServer* server, const string& table_nam
insert_query_opts.__set_sync_ddl(false);
LOG(INFO) << "Completed query log initialization. storage_type=\""
<< FLAGS_enable_workload_mgmt << "\" write_interval=\"" <<
LOG(INFO) << "Completed " << table_name << " initialization. write_interval=\"" <<
FLAGS_query_log_write_interval_s << "s\"";
return Status::OK();
} // function SetupDbTable
} // function SetupTable
/// Iterates through the list of field in `FIELDS_PARSERS` executing each parser for the
/// given `QueryStateExpanded` object. This function builds the `FieldParserContext`
@@ -179,6 +201,13 @@ size_t ImpalaServer::NumLiveQueries() {
}
Status ImpalaServer::InitWorkloadManagement() {
// Verify FIELD_DEFINITIONS includes all QueryTableColumns.
DCHECK_EQ(_TQueryTableColumn_VALUES_TO_NAMES.size(), FIELD_DEFINITIONS.size());
for (const auto& field : FIELD_DEFINITIONS) {
// Verify all fields match their column position.
DCHECK_EQ(FIELD_DEFINITIONS[field.db_column].db_column, field.db_column);
}
if (FLAGS_enable_workload_mgmt) {
return Thread::Create("impala-server", "completed-queries",
bind<void>(&ImpalaServer::CompletedQueriesThread, this),
@@ -273,6 +302,17 @@ void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle,
PrintId(query_handle->query_id()) << "'";
} // ImpalaServer::EnqueueCompletedQuery
static string get_insert_prefix(const string& table_name) {
StringStreamPop fields;
fields << "INSERT INTO " << table_name << "(";
for (const auto& field : FIELD_DEFINITIONS) {
fields << GetColumnName(field) << ",";
}
fields.move_back();
fields << ") VALUES ";
return fields.str();
}
void ImpalaServer::CompletedQueriesThread() {
{
lock_guard<mutex> l(completed_queries_threadstate_mu_);
@@ -288,21 +328,19 @@ void ImpalaServer::CompletedQueriesThread() {
}
// Fully qualified table name based on startup flags.
const string table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
// Non-values portion of the completed queries insert dml. Does not change across
// queries.
StringStreamPop fields;
fields << "INSERT INTO " << table_name << "(";
for (const auto& field : FIELD_DEFINITIONS) {
fields << field.db_column_name << ",";
}
fields.move_back();
fields << ") VALUES ";
_insert_dml = fields.str();
_insert_dml = get_insert_prefix(log_table_name);
// The initialization code only works when run in a separate thread for reasons unknown.
ABORT_IF_ERROR(SetupDbTable(internal_server_.get(), table_name));
ABORT_IF_ERROR(SetupDb(internal_server_.get()));
ABORT_IF_ERROR(SetupTable(internal_server_.get(), log_table_name));
std::string live_table_name = to_string(TSystemTableName::IMPALA_QUERY_LIVE);
boost::algorithm::to_lower(live_table_name);
ABORT_IF_ERROR(SetupTable(internal_server_.get(),
StrCat(DB, ".", live_table_name), true));
{
lock_guard<mutex> l(completed_queries_threadstate_mu_);
@@ -373,7 +411,7 @@ void ImpalaServer::CompletedQueriesThread() {
for (auto iter = queries_to_insert.begin(); iter != queries_to_insert.end();
iter++) {
if (iter->insert_attempts_count >= FLAGS_query_log_max_insert_attempts) {
LOG(ERROR) << "could not write completed query table=\"" << table_name <<
LOG(ERROR) << "could not write completed query table=\"" << log_table_name <<
"\" query_id=\"" << PrintId(iter->query->base_state->id) << "\"";
iter = queries_to_insert.erase(iter);
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(-1);
@@ -424,7 +462,7 @@ void ImpalaServer::CompletedQueriesThread() {
&tmp_query_id);
if (ret_status.ok()) {
LOG(INFO) << "wrote completed queries table=\"" << table_name << "\" "
LOG(INFO) << "wrote completed queries table=\"" << log_table_name << "\" "
"record_count=\"" << queries_to_insert.size() << "\"";
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(
queries_to_insert.size() * -1);
@@ -432,8 +470,8 @@ void ImpalaServer::CompletedQueriesThread() {
ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(
queries_to_insert.size());
} else {
LOG(WARNING) << "failed to write completed queries table=\"" << table_name <<
"\" record_count=\"" << queries_to_insert.size() << "\"";
LOG(WARNING) << "failed to write completed queries table=\"" <<
log_table_name << "\" record_count=\"" << queries_to_insert.size() << "\"";
LOG(WARNING) << ret_status.GetDetail();
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
completed_queries_lock_.lock();

View File

@@ -17,13 +17,14 @@
#pragma once
#include <list>
#include <array>
#include <memory>
#include <string>
#include <utility>
#include <gflags/gflags.h>
#include "gen-cpp/SystemTables_types.h"
#include "gen-cpp/Types_types.h"
#include "service/query-state-record.h"
#include "util/string-util.h"
@@ -50,24 +51,27 @@ using FieldParser = void (*)(FieldParserContext&);
/// Contains all necessary information for the definition and parsing of a single field
/// in workload management.
struct FieldDefinition {
const std::string db_column_name;
const TQueryTableColumn::type db_column;
const TPrimitiveType::type db_column_type;
const FieldParser parser;
const int16_t precision;
const int16_t scale;
FieldDefinition(const std::string db_col_name, const TPrimitiveType::type db_col_type,
const FieldParser fp, const int16_t precision = 0,
const int16_t scale = 0) : db_column_name(std::move(db_col_name)),
db_column_type(std::move(db_col_type)), parser(std::move(fp)),
precision(precision), scale(scale) {}
FieldDefinition(const TQueryTableColumn::type db_col,
const TPrimitiveType::type db_col_type, const FieldParser fp,
const int16_t precision = 0, const int16_t scale = 0) :
db_column(std::move(db_col)), db_column_type(std::move(db_col_type)),
parser(std::move(fp)), precision(precision), scale(scale) {}
}; // struct FieldDefinition
/// Number of query table columns
constexpr size_t NumQueryTableColumns = TQueryTableColumn::TABLES_QUERIED + 1;
/// This list is the main data structure for workload management. Each list entry
/// contains the name of a column in the completed queries table, the type of that column,
/// and an implementation of a FieldParser for generating the value of that column from a
/// `QueryStateExpanded` object.
extern const std::list<FieldDefinition> FIELD_DEFINITIONS;
extern const std::array<FieldDefinition, NumQueryTableColumns> FIELD_DEFINITIONS;
/// Track the state of the thread that processes the completed queries queue. Access to
/// the ThreadState variable must only happen after taking a lock on the associated mutex.

View File

@@ -146,6 +146,9 @@ enum TTablePropertyType {
SERDE_PROPERTY = 1
}
// Table properties used by Impala
const string TBL_PROP_SYSTEM_TABLE = "__IMPALA_SYSTEM_TABLE"
// The access level that is available to Impala on the Catalog object.
enum TAccessLevel {
NONE = 0
@@ -669,10 +672,10 @@ struct TIcebergTable {
10: optional map<string, TIcebergPartitionStats> partition_stats;
}
// Describes the purpose of a particular system table.
// Table names can be found in SystemTable.java
// System Table identifiers.
// These are used as the table name, so should not be changed.
enum TSystemTableName {
QUERY_LIVE = 0
IMPALA_QUERY_LIVE = 0
}
// Represents a System Table

View File

@@ -19,6 +19,7 @@ namespace cpp impala
namespace java org.apache.impala.thrift
# Must be kept in-sync with workload-management-fields.cc
# Used as column names, so do not change existing enums.
enum TQueryTableColumn {
CLUSTER_ID
QUERY_ID

View File

@@ -59,6 +59,7 @@ import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeIncompleteTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeSystemTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.IcebergTimeTravelTable;
@@ -68,7 +69,6 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.TypeCompatibility;
@@ -982,12 +982,14 @@ public class Analyzer {
if (table instanceof IcebergMetadataTable) {
return new IcebergMetadataTableRef(tableRef, resolvedPath);
}
if (table instanceof FeSystemTable) {
return new SystemTableRef(tableRef, resolvedPath);
}
// The table must be a base table.
Preconditions.checkState(table instanceof FeFsTable ||
table instanceof FeKuduTable ||
table instanceof FeHBaseTable ||
table instanceof FeDataSourceTable ||
table instanceof SystemTable);
table instanceof FeDataSourceTable);
return new BaseTableRef(tableRef, resolvedPath);
} else {
return new CollectionTableRef(tableRef, resolvedPath, false);

View File

@@ -403,6 +403,10 @@ public class ComputeStatsStmt extends StatementBase {
throw new AnalysisException(String.format(
"COMPUTE STATS not supported for nested collection: %s", tableName_));
}
if (tableRef instanceof SystemTableRef) {
throw new AnalysisException(String.format(
"COMPUTE STATS not supported for system table: %s", tableName_));
}
table_ = analyzer.getTable(tableName_, Privilege.ALTER, Privilege.SELECT);
if (!(table_ instanceof FeFsTable)) {

View File

@@ -24,7 +24,6 @@ import org.apache.impala.analysis.Path.PathType;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
import org.apache.impala.common.AnalysisException;
@@ -139,7 +138,6 @@ public class DescribeTableStmt extends StatementBase {
analyzer.getTable(table_.getTableName(), /* add column-level privilege */ true,
Privilege.ANY);
checkMinimalForIcebergMetadataTable();
checkMinimalForSystemTable();
if (!targetsTable()) analyzeComplexType(analyzer);
}
@@ -176,13 +174,6 @@ public class DescribeTableStmt extends StatementBase {
}
}
private void checkMinimalForSystemTable() throws AnalysisException {
if (table_ instanceof SystemTable && outputStyle_ != TDescribeOutputStyle.MINIMAL) {
throw new AnalysisException(
"DESCRIBE FORMATTED|EXTENDED cannot refer to a system table.");
}
}
public TDescribeTableParams toThrift() {
TDescribeTableParams params = new TDescribeTableParams();
params.setOutput_style(outputStyle_);

View File

@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TTableName;
@@ -77,8 +76,6 @@ public class ShowCreateTableStmt extends StatementBase {
// statement references a column by its implicitly defined column names.
viewAnalyzer.setUseHiveColLabels(true);
viewQuery.analyze(viewAnalyzer);
} else if (table instanceof SystemTable) {
throw new AnalysisException("Not supported on system tables.");
}
}

View File

@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import org.apache.impala.catalog.FeSystemTable;
import com.google.common.base.Preconditions;
/**
* TableRef class for system tables.
*
* Represents a table that is registered as a normal table in HMS, but content is
* constructed in-memory. Currently COMPUTE STATS does not work on these tables, and
* write operations are not allowed.
*/
public class SystemTableRef extends BaseTableRef {
public SystemTableRef(TableRef tableRef, Path resolvedPath) {
super(tableRef, resolvedPath);
Preconditions.checkState(resolvedPath.getRootTable() instanceof FeSystemTable);
}
}

View File

@@ -126,6 +126,7 @@ import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.thrift.TPrincipalType;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TSystemTableName;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableType;
@@ -392,7 +393,8 @@ public class CatalogServiceCatalog extends Catalog {
Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0,
"topic_update_tbl_max_wait_time_ms must be positive");
impalaSysTables = Arrays.asList(
BackendConfig.INSTANCE.queryLogTableName(), SystemTable.QUERY_LIVE);
BackendConfig.INSTANCE.queryLogTableName(),
TSystemTableName.IMPALA_QUERY_LIVE.toString().toLowerCase());
tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
loadInBackground_ = loadInBackground;
try {

View File

@@ -133,13 +133,6 @@ public class Db extends CatalogObjectImpl implements FeDb {
setMetastoreDb(name, msDb);
tableCache_ = new CatalogObjectCache<>();
functions_ = new HashMap<>();
// This constructor is called from a static initializer in tests.
if (BackendConfig.INSTANCE != null && BackendConfig.INSTANCE.enableWorkloadMgmt() &&
name.equalsIgnoreCase(SYS)) {
// Add built-in tables.
addTable(SystemTable.CreateQueryLiveTable(this, getOwnerUser()));
}
}
public long getCreateEventId() { return createEventId_; }

View File

@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TSystemTableName;
/**
* Represents a system table backed by internal memory.
*/
public interface FeSystemTable extends FeTable {
// Gets the system table identifier.
TSystemTableName getSystemTableName();
// TODO(todd): it seems like all FeTables implement this, perhaps
// this should just be a method on FeTable and simplify the code
// in Frontend.getTableStats?
TResultSet getTableStats();
}

View File

@@ -17,24 +17,17 @@
package org.apache.impala.catalog;
import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READ;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.impala.common.InternalException;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.CatalogObjectsConstants;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TQueryTableColumn;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TSystemTable;
@@ -44,95 +37,37 @@ import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.TResultRowBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Preconditions;
/**
* Represents a system table reflecting backend internal state.
*/
public final class SystemTable extends Table {
public static final String QUERY_LIVE = "impala_query_live";
private static final Map<String, TSystemTableName> SYSTEM_TABLE_NAME_MAP =
ImmutableMap.of(QUERY_LIVE, TSystemTableName.QUERY_LIVE);
// Constants declaring how durations measured in milliseconds will be stored in the db.
// Must match constants with the same name declared in workload-management-fields.cc.
private static final int DURATION_DECIMAL_PRECISION = 18;
private static final int DURATION_DECIMAL_SCALE = 3;
private SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
public final class SystemTable extends Table implements FeSystemTable {
protected SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
String name, String owner) {
super(msTable, db, name, owner);
// System Tables are read-only.
accessLevel_ = TAccessLevel.READ_ONLY;
}
// Get Type for a TQueryTableColumn
private static Type getColumnType(TQueryTableColumn column) {
switch (column) {
case START_TIME_UTC:
return Type.TIMESTAMP;
case PER_HOST_MEM_ESTIMATE:
case DEDICATED_COORD_MEM_ESTIMATE:
case CLUSTER_MEMORY_ADMITTED:
case NUM_ROWS_FETCHED:
case ROW_MATERIALIZATION_ROWS_PER_SEC:
case COMPRESSED_BYTES_SPILLED:
case BYTES_READ_CACHE_TOTAL:
case BYTES_READ_TOTAL:
case PERNODE_PEAK_MEM_MIN:
case PERNODE_PEAK_MEM_MAX:
case PERNODE_PEAK_MEM_MEAN:
return Type.BIGINT;
case BACKENDS_COUNT:
return Type.INT;
case TOTAL_TIME_MS:
case ROW_MATERIALIZATION_TIME_MS:
case EVENT_PLANNING_FINISHED:
case EVENT_SUBMIT_FOR_ADMISSION:
case EVENT_COMPLETED_ADMISSION:
case EVENT_ALL_BACKENDS_STARTED:
case EVENT_ROWS_AVAILABLE:
case EVENT_FIRST_ROW_FETCHED:
case EVENT_LAST_ROW_FETCHED:
case EVENT_UNREGISTER_QUERY:
case READ_IO_WAIT_TOTAL_MS:
case READ_IO_WAIT_MEAN_MS:
return ScalarType.createDecimalType(
DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE);
default:
return Type.STRING;
}
}
public static SystemTable CreateQueryLiveTable(Db db, String owner) {
List<FieldSchema> fsList = new ArrayList<FieldSchema>();
for (TQueryTableColumn column : TQueryTableColumn.values()) {
// The type string must be lowercase for Hive to read the column metadata properly.
String typeSql = getColumnType(column).toSql().toLowerCase();
FieldSchema fs = new FieldSchema(column.name().toLowerCase(), typeSql, "");
fsList.add(fs);
}
org.apache.hadoop.hive.metastore.api.Table msTable =
createMetastoreTable(db.getName(), QUERY_LIVE, owner, fsList);
SystemTable table = new SystemTable(msTable, db, QUERY_LIVE, owner);
for (TQueryTableColumn column : TQueryTableColumn.values()) {
table.addColumn(new Column(
column.name().toLowerCase(), getColumnType(column), column.ordinal()));
}
return table;
}
@Override // FeSystemTable
public TSystemTableName getSystemTableName() {
return SYSTEM_TABLE_NAME_MAP.get(getName());
return TSystemTableName.valueOf(getName().toUpperCase());
}
public static boolean isSystemTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
String value = msTbl.getParameters().get(
CatalogObjectsConstants.TBL_PROP_SYSTEM_TABLE);
return value != null && BooleanUtils.toBoolean(value);
}
@Override
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
// Create thrift descriptors to send to the BE.
TTableDescriptor tableDescriptor =
new TTableDescriptor(tableId, TTableType.SYSTEM_TABLE, getTColumnDescriptors(),
numClusteringCols_, name_, db_.getName());
TTableDescriptor tableDescriptor = new TTableDescriptor(tableId,
TTableType.SYSTEM_TABLE, getTColumnDescriptors(),
getNumClusteringCols(), getName(), getDb().getName());
tableDescriptor.setSystemTable(getTSystemTable());
return tableDescriptor;
}
@@ -164,8 +99,13 @@ public final class SystemTable extends Table {
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl, String reason,
EventSequence catalogTimeline) throws TableLoadingException {
// Table is always loaded.
Preconditions.checkState(false);
int pos = colsByPos_.size();
// Should be no partition columns.
Preconditions.checkState(pos == 0);
for (FieldSchema s: msTbl.getSd().getCols()) {
Type type = FeCatalogUtils.parseColumnType(s, getName());
addColumn(new Column(s.getName(), type, s.getComment(), pos++));
}
}
/**
@@ -184,6 +124,7 @@ public final class SystemTable extends Table {
* TABLE STATS statement. The schema of the returned TResultSet is set inside
* this method.
*/
@Override // FeSystemTable
public TResultSet getTableStats() {
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
@@ -194,32 +135,4 @@ public final class SystemTable extends Table {
result.addToRows(rowBuilder.get());
return result;
}
private static org.apache.hadoop.hive.metastore.api.Table
createMetastoreTable(String dbName, String tableName, String owner,
List<FieldSchema> columns) {
// Based on CatalogOpExecutor#createMetaStoreTable
org.apache.hadoop.hive.metastore.api.Table tbl =
new org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(dbName);
tbl.setTableName(tableName);
tbl.setOwner(owner);
tbl.setParameters(new HashMap<String, String>());
tbl.setTableType(TableType.MANAGED_TABLE.toString());
tbl.setPartitionKeys(new ArrayList<FieldSchema>());
if (MetastoreShim.getMajorVersion() > 2) {
MetastoreShim.setTableAccessType(tbl, ACCESSTYPE_READ);
}
StorageDescriptor sd = new StorageDescriptor();
sd.setSerdeInfo(new org.apache.hadoop.hive.metastore.api.SerDeInfo());
sd.getSerdeInfo().setParameters(new HashMap<>());
sd.setCompressed(false);
sd.setBucketCols(new ArrayList<>(0));
sd.setSortCols(new ArrayList<>(0));
sd.setCols(columns);
tbl.setSd(sd);
return tbl;
}
}

View File

@@ -549,6 +549,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
// have a special table property to indicate that Impala should use an external
// data source.
table = new DataSourceTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
} else if (SystemTable.isSystemTable(msTbl)) {
table = new SystemTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
} else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
table = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
}

View File

@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.local;
import java.util.Set;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.catalog.FeCatalogUtils;
import org.apache.impala.catalog.FeSystemTable;
import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.InternalException;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TSystemTable;
import org.apache.impala.thrift.TSystemTableName;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.TResultRowBuilder;
import com.google.common.base.Preconditions;
/**
* System table instance loaded from {@link LocalCatalog}.
*
* System tables are identified by the TBL_PROP_SYSTEM_TABLE table parameter.
*/
public class LocalSystemTable extends LocalTable implements FeSystemTable {
public static LocalSystemTable load(LocalDb db, Table msTbl, TableMetaRef ref) {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(msTbl);
return new LocalSystemTable(db, msTbl, ref);
}
private LocalSystemTable(LocalDb db, Table msTbl, TableMetaRef ref) {
super(db, msTbl, ref);
}
@Override // FeSystemTable
public TSystemTableName getSystemTableName() {
return TSystemTableName.valueOf(getName().toUpperCase());
}
@Override
public long getNumRows() {
try {
// Return an estimate of the number of live queries assuming balanced load across
// coordinators.
return FeSupport.NumLiveQueries() * FeSupport.GetCoordinators().getAddressesSize();
} catch (InternalException e) {
return super.getNumRows();
}
}
/**
* Returns statistics on this table as a tabular result set. Used for the
* SHOW TABLE STATS statement. The schema of the returned TResultSet is set
* inside this method.
*/
@Override // FeSystemTable
public TResultSet getTableStats() {
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
result.setSchema(resultSchema);
TResultRowBuilder rowBuilder = new TResultRowBuilder();
rowBuilder.add(getNumRows());
result.addToRows(rowBuilder.get());
return result;
}
@Override
public TTableDescriptor toThriftDescriptor(
int tableId, Set<Long> referencedPartitions) {
TTableDescriptor tableDescriptor = new TTableDescriptor(tableId,
TTableType.SYSTEM_TABLE, FeCatalogUtils.getTColumnDescriptors(this),
getNumClusteringCols(), getName(), getDb().getName());
tableDescriptor.setSystemTable(getTSystemTable());
return tableDescriptor;
}
/**
* Returns a thrift structure for the system table.
*/
private TSystemTable getTSystemTable() {
return new TSystemTable(getSystemTableName());
}
}

View File

@@ -46,6 +46,7 @@ import org.apache.impala.catalog.SideloadTableStats;
import org.apache.impala.catalog.SqlConstraints;
import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.VirtualColumn;
import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
@@ -131,6 +132,8 @@ abstract class LocalTable implements FeTable {
t = LocalIcebergTable.loadIcebergTableViaMetaProvider(db, msTbl, ref);
} else if (DataSourceTable.isDataSourceTable(msTbl)) {
t = LocalDataSourceTable.load(db, msTbl, ref);
} else if (SystemTable.isSystemTable(msTbl)) {
t = LocalSystemTable.load(db, msTbl, ref);
} else if (HdfsFileFormat.isHdfsInputFormatClass(
msTbl.getSd().getInputFormat())) {
t = LocalFsTable.load(db, msTbl, ref);

View File

@@ -67,10 +67,10 @@ import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeSystemTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
import org.apache.impala.common.AnalysisException;
@@ -1902,7 +1902,7 @@ public class SingleNodePlanner {
return scanNode;
} else if (table instanceof IcebergMetadataTable) {
return createIcebergMetadataScanNode(tblRef, conjuncts, analyzer);
} else if (table instanceof SystemTable) {
} else if (table instanceof FeSystemTable) {
scanNode = new SystemTableScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
scanNode.addConjuncts(conjuncts);
scanNode.init(analyzer);

View File

@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.FeSystemTable;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.service.FeSupport;
@@ -43,10 +43,10 @@ import com.google.common.collect.Lists;
public class SystemTableScanNode extends ScanNode {
public SystemTableScanNode(PlanNodeId id, TupleDescriptor desc) {
super(id, desc, "SCAN SYSTEM_TABLE");
table_ = (SystemTable) desc_.getTable();
table_ = (FeSystemTable) desc_.getTable();
}
private final SystemTable table_;
private final FeSystemTable table_;
@Override
public void init(Analyzer analyzer) throws ImpalaException {

View File

@@ -120,6 +120,7 @@ import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeSystemTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.IcebergPositionDeleteTable;
@@ -129,7 +130,6 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
@@ -1665,8 +1665,8 @@ public class Frontend {
}
} else if (table instanceof MaterializedViewHdfsTable) {
return ((MaterializedViewHdfsTable) table).getTableStats();
} else if (table instanceof SystemTable) {
return ((SystemTable) table).getTableStats();
} else if (table instanceof FeSystemTable) {
return ((FeSystemTable) table).getTableStats();
} else {
throw new InternalException("Invalid table class: " + table.getClass());
}

View File

@@ -17,8 +17,6 @@
package org.apache.impala.catalog;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.common.FrontendTestBase;
import org.apache.impala.thrift.TSystemTableName;
import org.junit.Test;
@@ -32,7 +30,8 @@ public class SystemTableTest extends FrontendTestBase {
@Test
public void testSystemTableNames() {
Db sysDb = feFixture_.addTestDb(Db.SYS, "system db");
SystemTable queryLiveTable = SystemTable.CreateQueryLiveTable(sysDb, "impala");
assertEquals(TSystemTableName.QUERY_LIVE, queryLiveTable.getSystemTableName());
SystemTable queryLiveTable = new SystemTable(
null, sysDb, "impala_query_live", "impala");
assertEquals(TSystemTableName.IMPALA_QUERY_LIVE, queryLiveTable.getSystemTableName());
}
}

View File

@@ -1552,16 +1552,4 @@ public class PlannerTest extends PlannerTestBase {
Lists.newArrayList(2, 3)),
IcebergScanPlanner.getOrderedEqualityFieldIds(inp));
}
/**
* Test queries against sys.impala_query_live.
*/
@Test
public void testQueryLive() {
boolean savedEnableWorkloadMgmt = BackendConfig.INSTANCE.enableWorkloadMgmt();
BackendConfig.INSTANCE.setEnableWorkloadMgmt(true);
addTestDb(Db.SYS, "ensure system db");
runPlannerTestFile("impala-query-live");
BackendConfig.INSTANCE.setEnableWorkloadMgmt(savedEnableWorkloadMgmt);
}
}

View File

@@ -1,34 +0,0 @@
# Can query impala_query_live. Skips DISTRIBUTEDPLAN because that requires
# coordinators, which are not configured in frontend tests.
select count(*) from sys.impala_query_live
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
|
00:SCAN SYSTEM_TABLE [sys.impala_query_live]
row-size=0B cardinality=1
====
# Error trying to create new sys.impala_query_live
create table sys.impala_query_live (i int)
---- PLAN
AnalysisException: Table already exists: sys.impala_query_live
====
drop table sys.impala_query_live
---- PLAN
AnalysisException: Write not supported. Table sys.impala_query_live access type is: READONLY
====
insert into sys.impala_query_live values (1)
---- PLAN
AnalysisException: Write not supported. Table sys.impala_query_live access type is: READONLY
====
update sys.impala_query_live set query_id = "nonsense"
---- PLAN
AnalysisException: Impala only supports modifying Kudu and Iceberg tables, but the following table is neither: sys.impala_query_live
====
delete sys.impala_query_live where query_id = "nonsense"
---- PLAN
AnalysisException: Impala only supports modifying Kudu and Iceberg tables, but the following table is neither: sys.impala_query_live
====

View File

@@ -110,6 +110,62 @@ class TestQueryLive(CustomClusterTestSuite):
assert result5.data[0] == \
"functional.alltypes,functional.alltypestiny,functional.alltypessmall"
# describe query
describe_result = self.execute_query('describe sys.impala_query_live')
assert len(describe_result.data) == 49
describe_ext_result = self.execute_query('describe extended sys.impala_query_live')
assert len(describe_ext_result.data) == 82
# show create table
show_create_tbl = self.execute_query('show create table sys.impala_query_live')
assert len(show_create_tbl.data) == 1
assert 'CREATE EXTERNAL TABLE sys.impala_query_live' in show_create_tbl.data[0]
assert "'__IMPALA_SYSTEM_TABLE'='true'" in show_create_tbl.data[0]
# cannot compute stats or perform write operations
compute_stats_result = self.execute_query_expect_failure(self.client,
'compute stats sys.impala_query_live')
assert 'AnalysisException: COMPUTE STATS not supported for system table: '\
'sys.impala_query_live' in str(compute_stats_result)
create_result = self.execute_query_expect_failure(self.client,
'create table sys.impala_query_live (i int)')
assert 'AnalysisException: Table already exists: sys.impala_query_live'\
in str(create_result)
insert_result = self.execute_query_expect_failure(self.client,
'insert into sys.impala_query_live select * from sys.impala_query_live limit 1')
assert 'UnsupportedOperationException: Cannot create data sink into table of type: '\
'org.apache.impala.catalog.SystemTable' in str(insert_result)
update_result = self.execute_query_expect_failure(self.client,
'update sys.impala_query_live set query_id = ""')
assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\
'but the following table is neither: sys.impala_query_live'\
in str(update_result)
delete_result = self.execute_query_expect_failure(self.client,
'delete from sys.impala_query_live')
assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\
'but the following table is neither: sys.impala_query_live'\
in str(delete_result)
# Drop table at the end, it's only recreated on impalad startup.
self.execute_query_expect_success(self.client, 'drop table sys.impala_query_live')
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--cluster_id=test_query_live "
"--use_local_catalog=true",
catalogd_args="--enable_workload_mgmt "
"--catalog_topic_mode=minimal")
def test_local_catalog(self):
"""Asserts the query live table works with local catalog mode."""
result = self.client.execute("select * from functional.alltypes",
fetch_profile_after_close=True)
assert_query('sys.impala_query_live', self.client, 'test_query_live',
result.runtime_profile)
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
"--cluster_id=test_query_live",
catalogd_args="--enable_workload_mgmt",