Files
impala/tests/custom_cluster/test_services_rpc_errors.py
Riza Suminto 1cead45114 IMPALA-13947: Test local catalog mode by default
Local catalog mode has been the default and works well in downstream
Impala for over 5 years. This patch turns on local catalog mode by
default (--catalog_topic_mode=minimal and --use_local_catalog=true) as
the preferred mode going forward.

Implemented LocalCatalog.setIsReady() to facilitate using local catalog
mode for FE tests. Some FE tests fail due to behavior differences in
local catalog mode, such as IMPALA-7539. This is probably OK since Impala
now largely hands over FileSystem permission checks to Apache Ranger.

The following custom cluster tests are pinned to evaluate under legacy
catalog mode because their behavior changed in local catalog mode:

TestCalcitePlanner.test_calcite_frontend
TestCoordinators.test_executor_only_lib_cache
TestMetadataReplicas
TestTupleCacheCluster
TestWorkloadManagementSQLDetailsCalcite.test_tpcds_8_decimal

At TestHBaseHmsColumnOrder.test_hbase_hms_column_order, set
--use_hms_column_order_for_hbase_tables=true flag for both impalad and
catalogd to get consistent column order in either local or legacy
catalog mode.

Changed TestCatalogRpcErrors.test_register_subscriber_rpc_error
assertions to be more fine-grained by matching each query's id.

Move most of the test methods from TestRangerLegacyCatalog to
TestRangerLocalCatalog, except for some that do need to run in legacy
catalog mode. Also renamed TestRangerLocalCatalog to
TestRangerDefaultCatalog. Table ownership issue in local catalog mode
remains unresolved (see IMPALA-8937).

Testing:
Pass exhaustive tests.

Change-Id: Ie303e294972d12b98f8354bf6bbc6d0cb920060f
Reviewed-on: http://gerrit.cloudera.org:8080/23080
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-08-06 21:42:24 +00:00

104 lines
4.4 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestStatestoreRpcErrors(CustomClusterTestSuite):
  """Tests for statestore RPC handling.

  Each test passes a --debug_actions flag through CustomClusterTestSuite.with_args,
  then checks that the impalad INFO log reports the injected RPC error and that a
  query still succeeds afterwards.
  """

  @classmethod
  def setup_class(cls):
    """Skip the whole class unless the exploration strategy is 'exhaustive'."""
    strategy = cls.exploration_strategy()
    if strategy != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestStatestoreRpcErrors, cls).setup_class()

  def _verify_injection_then_query(self, injected_error_regex):
    """Check the impalad INFO log for the injected error, then run a sanity query.

    injected_error_regex: regex handed unchanged to assert_impalad_log_contains.
    """
    self.assert_impalad_log_contains("INFO", injected_error_regex)
    # Ensure cluster has started up by running a query.
    query_outcome = self.execute_query(
        "select count(*) from functional_parquet.alltypes")
    assert query_outcome.success, str(query_outcome)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      " --debug_actions=REGISTER_SUBSCRIBER_FIRST_ATTEMPT:FAIL@1.0")
  def test_register_subscriber_rpc_error(self):
    """Run with the REGISTER_SUBSCRIBER_FIRST_ATTEMPT debug action enabled."""
    self._verify_injection_then_query(
        "Injected RPC error.*Debug Action: REGISTER_SUBSCRIBER_FIRST_ATTEMPT")

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=" --debug_actions=GET_PROTOCOL_VERSION_FIRST_ATTEMPT:FAIL@1.0",
      disable_log_buffering=True)
  def test_get_protocol_version_rpc_error(self):
    """Run with the GET_PROTOCOL_VERSION_FIRST_ATTEMPT debug action enabled."""
    self._verify_injection_then_query(
        "Injected RPC error.*Debug Action: GET_PROTOCOL_VERSION_FIRST_ATTEMPT")
class TestCatalogRpcErrors(CustomClusterTestSuite):
  """Tests for catalog RPC handling."""

  @classmethod
  def setup_class(cls):
    """Skip the whole class unless the exploration strategy is 'exhaustive'."""
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestCatalogRpcErrors, cls).setup_class()

  def _run_and_expect_injected_error(self, query):
    """Run `query`, require success, and require a matching injected-error log line.

    The log regex is prefixed with the query id of the statement just executed, so
    the check is tied to this particular query rather than to any earlier one.
    """
    result = self.execute_query(query)
    # Always attach the result so a failure shows which statement broke and why.
    assert result.success, str(result)
    log_pattern = "{0}.*Injected RPC error.*Debug Action: CATALOG_RPC_FIRST_ATTEMPT"
    # The trailing -1 is kept exactly as in the original calls.
    # NOTE(review): presumably means "one or more matches" -- confirm against
    # assert_impalad_log_contains.
    self.assert_impalad_log_contains("INFO", log_pattern.format(result.query_id), -1)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=" --debug_actions=CATALOG_RPC_FIRST_ATTEMPT:FAIL@1.0",
      disable_log_buffering=True)
  def test_register_subscriber_rpc_error(self, unique_database):
    """Validate that RPCs to the catalogd are retried by injecting a failure into the
    first RPC attempt for any catalogd RPC. Run a variety of queries that require
    catalogd interaction to ensure all RPCs are retried.

    NOTE(review): the method name mentions "register_subscriber" although the debug
    action is CATALOG_RPC_FIRST_ATTEMPT; the name is kept so the test id is stable.
    """
    table = "{0}.tmp".format(unique_database)
    # Order matters: each statement depends on the state left by the previous one.
    queries = [
      # Validate create table queries.
      "create table {0} (col int)".format(table),
      # Validate insert queries.
      "insert into table {0} values (1)".format(table),
      # Validate compute stats queries.
      "compute stats {0}".format(table),
      # Validate refresh table queries.
      "refresh {0}".format(table),
      # Validate drop table queries.
      "drop table {0}".format(table),
      # Validate select queries against pre-existing, but not-loaded tables.
      "select count(*) from functional_parquet.alltypes",
    ]
    for query in queries:
      self._run_and_expect_injected_error(query)