Files
impala/tests/custom_cluster/test_observability.py
stiga-huang ff8bb33b91 IMPALA-12870: Tag query id for Java pool threads
Logs from Java threads running in an ExecutorService are missing the
query id, which is stored in the C++ thread-local ThreadDebugInfo
variable. This patch adds JNI calls that let Java threads manage the
ThreadDebugInfo variable (sketched below). Currently, two thread pools
are changed:
 - MissingTable loading pool in StmtMetadataLoader.parallelTableLoad().
 - Table loading pool in TableLoadingMgr.
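
For illustration only, the JNI surface might look roughly like the
following; ThreadDebugInfoJni and its method names are hypothetical and
are not necessarily the names used by the patch:

  // Hypothetical JNI hooks into the C++ thread-local ThreadDebugInfo.
  public class ThreadDebugInfoJni {
    // Create ThreadDebugInfo for the current thread, tagged with queryId.
    public static native void initThreadDebugInfo(String queryId);
    // Re-tag an already initialized ThreadDebugInfo with a new query id.
    public static native void setQueryId(String queryId);
    // Delete the thread-local ThreadDebugInfo when the thread is done.
    public static native void deleteThreadDebugInfo();
  }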

The MissingTable loading pool only lives within the parallelTableLoad()
method, so we initialize ThreadDebugInfo with the queryId at the
beginning of each thread and delete it at the end of the thread. Note
that a thread might be reused to load different tables, but they all
belong to the same query.
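
A rough sketch of that pattern, assuming the hypothetical hooks above
and java.util.concurrent's ExecutorService, Executors and ThreadFactory
(this is not the patch's actual code):

  // Sketch only: tag each pooled thread for its whole lifetime, since the
  // pool exists for a single query inside parallelTableLoad().
  static ExecutorService newQueryTaggedPool(int numThreads, String queryId) {
    ThreadFactory factory = runnable -> new Thread(() -> {
      ThreadDebugInfoJni.initThreadDebugInfo(queryId);
      try {
        runnable.run();  // the pool's worker loop; may load several tables
      } finally {
        ThreadDebugInfoJni.deleteThreadDebugInfo();
      }
    });
    return Executors.newFixedThreadPool(numThreads, factory);
  }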

The table loading pool is a long-running pool in catalogd that never
shuts down; its threads are used to load tables triggered by different
queries. We initialize ThreadDebugInfo as above, but update it when a
thread starts loading a table for a different query id and reset it
when the loading is done. The query id is passed down from the catalogd
RPC request headers.
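
Again purely as an illustration of the idea, with hypothetical names:

  // Sketch only: a long-running loader thread is re-tagged per load request.
  void loadWithQueryId(String queryId, Runnable load) {
    // queryId comes from the catalogd RPC request header of the query that
    // triggered this load; it may differ from the previous load's query id.
    ThreadDebugInfoJni.setQueryId(queryId);
    try {
      load.run();
    } finally {
      // Reset so log lines from the now-idle thread are not attributed to a
      // finished query.
      ThreadDebugInfoJni.setQueryId("");
    }
  }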

Tests:
 - Added e2e test to verify the logs.
 - Ran existing CORE tests.

Change-Id: I83cca55edc72de35f5e8c5422efc104e6aa894c1
Reviewed-on: http://gerrit.cloudera.org:8080/23558
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-10-23 03:35:29 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

import pytest
import re

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.filesystem_utils import get_fs_path
from tests.util.parse_util import parse_duration_string_ms


class TestObservability(CustomClusterTestSuite):

  @pytest.mark.execute_serially
  def test_host_profile_jvm_gc_metrics(self, unique_database):
    self.execute_query_expect_success(self.client,
        "create function {0}.gc(int) returns int location '{1}' \
        symbol='org.apache.impala.JavaGcUdfTest'".format(
            unique_database, get_fs_path('/test-warehouse/impala-hive-udfs.jar')))
    profile = self.execute_query_expect_success(self.client,
        "select {0}.gc(int_col) from functional.alltypes limit 1000".format(
            unique_database)).runtime_profile
    gc_count_regex = r"GcCount:.*\((.*)\)"
    gc_count_match = re.search(gc_count_regex, profile)
    assert gc_count_match, profile
    assert int(gc_count_match.group(1)) > 0, profile
    gc_time_millis_regex = "GcTimeMillis: (.*)"
    gc_time_millis_match = re.search(gc_time_millis_regex, profile)
    assert gc_time_millis_match, profile
    assert parse_duration_string_ms(gc_time_millis_match.group(1)) > 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      catalogd_args="--catalog_topic_mode=minimal",
      impalad_args="--use_local_catalog=true",
      disable_log_buffering=True)
  def test_query_id_in_logs(self, unique_database):
    """Verify that catalogd log lines produced while serving a query, including
    those written by Java pool threads, are tagged with that query's id."""
    # A plain DDL: the execDdl log line should carry the issuing query's id.
    res = self.execute_query("create table %s.tbl (i int)" % unique_database)
    self.assert_catalogd_log_contains(
        "INFO", "{}] execDdl request: CREATE_TABLE {}.tbl issued by"
        .format(res.query_id, unique_database))
    # Metadata loading triggered by planning should be tagged with the same id.
    res = self.execute_query("explain select * from %s.tbl" % unique_database)
    self.assert_catalogd_log_contains(
        "INFO", r"{}] Loading metadata for: {}.tbl \(needed by coordinator\)"
        .format(res.query_id, unique_database))
    # CTAS exercises the table loading pool in catalogd; all loading log lines
    # should be tagged with the CTAS query id.
    res = self.execute_query(
        "create table %s.tbl2 as select * from functional.alltypes" % unique_database)
    self.assert_catalogd_log_contains(
        "INFO", "%s] Loading metadata for table: functional.alltypes" % res.query_id)
    self.assert_catalogd_log_contains(
        "INFO", "%s] Remaining items in queue: 0. Loads in progress: 1" % res.query_id,
        expected_count=-1)
    self.assert_catalogd_log_contains(
        "INFO", r"{}] Loading metadata for: functional.alltypes \(needed by coordinator\)"
        .format(res.query_id))
    self.assert_catalogd_log_contains(
        "INFO", "{}] execDdl request: CREATE_TABLE_AS_SELECT {}.tbl2 issued by"
        .format(res.query_id, unique_database))
    self.assert_catalogd_log_contains(
        "INFO", "{}] updateCatalog request: Update catalog for {}.tbl2"
        .format(res.query_id, unique_database))
    self.assert_catalogd_log_contains(
        "INFO", r"{}] Loading metadata for: {}.tbl2 \(Load for INSERT\)"
        .format(res.query_id, unique_database))