diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index 560716cb8..3398c7bf1 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -591,8 +591,7 @@ Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject( // Populate the request header. if (!request.__isset.header) { ThreadDebugInfo* tdi = GetThreadDebugInfo(); - // TODO: After IMPALA-14447, query ids might be missing in some threads. This will - // be addressed in IMPALA-12870. + // Requests from WebUI don't have ThreadDebugInfo and query ids. if (tdi != nullptr) { request.__set_header(TCatalogServiceRequestHeader()); request.header.__set_query_id(tdi->GetQueryId()); @@ -807,6 +806,50 @@ Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents(JNIEnv* env, return result_bytes; } +extern "C" JNIEXPORT jlong JNICALL +Java_org_apache_impala_service_FeSupport_NativeInitThreadDebugInfo(JNIEnv* env, + jclass fe_support_class, jbyteArray thrift_query_id) { + TUniqueId query_id; + THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_query_id, &query_id), env, + JniUtil::internal_exc_class(), 0); + ThreadDebugInfo* tdi = new ThreadDebugInfo(); + tdi->SetQueryId(query_id); + return reinterpret_cast(tdi); +} + +extern "C" JNIEXPORT void JNICALL +Java_org_apache_impala_service_FeSupport_NativeUpdateThreadDebugInfo(JNIEnv* env, + jclass fe_support_class, jbyteArray thrift_query_id) { + TUniqueId query_id; + THROW_IF_ERROR(DeserializeThriftMsg(env, thrift_query_id, &query_id), env, + JniUtil::internal_exc_class()); + ThreadDebugInfo* tdi = GetThreadDebugInfo(); + DCHECK(tdi != nullptr); + if (tdi == nullptr) { + LOG(WARNING) << "ThreadDebugInfo is null. Not tagging query id " << PrintId(query_id); + return; + } + tdi->SetQueryId(query_id); +} + +extern "C" JNIEXPORT void JNICALL +Java_org_apache_impala_service_FeSupport_NativeResetThreadDebugInfo(JNIEnv* env, + jclass fe_support_class) { + ThreadDebugInfo* tdi = GetThreadDebugInfo(); + // If ThreadDebugInfo is null, no need to reset anything. + if (tdi == nullptr) return; + // A 0 unique id, which indicates that one has not been set. + static const TUniqueId ZERO_UNIQUE_ID; + tdi->SetQueryId(ZERO_UNIQUE_ID); +} + +extern "C" JNIEXPORT void JNICALL +Java_org_apache_impala_service_FeSupport_NativeDeleteThreadDebugInfo(JNIEnv* env, + jclass fe_support_class, jlong tdi) { + if (tdi == 0) return; + delete (ThreadDebugInfo*)tdi; +} + namespace impala { static JNINativeMethod native_methods[] = { @@ -902,6 +945,22 @@ static JNINativeMethod native_methods[] = { const_cast("NativeWaitForHmsEvents"), const_cast("([B[B)[B"), (void*)::Java_org_apache_impala_service_FeSupport_NativeWaitForHmsEvents }, + { + const_cast("NativeInitThreadDebugInfo"), const_cast("([B)J"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeInitThreadDebugInfo + }, + { + const_cast("NativeUpdateThreadDebugInfo"), const_cast("([B)V"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeUpdateThreadDebugInfo + }, + { + const_cast("NativeResetThreadDebugInfo"), const_cast("()V"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeResetThreadDebugInfo + }, + { + const_cast("NativeDeleteThreadDebugInfo"), const_cast("(J)V"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeDeleteThreadDebugInfo + }, }; void InitFeSupport(bool disable_codegen) { diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java index 7ef9290c4..e13695367 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java @@ -52,6 +52,7 @@ import org.apache.impala.catalog.local.LocalCatalogException; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; +import org.apache.impala.common.TaggedThreadFactory; import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.BackendConfig; import org.apache.impala.service.Frontend; @@ -66,7 +67,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Loads all table and view metadata relevant for a single SQL statement and returns the @@ -370,10 +370,8 @@ public class StmtMetadataLoader { List> tables = new ArrayList<>(); ExecutorService executorService = null; try { - executorService = Executors.newFixedThreadPool(maxThreads, - new ThreadFactoryBuilder() - .setNameFormat("MissingTableLoaderThread-" + queryIdStr + "-%d") - .build()); + executorService = Executors.newFixedThreadPool(maxThreads, new TaggedThreadFactory( + queryId_, "MissingTableLoaderThread-" + queryIdStr + "-%d")); // Transform tbls to a list of tasks. List>> tasks = tbls.stream() diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 02e8d3eb7..b797c4ede 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -920,8 +920,9 @@ public class CatalogServiceCatalog extends Catalog { .toString(), request.valid_write_ids); } long tableId = request.getTable_id(); + // This is only used in the legacy catalog mode. No query ids are sent here. Table table = getOrLoadTable(tableName.db_name, tableName.table_name, - "needed to fetch partition stats", writeIdList, tableId, + "needed to fetch partition stats", /*queryId*/null, writeIdList, tableId, NoOpEventSequence.INSTANCE); // Table could be null if it does not exist anymore. @@ -2801,7 +2802,7 @@ public class CatalogServiceCatalog extends Catalog { public @Nullable Table getOrLoadTable(String dbName, String tblName, String reason, ValidWriteIdList validWriteIdList) throws CatalogException { - return getOrLoadTable(dbName, tblName, reason, validWriteIdList, + return getOrLoadTable(dbName, tblName, reason, null, validWriteIdList, TABLE_ID_UNAVAILABLE, NoOpEventSequence.INSTANCE); } @@ -2815,8 +2816,8 @@ public class CatalogServiceCatalog extends Catalog { * (not yet loaded table) will be returned. */ public @Nullable Table getOrLoadTable(String dbName, String tblName, String reason, - ValidWriteIdList validWriteIdList, long tableId, EventSequence catalogTimeline) - throws CatalogException { + @Nullable TUniqueId queryId, ValidWriteIdList validWriteIdList, long tableId, + EventSequence catalogTimeline) throws CatalogException { TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase()); Table tbl; TableLoadingMgr.LoadRequest loadReq = null; @@ -2889,7 +2890,7 @@ public class CatalogServiceCatalog extends Catalog { previousCatalogVersion = tbl.getCatalogVersion(); LOG.trace("Loading full table {}", tbl.getFullName()); loadReq = tableLoadingMgr_.loadAsync(tableName, tbl.getCreateEventId(), reason, - catalogTimeline); + queryId, catalogTimeline); } } finally { versionLock_.readLock().unlock(); @@ -4371,9 +4372,11 @@ public class CatalogServiceCatalog extends Catalog { dbName + "." + tblName, req.table_info_selector.valid_write_ids); tableId = req.table_info_selector.getTable_id(); } + TUniqueId queryId = req.isSetHeader() && req.header.isSetQuery_id() ? + req.header.query_id : null; table = getOrLoadTable( objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(), - tableLoadReason, writeIdList, tableId, NoOpEventSequence.INSTANCE); + tableLoadReason, queryId, writeIdList, tableId, NoOpEventSequence.INSTANCE); } catch (DatabaseNotFoundException e) { return createGetPartialCatalogObjectError(req, CatalogLookupStatus.DB_NOT_FOUND); } diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java index 08de02566..5999b2349 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java +++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java @@ -31,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.impala.common.Pair; +import org.apache.impala.common.TaggedThreadFactory; import org.apache.impala.thrift.TTableName; +import org.apache.impala.thrift.TUniqueId; import org.apache.impala.util.EventSequence; import org.apache.impala.util.HdfsCachingUtil; import org.apache.impala.util.NoOpEventSequence; @@ -167,7 +169,7 @@ public class TableLoadingMgr { tblLoader_ = new TableLoader(catalog_); numLoadingThreads_ = numLoadingThreads; tblLoadingPool_ = Executors.newFixedThreadPool(numLoadingThreads_, - new ThreadFactoryBuilder().setNameFormat("TableLoadingThread-%d").build()); + new TaggedThreadFactory("TableLoadingThread-%d")); // Start the background table loading submitter threads. startTableLoadingSubmitterThreads(); @@ -236,7 +238,7 @@ public class TableLoadingMgr { * loads of the same table. */ public LoadRequest loadAsync(final TTableName tblName, final long createdEventId, - final String reason, final EventSequence catalogTimeline) + final String reason, final TUniqueId queryId, final EventSequence catalogTimeline) throws DatabaseNotFoundException { final Db parentDb = catalog_.getDb(tblName.getDb_name()); if (parentDb == null) { @@ -244,13 +246,16 @@ public class TableLoadingMgr { "Database '" + tblName.getDb_name() + "' was not found."); } - FutureTask tableLoadTask = new FutureTask
(new Callable
() { - @Override - public Table call() throws Exception { - catalogTimeline.markEvent("Start loading table"); - return tblLoader_.load(parentDb, tblName.table_name, createdEventId, reason, - catalogTimeline); - }}); + FutureTask
tableLoadTask = new FutureTask<>(() -> { + try { + TaggedThreadFactory.updateQueryId(queryId); + catalogTimeline.markEvent("Start loading table"); + return tblLoader_.load(parentDb, tblName.table_name, createdEventId, reason, + catalogTimeline); + } finally { + TaggedThreadFactory.resetQueryId(); + } + }); FutureTask
existingValue = loadingTables_.putIfAbsent(tblName, tableLoadTask); if (existingValue == null) { diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java index cc2b141f5..dcdd0f551 100644 --- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java +++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java @@ -873,9 +873,9 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler { String destDb = MetaStoreUtils.parseDbName(destDbWithCatalog, serverConf_)[1]; EventSequence catalogTimeline = NoOpEventSequence.INSTANCE; srcTbl = catalogOpExecutor_.getExistingTable(sourceDb, sourceTbl, apiName, - catalogTimeline); + /*queryId*/null, catalogTimeline); destinationTbl = catalogOpExecutor_.getExistingTable(destDb, destTbl, apiName, - catalogTimeline); + /*queryId*/null, catalogTimeline); if (!catalog_.tryWriteLock( new org.apache.impala.catalog.Table[] {srcTbl, destinationTbl})) { @@ -920,9 +920,9 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler { String destDb = MetaStoreUtils.parseDbName(destDbWithCatalog, serverConf_)[1]; EventSequence catalogTimeline = NoOpEventSequence.INSTANCE; srcTbl = catalogOpExecutor_.getExistingTable(sourceDb, sourceTbl, apiName, - catalogTimeline); + /*queryId*/null, catalogTimeline); destinationTbl = catalogOpExecutor_.getExistingTable(destDb, destTbl, apiName, - catalogTimeline); + /*queryId*/null, catalogTimeline); if (!catalog_.tryWriteLock( new org.apache.impala.catalog.Table[] {srcTbl, destinationTbl})) { @@ -1216,7 +1216,7 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler { org.apache.impala.catalog.Table tbl = null; try { String dbName = MetaStoreUtils.parseDbName(dbNameWithCatalog, serverConf_)[1]; - tbl = catalogOpExecutor_.getExistingTable(dbName, tblName, apiName, + tbl = catalogOpExecutor_.getExistingTable(dbName, tblName, apiName, /*queryId*/null, NoOpEventSequence.INSTANCE); } catch (Exception e) { rethrowException(e, apiName); diff --git a/fe/src/main/java/org/apache/impala/common/TaggedThreadFactory.java b/fe/src/main/java/org/apache/impala/common/TaggedThreadFactory.java new file mode 100644 index 000000000..544310719 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/common/TaggedThreadFactory.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.common; + +import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TUniqueId; +import org.apache.impala.util.TUniqueIdUtil; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * ThreadFactory to create threads that can be tagged with query ids which will appear in + * their logs. The query id is stored in a thread-local ThreadDebugInfo variable in the + * Backend. GLog retrieves the query id from it and prepend it to the logs. See more + * details in common/thread-debug-info.h and MessageListener() in common/logging.cc. + */ +public class TaggedThreadFactory implements ThreadFactory { + private final static Logger LOG = LoggerFactory.getLogger(TaggedThreadFactory.class); + private final static TUniqueId ZERO_QUERY_ID = new TUniqueId(); + private final static AtomicInteger poolNumber = new AtomicInteger(0); + private final String nameFormat_; + private byte[] thriftQueryId_; + + /** + * Initialize threads without query ids. They can be updated later. + */ + public TaggedThreadFactory(String nameFormat) { + this(ZERO_QUERY_ID, nameFormat); + } + + /** + * Initialize threads with a given query id. + */ + public TaggedThreadFactory(TUniqueId queryId, String nameFormat) { + nameFormat_ = nameFormat; + if (queryId == null) queryId = ZERO_QUERY_ID; + try { + thriftQueryId_ = new TSerializer().serialize(queryId); + } catch (TException e) { + LOG.error("Failed to serialize query id {}", TUniqueIdUtil.PrintId(queryId)); + } + } + + @Override + public Thread newThread(@NotNull Runnable r) { + Runnable initializerRunnable = () -> { + long ptr = 0; + try { + ptr = FeSupport.NativeInitThreadDebugInfo(thriftQueryId_); + r.run(); + } catch (Throwable e) { + LOG.error("Pool thread exception", e); + } finally { + // The thread-local ThreadDebugInfo variable is owned by the Java thread so we + // should delete it at the end. + FeSupport.NativeDeleteThreadDebugInfo(ptr); + } + }; + return new Thread(initializerRunnable, String.format( + nameFormat_, poolNumber.getAndIncrement())); + } + + public static void updateQueryId(TUniqueId queryId) { + if (queryId == null) return; + try { + FeSupport.NativeUpdateThreadDebugInfo(new TSerializer().serialize(queryId)); + } catch (TException e) { + LOG.error("Failed to update query id {}", TUniqueIdUtil.PrintId(queryId)); + } + } + + public static void resetQueryId() { + FeSupport.NativeResetThreadDebugInfo(); + } +} diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index fb5640359..163cca702 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -479,13 +479,14 @@ public class CatalogOpExecutor { tTableName = Optional.of(alter_table_params.getTable_name()); catalogOpTracker_.increment(ddlRequest, tTableName); alterTable(alter_table_params, debugAction, wantMinimalResult, response, - catalogTimeline); + catalogTimeline, queryId); break; case ALTER_VIEW: TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params(); tTableName = Optional.of(alter_view_params.getView_name()); catalogOpTracker_.increment(ddlRequest, tTableName); - alterView(alter_view_params, wantMinimalResult, response, catalogTimeline); + alterView(alter_view_params, wantMinimalResult, response, catalogTimeline, + queryId); break; case CREATE_DATABASE: TCreateDbParams create_db_params = ddlRequest.getCreate_db_params(); @@ -515,7 +516,7 @@ public class CatalogOpExecutor { tTableName = Optional.of(create_table_like_params.getTable_name()); catalogOpTracker_.increment(ddlRequest, tTableName); createTableLike(create_table_like_params, response, catalogTimeline, syncDdl, - wantMinimalResult, debugAction); + wantMinimalResult, debugAction, queryId); break; case CREATE_VIEW: TCreateOrAlterViewParams create_view_params = @@ -547,7 +548,7 @@ public class CatalogOpExecutor { tTableName = Optional.of(drop_stats_params.getTable_name()); catalogOpTracker_.increment(ddlRequest, tTableName); dropStats(drop_stats_params, wantMinimalResult, response, catalogTimeline, - ddlRequest.getQuery_options().getDebug_action()); + ddlRequest.getQuery_options().getDebug_action(), queryId); break; case DROP_DATABASE: TDropDbParams drop_db_params = ddlRequest.getDrop_db_params(); @@ -568,7 +569,7 @@ public class CatalogOpExecutor { dropTableOrView(drop_table_or_view_params, response, ddlRequest.getQuery_options().getLock_max_wait_time_s(), ddlRequest.getQuery_options().getKudu_table_reserve_seconds(), - catalogTimeline); + catalogTimeline, queryId); break; case TRUNCATE_TABLE: TTruncateParams truncate_params = ddlRequest.getTruncate_params(); @@ -576,7 +577,7 @@ public class CatalogOpExecutor { catalogOpTracker_.increment(ddlRequest, tTableName); truncateTable(truncate_params, wantMinimalResult, response, ddlRequest.getQuery_options().getLock_max_wait_time_s(), catalogTimeline, - ddlRequest.getQuery_options().getDebug_action()); + ddlRequest.getQuery_options().getDebug_action(), queryId); break; case DROP_FUNCTION: TDropFunctionParams drop_func_params = ddlRequest.getDrop_fn_params(); @@ -1209,8 +1210,8 @@ public class CatalogOpExecutor { * serialized. */ private void alterTable(TAlterTableParams params, @Nullable String debugAction, - boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline) - throws ImpalaException { + boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline, + TUniqueId queryId) throws ImpalaException { // When true, loads the file/block metadata. boolean reloadFileMetadata = false; // When true, loads the table schema and the column stats from the Hive Metastore. @@ -1220,7 +1221,7 @@ public class CatalogOpExecutor { TableName tableName = TableName.fromThrift(params.getTable_name()); Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(), - "Load for ALTER TABLE", catalogTimeline); + "Load for ALTER TABLE", queryId, catalogTimeline); if (params.getAlter_type() == TAlterTableType.RENAME_VIEW || params.getAlter_type() == TAlterTableType.RENAME_TABLE) { TableName newTableName = TableName.fromThrift( @@ -1888,14 +1889,15 @@ public class CatalogOpExecutor { * a table instead of a a view. */ private void alterView(TCreateOrAlterViewParams params, boolean wantMinimalResult, - TDdlExecResponse resp, EventSequence catalogTimeline) throws ImpalaException { + TDdlExecResponse resp, EventSequence catalogTimeline, TUniqueId queryId) + throws ImpalaException { TableName tableName = TableName.fromThrift(params.getView_name()); Preconditions.checkState(tableName != null && tableName.isFullyQualified()); Preconditions.checkState(params.getColumns() != null && params.getColumns().size() > 0, "Null or empty column list given as argument to DdlExecutor.alterView"); Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(), - "Load for ALTER VIEW", catalogTimeline); + "Load for ALTER VIEW", queryId, catalogTimeline); Preconditions.checkState(tbl instanceof View, "Expected view: %s", tableName); tryWriteLock(tbl, catalogTimeline); @@ -2788,10 +2790,11 @@ public class CatalogOpExecutor { * to protect against concurrent modifications. */ private void dropStats(TDropStatsParams params, boolean wantMinimalResult, - TDdlExecResponse resp, EventSequence catalogTimeline, @Nullable String debugAction) - throws ImpalaException { + TDdlExecResponse resp, EventSequence catalogTimeline, @Nullable String debugAction, + TUniqueId queryId) throws ImpalaException { Table table = getExistingTable(params.getTable_name().getDb_name(), - params.getTable_name().getTable_name(), "Load for DROP STATS", catalogTimeline); + params.getTable_name().getTable_name(), "Load for DROP STATS", queryId, + catalogTimeline); Preconditions.checkNotNull(table); // There is no transactional HMS API to drop stats at the moment (HIVE-22104). Preconditions.checkState(!AcidUtils.isTransactionalTable(table)); @@ -3170,8 +3173,8 @@ public class CatalogOpExecutor { * executing the drop operation. */ private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp, - int lockMaxWaitTime, int kudu_table_reserve_seconds, EventSequence catalogTimeline) - throws ImpalaException { + int lockMaxWaitTime, int kudu_table_reserve_seconds, EventSequence catalogTimeline, + TUniqueId queryId) throws ImpalaException { TableName tableName = TableName.fromThrift(params.getTable_name()); Preconditions.checkState(tableName != null && tableName.isFullyQualified()); Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || params.if_exists, @@ -3197,7 +3200,7 @@ public class CatalogOpExecutor { // we pass null validWriteIdList here since we don't really care what version of // table is loaded, eventually its going to be dropped below. catalog_.getOrLoadTable(params.getTable_name().db_name, - params.getTable_name().table_name, "Load for DROP TABLE/VIEW", null, + params.getTable_name().table_name, "Load for DROP TABLE/VIEW", queryId, null, TABLE_ID_UNAVAILABLE, catalogTimeline); catalogTimeline.markEvent("Loaded catalog table"); } catch (CatalogException e) { @@ -3435,12 +3438,12 @@ public class CatalogOpExecutor { */ private void truncateTable(TTruncateParams params, boolean wantMinimalResult, TDdlExecResponse resp, int lockMaxWaitTime, EventSequence catalogTimeline, - @Nullable String debugAction) throws ImpalaException { + @Nullable String debugAction, TUniqueId queryId) throws ImpalaException { TTableName tblName = params.getTable_name(); Table table = null; try { table = getExistingTable(tblName.getDb_name(), tblName.getTable_name(), - "Load for TRUNCATE TABLE", catalogTimeline); + "Load for TRUNCATE TABLE", queryId, catalogTimeline); } catch (TableNotFoundException e) { if (params.if_exists) { addSummary(resp, "Table does not exist."); @@ -4503,7 +4506,7 @@ public class CatalogOpExecutor { */ private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response, EventSequence catalogTimeline, boolean syncDdl, boolean wantMinimalResult, - @Nullable String debugAction) throws ImpalaException { + @Nullable String debugAction, TUniqueId queryId) throws ImpalaException { Preconditions.checkNotNull(params); THdfsFileFormat fileFormat = params.isSetFile_format() ? params.getFile_format() : null; @@ -4548,7 +4551,7 @@ public class CatalogOpExecutor { return; } Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl(), - "Load source for CREATE TABLE LIKE", catalogTimeline); + "Load source for CREATE TABLE LIKE", queryId, catalogTimeline); org.apache.hadoop.hive.metastore.api.Table tbl = srcTable.getMetaStoreTable().deepCopy(); tbl.setDbName(tblName.getDb()); @@ -7367,8 +7370,10 @@ public class CatalogOpExecutor { // If the table is not loaded, no need to perform refresh after the initial // metadata load. boolean isTableLoadedInCatalog = tbl.isLoaded(); + TUniqueId queryId = req.isSetHeader() && req.getHeader().isSetQuery_id() ? + req.header.query_id : null; tbl = getExistingTable(tblName.getDb(), tblName.getTbl(), - "Load triggered by " + cmdString, catalogTimeline); + "Load triggered by " + cmdString, queryId, catalogTimeline); CatalogObject.ThriftObjectType resultType = req.header.want_minimal_response ? CatalogObject.ThriftObjectType.INVALIDATION : @@ -7587,9 +7592,11 @@ public class CatalogOpExecutor { throws ImpalaException { EventSequence catalogTimeline = new EventSequence(CATALOG_TIMELINE_NAME); TUpdateCatalogResponse response = new TUpdateCatalogResponse(); + TUniqueId queryId = update.isSetHeader() && update.header.isSetQuery_id() ? + update.header.query_id : null; // Only update metastore for Hdfs tables. Table table = getExistingTable(update.getDb_name(), update.getTarget_table(), - "Load for INSERT", catalogTimeline); + "Load for INSERT", queryId, catalogTimeline); if (!(table instanceof FeFsTable)) { throw new InternalException("Unexpected table type: " + update.getTarget_table()); @@ -8223,10 +8230,10 @@ public class CatalogOpExecutor { * know when a table has been dropped and re-created with the same name. */ public Table getExistingTable(String dbName, String tblName, String reason, - EventSequence catalogTimeline) throws CatalogException { + TUniqueId queryId, EventSequence catalogTimeline) throws CatalogException { // passing null validWriteIdList makes sure that we return the table if it is // already loaded. - Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason, null, + Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason, queryId, null, TABLE_ID_UNAVAILABLE, catalogTimeline); if (tbl == null) { throw new TableNotFoundException("Table not found: " + dbName + "." + tblName); @@ -8257,6 +8264,8 @@ public class CatalogOpExecutor { throws ImpalaRuntimeException, CatalogException, InternalException { Preconditions.checkState(tTableName.isPresent()); TCommentOnParams params = ddlRequest.getComment_on_params(); + TUniqueId queryId = ddlRequest.isSetHeader() && ddlRequest.header.isSetQuery_id() ? + ddlRequest.header.query_id : null; if (params.getDb() != null) { Preconditions.checkArgument(!params.isSetTable_name() && !params.isSetColumn_name()); @@ -8271,7 +8280,7 @@ public class CatalogOpExecutor { catalogOpTracker_.increment(ddlRequest, tTableName); alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()), params.getComment(), wantMinimalResult, response, catalogTimeline, - ddlRequest.getQuery_options().getDebug_action()); + ddlRequest.getQuery_options().getDebug_action(), queryId); } else if (params.getColumn_name() != null) { Preconditions.checkArgument(!params.isSetDb() && !params.isSetTable_name()); TColumnName columnName = params.getColumn_name(); @@ -8280,7 +8289,7 @@ public class CatalogOpExecutor { catalogOpTracker_.increment(ddlRequest, tTableName); alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()), columnName.getColumn_name(), params.getComment(), wantMinimalResult, response, - catalogTimeline, ddlRequest.getQuery_options().getDebug_action()); + catalogTimeline, ddlRequest.getQuery_options().getDebug_action(), queryId); } else { throw new UnsupportedOperationException("Unsupported COMMENT ON operation"); } @@ -8402,10 +8411,10 @@ public class CatalogOpExecutor { private void alterCommentOnTableOrView(TableName tableName, String comment, boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline, - @Nullable String debugAction) + @Nullable String debugAction, TUniqueId queryId) throws CatalogException, InternalException, ImpalaRuntimeException { Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(), - "Load for ALTER COMMENT", catalogTimeline); + "Load for ALTER COMMENT", queryId, catalogTimeline); tryWriteLock(tbl, catalogTimeline); try { InProgressTableModification modification = @@ -8434,10 +8443,10 @@ public class CatalogOpExecutor { private void alterCommentOnColumn(TableName tableName, String columnName, String comment, boolean wantMinimalResult, TDdlExecResponse response, - EventSequence catalogTimeline, @Nullable String debugAction) + EventSequence catalogTimeline, @Nullable String debugAction, TUniqueId queryId) throws CatalogException, InternalException, ImpalaRuntimeException { Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(), - "Load for ALTER COLUMN COMMENT", catalogTimeline); + "Load for ALTER COLUMN COMMENT", queryId, catalogTimeline); tryWriteLock(tbl, catalogTimeline); try { InProgressTableModification modification = diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index 59bba2542..ee8314e63 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -160,6 +160,21 @@ public class FeSupport { public native static byte[] NativeWaitForHmsEvents(byte[] thriftReq, byte[] thriftQueryOptions); + // Initialize ThreadDebugInfo for the current thread. Should only be used once for a + // thread. + public native static long NativeInitThreadDebugInfo(byte[] thriftQueryId); + + // Update the ThreadDebugInfo to track a new query id. So logs of the current thread + // can be tagged with that query id. + public native static void NativeUpdateThreadDebugInfo(byte[] thriftQueryId); + + // Reset the ThreadDebugInfo to not tag on any query ids. + public native static void NativeResetThreadDebugInfo(); + + // Delete the ThreadDebugInfo created in Backend. Should only be used once for a thread + // that has initialized ThreadDebugInfo. + public native static void NativeDeleteThreadDebugInfo(long ptr); + /** * Locally caches the jar at the specified HDFS location. * diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java index 1991e7133..766a4a43e 100644 --- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java +++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java @@ -29,6 +29,7 @@ import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.BackendConfig; +import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TDescribeOutputStyle; import org.apache.impala.thrift.TPrivilegeLevel; import org.apache.impala.thrift.TQueryOptions; @@ -66,6 +67,10 @@ public class AuthorizationStmtTest extends AuthorizationTestBase { super(authzProvider); } + static { + FeSupport.loadLibrary(); + } + @BeforeClass public static void setUp() { RuntimeEnv.INSTANCE.setTestEnv(true); diff --git a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java index 8c7e60cc5..587f37fe0 100644 --- a/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java +++ b/fe/src/test/java/org/apache/impala/authorization/ranger/RangerAuditLogTest.java @@ -26,6 +26,7 @@ import org.apache.impala.authorization.AuthorizationPolicy; import org.apache.impala.authorization.AuthorizationProvider; import org.apache.impala.authorization.AuthorizationTestBase; import org.apache.impala.common.ImpalaException; +import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TPrivilege; import org.apache.impala.thrift.TPrivilegeLevel; import org.apache.ranger.audit.model.AuthzAuditEvent; @@ -44,6 +45,9 @@ import static org.junit.Assert.assertTrue; public class RangerAuditLogTest extends AuthorizationTestBase { private static RangerAuthorizationCheckerSpy authzChecker_ = null; + static { + FeSupport.loadLibrary(); + } private static class RangerAuthorizationCheckerSpy extends RangerAuthorizationChecker { private AuthorizationContext authzCtx_; diff --git a/tests/custom_cluster/test_observability.py b/tests/custom_cluster/test_observability.py index 0a620f0ff..14554c056 100644 --- a/tests/custom_cluster/test_observability.py +++ b/tests/custom_cluster/test_observability.py @@ -35,7 +35,7 @@ class TestObservability(CustomClusterTestSuite): "select {0}.gc(int_col) from functional.alltypes limit 1000".format( unique_database)).runtime_profile - gc_count_regex = "GcCount:.*\((.*)\)" + gc_count_regex = r"GcCount:.*\((.*)\)" gc_count_match = re.search(gc_count_regex, profile) assert gc_count_match, profile assert int(gc_count_match.group(1)) > 0, profile @@ -44,3 +44,39 @@ class TestObservability(CustomClusterTestSuite): gc_time_millis_match = re.search(gc_time_millis_regex, profile) assert gc_time_millis_match, profile assert parse_duration_string_ms(gc_time_millis_match.group(1)) > 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args="--catalog_topic_mode=minimal", + impalad_args="--use_local_catalog=true", + disable_log_buffering=True) + def test_query_id_in_logs(self, unique_database): + res = self.execute_query("create table %s.tbl (i int)" % unique_database) + self.assert_catalogd_log_contains( + "INFO", "{}] execDdl request: CREATE_TABLE {}.tbl issued by" + .format(res.query_id, unique_database)) + + res = self.execute_query("explain select * from %s.tbl" % unique_database) + self.assert_catalogd_log_contains( + "INFO", r"{}] Loading metadata for: {}.tbl \(needed by coordinator\)" + .format(res.query_id, unique_database)) + + res = self.execute_query( + "create table %s.tbl2 as select * from functional.alltypes" % unique_database) + self.assert_catalogd_log_contains( + "INFO", "%s] Loading metadata for table: functional.alltypes" % res.query_id) + self.assert_catalogd_log_contains( + "INFO", "%s] Remaining items in queue: 0. Loads in progress: 1" % res.query_id, + expected_count=-1) + self.assert_catalogd_log_contains( + "INFO", r"{}] Loading metadata for: functional.alltypes \(needed by coordinator\)" + .format(res.query_id)) + self.assert_catalogd_log_contains( + "INFO", "{}] execDdl request: CREATE_TABLE_AS_SELECT {}.tbl2 issued by" + .format(res.query_id, unique_database)) + self.assert_catalogd_log_contains( + "INFO", "{}] updateCatalog request: Update catalog for {}.tbl2" + .format(res.query_id, unique_database)) + self.assert_catalogd_log_contains( + "INFO", r"{}] Loading metadata for: {}.tbl2 \(Load for INSERT\)" + .format(res.query_id, unique_database))