Files
impala/tests/custom_cluster/test_krpc_metrics.py
Mihaly Szjatinya e0cb533c25 IMPALA-13912: Use SHARED_CLUSTER_ARGS in more custom cluster tests
In addition to IMPALA-13503 which allowed having the single cluster
running for the entire test class, this attempts to minimize restarting
between the existing tests without modifying any of their code.

This changeset saves the command line with which
'start-impala-cluster.py' has been run and skips the restarting if the
command line is the same for the next test.

Some tests however do require restart due to the specific metrics being
tested. Such tests are defined with the 'force_restart' flag within the
'with_args' decorator. NOTE: there might be more tests like that
revealed after running the tests in different order resulting in test
failures.

Experimentally, this results in ~150 fewer restarts, mostly coming from
restarts between tests. As for restarts between different variants of
the same test, most of the cluster tests are restricted to single
variant, although multi-variant tests occur occasionally.

Change-Id: I7c9115d4d47b9fe0bfd9dbda218aac2fb02dbd09
Reviewed-on: http://gerrit.cloudera.org:8080/22901
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-06-19 17:48:25 +00:00

82 lines
3.4 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.common.skip import SkipIf, SkipIfBuildType
from tests.verifiers.mem_usage_verifier import MemUsageVerifier
class TestKrpcMetrics(CustomClusterTestSuite):
"""Test for KRPC metrics that require special arguments during cluster startup."""
RPCZ_URL = 'http://localhost:25000/rpcz?json'
TEST_QUERY = 'select count(*) from tpch_parquet.lineitem l1 \
join tpch_parquet.lineitem l2 where l1.l_orderkey = l2.l_orderkey;'
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestKrpcMetrics, cls).setup_class()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
-datastream_service_num_svc_threads=1')
def test_krpc_queue_overflow_rpcz(self, vector):
"""Test that rejected RPCs show up on the /rpcz debug web page.
"""
def get_rpc_overflows():
rpcz = self.get_debug_page(self.RPCZ_URL)
assert len(rpcz['services']) > 0
for s in rpcz['services']:
if s['service_name'] == 'impala.DataStreamService':
return int(s['rpcs_queue_overflow'])
assert False, "Could not find DataStreamService metrics"
before = get_rpc_overflows()
assert before == 0
self.client.execute(self.TEST_QUERY)
after = get_rpc_overflows()
assert before < after
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
-datastream_service_num_svc_threads=1',
force_restart=True)
def test_krpc_queue_overflow_metrics(self, vector):
"""Test that rejected RPCs show up on the /metrics debug web page.
"""
metric_name = 'rpc.impala.DataStreamService.rpcs_queue_overflow'
before = self.get_metric(metric_name)
assert before == 0
self.client.execute(self.TEST_QUERY)
after = self.get_metric(metric_name)
assert before < after
@pytest.mark.execute_serially
def test_krpc_service_queue_metrics(self, vector):
"""Test that memory usage metrics for the data stream service queue show up on the
/metrics debug web page.
"""
self.client.execute(self.TEST_QUERY)
assert self.get_metric('mem-tracker.DataStreamService.current_usage_bytes') >= 0
assert self.get_metric('mem-tracker.DataStreamService.peak_usage_bytes') > 0