Files
impala/tests/custom_cluster/test_services_rpc_errors.py
Riza Suminto 95f353ac4a IMPALA-13507: Allow disabling glog buffering via with_args fixture
We have plenty of custom_cluster tests that assert against the content of
Impala daemon log files while the process is still running, using
assert_log_contains() and its wrappers. The method specifically mentions
disabling glog buffering ('-logbuflevel=-1'), but not all
custom_cluster tests do that. This often results in flaky tests that are
hard to triage and are often neglected if they do not run frequently in
core exploration.

This patch adds a boolean parameter 'disable_log_buffering' to
CustomClusterTestSuite.with_args so that a test can declare its intention
to inspect log files in a live minicluster. If it is True, the minicluster
is started with '-logbuflevel=-1' for all daemons. If it is False, a
WARNING is logged on any call to assert_log_contains().

Several complex custom_cluster tests are left unchanged and therefore
still print such WARNING logs, such as:
- TestQueryLive
- TestQueryLogTableBeeswax
- TestQueryLogOtherTable
- TestQueryLogTableHS2
- TestQueryLogTableAll
- TestQueryLogTableBufferPool
- TestStatestoreRpcErrors
- TestWorkloadManagementInitWait
- TestWorkloadManagementSQLDetails

This patch also fixes some small flake8 issues in the modified tests.

There is a sign of flakiness in test_query_live.py, where a test query is
submitted to the coordinator and fails because the sys.impala_query_live
table does not exist yet from the coordinator's perspective. This patch
modifies test_query_live.py to wait a few seconds until
sys.impala_query_live is queryable.

Testing:
- Pass custom_cluster tests in exhaustive exploration.

Change-Id: I56fb1746b8f3cea9f3db3514a86a526dffb44a61
Reviewed-on: http://gerrit.cloudera.org:8080/22015
Reviewed-by: Jason Fehr <jfehr@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2024-11-05 04:49:05 +00:00

111 lines
4.3 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestStatestoreRpcErrors(CustomClusterTestSuite):
  """Tests for statestore RPC handling.

  Each test passes a --debug_actions flag through CustomClusterTestSuite.with_args
  and then asserts that the impalad INFO log contains the matching
  "Injected RPC error" entry.
  """

  @classmethod
  def get_workload(cls):
    """Return the name of the workload used by these tests."""
    return 'functional-query'

  @classmethod
  def setup_class(cls):
    """Skip the whole class unless the exploration strategy is 'exhaustive'."""
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestStatestoreRpcErrors, cls).setup_class()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      " --debug_actions=REGISTER_SUBSCRIBER_FIRST_ATTEMPT:FAIL@1.0",
      disable_log_buffering=True)
  def test_register_subscriber_rpc_error(self):
    """Check that the REGISTER_SUBSCRIBER_FIRST_ATTEMPT debug action is logged and
    that a query still succeeds afterwards."""
    # The log is inspected while the daemons are still running, so log buffering is
    # disabled in with_args above; otherwise the INFO line may not be flushed yet.
    self.assert_impalad_log_contains("INFO",
        "Injected RPC error.*Debug Action: REGISTER_SUBSCRIBER_FIRST_ATTEMPT")

    # Ensure cluster has started up by running a query.
    result = self.execute_query("select count(*) from functional_parquet.alltypes")
    assert result.success, str(result)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=" --debug_actions=GET_PROTOCOL_VERSION_FIRST_ATTEMPT:FAIL@1.0",
      disable_log_buffering=True)
  def test_get_protocol_version_rpc_error(self):
    """Check that the GET_PROTOCOL_VERSION_FIRST_ATTEMPT debug action is logged and
    that a query still succeeds afterwards."""
    self.assert_impalad_log_contains("INFO",
        "Injected RPC error.*Debug Action: GET_PROTOCOL_VERSION_FIRST_ATTEMPT")

    # Ensure cluster has started up by running a query.
    result = self.execute_query("select count(*) from functional_parquet.alltypes")
    assert result.success, str(result)
class TestCatalogRpcErrors(CustomClusterTestSuite):
  """Tests for catalog RPC handling."""

  @classmethod
  def get_workload(cls):
    """Return the name of the workload used by these tests."""
    return 'functional-query'

  @classmethod
  def setup_class(cls):
    """Skip the whole class unless the exploration strategy is 'exhaustive'."""
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestCatalogRpcErrors, cls).setup_class()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=" --debug_actions=CATALOG_RPC_FIRST_ATTEMPT:FAIL@1.0",
      disable_log_buffering=True)
  def test_register_subscriber_rpc_error(self, unique_database):
    """Validate that RPCs to the catalogd are retried by injecting a failure into the
    first RPC attempt for any catalogd RPC. Run a variety of queries that require
    catalogd interaction to ensure all RPCs are retried."""
    # Every assertion carries str(result) so that a failing query is identifiable
    # from the test output.

    # Validate create table queries.
    result = self.execute_query("create table {0}.tmp (col int)".format(unique_database))
    assert result.success, str(result)

    # Validate insert queries.
    result = self.execute_query("insert into table {0}.tmp values (1)"
        .format(unique_database))
    assert result.success, str(result)

    # Validate compute stats queries.
    result = self.execute_query("compute stats {0}.tmp".format(unique_database))
    assert result.success, str(result)

    # Validate refresh table queries.
    result = self.execute_query("refresh {0}.tmp".format(unique_database))
    assert result.success, str(result)

    # Validate drop table queries.
    result = self.execute_query("drop table {0}.tmp".format(unique_database))
    assert result.success, str(result)

    # Validate select queries against pre-existing, but not-loaded tables.
    result = self.execute_query("select count(*) from functional_parquet.alltypes")
    assert result.success, str(result)

    # The 6 queries above each should have triggered the DEBUG_ACTION, so assert that
    # the DEBUG_ACTION was triggered 8 times (an extra 2 for the DROP and CREATE DATABASE
    # queries needed to make the unique_database).
    self.assert_impalad_log_contains("INFO",
        "Injected RPC error.*Debug Action: CATALOG_RPC_FIRST_ATTEMPT", 8)