Logs from Java threads running in an ExecutorService are missing the query id, which is stored in the C++ thread-local ThreadDebugInfo variable. This patch adds JNI calls so Java threads can manage the ThreadDebugInfo variable. Currently two thread pools are changed:
- MissingTable loading pool in StmtMetadataLoader.parallelTableLoad().
- Table loading pool in TableLoadingMgr.

The MissingTable loading pool only lives within the parallelTableLoad() method, so we initialize ThreadDebugInfo with the queryId at the beginning of the thread and delete it at the end of the thread. Note that a thread might be reused to load different tables, but they all belong to the same query.

The table loading pool is a long-running pool in catalogd that never shuts down. Its threads are used to load tables triggered by different queries. We initialize ThreadDebugInfo as above, but update it when the thread starts loading a table for a different query id, and reset it when the loading is done. The query id is passed down from the catalogd RPC request headers.

Tests:
- Added an e2e test to verify the logs.
- Ran existing CORE tests.

Change-Id: I83cca55edc72de35f5e8c5422efc104e6aa894c1
Reviewed-on: http://gerrit.cloudera.org:8080/23558
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
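The per-task bookkeeping the commit describes amounts to a set-before/clear-after wrapper around every task handed to the pool. The following is a minimal Java sketch of that pattern, not Impala's actual code: the names (QueryIdPropagationSketch, withQueryId, QUERY_ID) and the sample query ids are made up, and a plain ThreadLocal stands in for the JNI calls that, in Impala, write the query id into the C++ ThreadDebugInfo.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: install a query id in a thread-local slot before each
// pooled task runs and clear it afterwards, so log lines emitted by the task
// can be tagged with that id. In Impala the set/clear would be JNI calls into
// the C++ ThreadDebugInfo; a plain ThreadLocal stands in here.
public class QueryIdPropagationSketch {

  // Stand-in for the native thread-local ThreadDebugInfo state.
  private static final ThreadLocal<String> QUERY_ID = new ThreadLocal<>();

  // Wraps a task so the worker thread carries the submitting query's id.
  static Runnable withQueryId(String queryId, Runnable task) {
    return () -> {
      QUERY_ID.set(queryId);    // JNI: initialize/update ThreadDebugInfo
      try {
        task.run();
      } finally {
        QUERY_ID.remove();      // JNI: reset/delete ThreadDebugInfo
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // Reused worker threads pick up whichever query id the wrapper installs.
    pool.submit(withQueryId("a1b2c3d4:0", () -> System.out.println(
        QUERY_ID.get() + "] loading table t1 on " + Thread.currentThread().getName())));
    pool.submit(withQueryId("e5f6a7b8:0", () -> System.out.println(
        QUERY_ID.get() + "] loading table t2 on " + Thread.currentThread().getName())));
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}

For the long-running pool in TableLoadingMgr the same wrapper idea applies, except the slot is updated whenever a reused worker picks up work for a different query instead of being set once per thread. The e2e test the commit adds is the Python suite below.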
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import pytest
import re

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.filesystem_utils import get_fs_path
from tests.util.parse_util import parse_duration_string_ms


class TestObservability(CustomClusterTestSuite):
  @pytest.mark.execute_serially
  def test_host_profile_jvm_gc_metrics(self, unique_database):
    self.execute_query_expect_success(self.client,
        "create function {0}.gc(int) returns int location '{1}' \
        symbol='org.apache.impala.JavaGcUdfTest'".format(
            unique_database, get_fs_path('/test-warehouse/impala-hive-udfs.jar')))
    profile = self.execute_query_expect_success(self.client,
        "select {0}.gc(int_col) from functional.alltypes limit 1000".format(
            unique_database)).runtime_profile

    gc_count_regex = r"GcCount:.*\((.*)\)"
    gc_count_match = re.search(gc_count_regex, profile)
    assert gc_count_match, profile
    assert int(gc_count_match.group(1)) > 0, profile

    gc_time_millis_regex = "GcTimeMillis: (.*)"
    gc_time_millis_match = re.search(gc_time_millis_regex, profile)
    assert gc_time_millis_match, profile
    assert parse_duration_string_ms(gc_time_millis_match.group(1)) > 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      catalogd_args="--catalog_topic_mode=minimal",
      impalad_args="--use_local_catalog=true",
      disable_log_buffering=True)
  def test_query_id_in_logs(self, unique_database):
    res = self.execute_query("create table %s.tbl (i int)" % unique_database)
    self.assert_catalogd_log_contains(
        "INFO", "{}] execDdl request: CREATE_TABLE {}.tbl issued by"
        .format(res.query_id, unique_database))

    res = self.execute_query("explain select * from %s.tbl" % unique_database)
    self.assert_catalogd_log_contains(
        "INFO", r"{}] Loading metadata for: {}.tbl \(needed by coordinator\)"
        .format(res.query_id, unique_database))

    res = self.execute_query(
        "create table %s.tbl2 as select * from functional.alltypes" % unique_database)
    self.assert_catalogd_log_contains(
        "INFO", "%s] Loading metadata for table: functional.alltypes" % res.query_id)
    self.assert_catalogd_log_contains(
        "INFO", "%s] Remaining items in queue: 0. Loads in progress: 1" % res.query_id,
        expected_count=-1)
    self.assert_catalogd_log_contains(
        "INFO", r"{}] Loading metadata for: functional.alltypes \(needed by coordinator\)"
        .format(res.query_id))
    self.assert_catalogd_log_contains(
        "INFO", "{}] execDdl request: CREATE_TABLE_AS_SELECT {}.tbl2 issued by"
        .format(res.query_id, unique_database))
    self.assert_catalogd_log_contains(
        "INFO", "{}] updateCatalog request: Update catalog for {}.tbl2"
        .format(res.query_id, unique_database))
    self.assert_catalogd_log_contains(
        "INFO", r"{}] Loading metadata for: {}.tbl2 \(Load for INSERT\)"
        .format(res.query_id, unique_database))