Files
impala/tests/custom_cluster/test_catalog_hms_failures.py
Csaba Ringhofer f98b697c7b IMPALA-13929: Make 'functional-query' the default workload in tests
This change adds get_workload() to ImpalaTestSuite and removes it
from all test suites that already returned 'functional-query'.
get_workload() is also removed from CustomClusterTestSuite which
used to return 'tpch'.

All other changes besides impala_test_suite.py and
custom_cluster_test_suite.py are just mass removals of
get_workload() functions.

The behavior is only changed in custom cluster tests that didn't
override get_workload(). By returning 'functional-query' instead
of 'tpch', exploration_strategy() will no longer return 'core' in
'exhaustive' test runs. See IMPALA-3947 on why workload affected
exploration_strategy. An example for affected test is
TestCatalogHMSFailures which was skipped both in core and exhaustive
runs before this change.

get_workload() functions that return a different workload than
'functional-query' are not changed - it is possible that some of
these also don't handle exploration_strategy() as expected, but
individually checking these tests is out of scope in this patch.

Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115
Reviewed-on: http://gerrit.cloudera.org:8080/22726
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-08 07:12:55 +00:00

320 lines
12 KiB
Python

# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
import os
import time
import threading
from subprocess import check_call
from tests.common.custom_cluster_test_suite import (
CustomClusterTestSuite,
DEFAULT_CLUSTER_SIZE)
from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.skip import SkipIf
from tests.util.event_processor_utils import EventProcessorUtils
NUM_SUBSCRIBERS = DEFAULT_CLUSTER_SIZE + 1
@SkipIf.is_test_jdk
class TestHiveMetaStoreFailure(CustomClusterTestSuite):
"""Tests to validate the Catalog Service continues to function even if the HMS
fails."""
@classmethod
def setup_class(cls):
super(TestHiveMetaStoreFailure, cls).setup_class()
@classmethod
def run_hive_metastore(cls, if_not_running=False):
script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh')
run_cmd = [script, '-only_metastore']
if if_not_running:
run_cmd.append('-if_not_running')
check_call(run_cmd, close_fds=True)
@classmethod
def teardown_class(cls):
# Make sure the metastore is running even if the test aborts somewhere unexpected
# before restarting the metastore itself.
cls.run_hive_metastore(if_not_running=True)
super(TestHiveMetaStoreFailure, cls).teardown_class()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog --catalog_topic_mode=minimal',
catalogd_args='--catalog_topic_mode=minimal')
def test_hms_service_dies(self):
"""Regression test for IMPALA-823 to verify the catalog service works properly when
HMS connections fail"""
# Force the tables to be uncached and then kill the hive metastore.
tbl_name = "functional.alltypes"
self.client.execute("invalidate metadata %s" % tbl_name)
kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
check_call([kill_cmd, '-only_metastore'], close_fds=True)
try:
self.client.execute("describe %s" % tbl_name)
except IMPALA_CONNECTION_EXCEPTION as e:
print(str(e))
assert "Failed to load metadata for table: %s. Running 'invalidate metadata %s' "\
"may resolve this problem." % (tbl_name, tbl_name) in str(e)
self.run_hive_metastore()
self.client.execute("invalidate metadata %s" % tbl_name)
self.client.execute("describe %s" % tbl_name)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog',
catalogd_args='--catalog_topic_mode=minimal')
def test_local_catalog_load_with_hms_state_change(self, unique_database):
self.run_test_load_with_hms_down_and_up(unique_database,
"local_catalog_load_with_hms_state_change")
@pytest.mark.execute_serially
def test_load_with_hms_state_change(self, unique_database):
self.run_test_load_with_hms_down_and_up(unique_database,
"load_with_hms_state_change")
def run_test_load_with_hms_down_and_up(self, unique_database, table_name):
table = unique_database + "." + table_name
self.client.execute("create table {0} (i int)".format(table))
kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
check_call([kill_cmd, '-only_metastore'], close_fds=True)
for _ in range(2):
try:
self.client.execute("describe {0}".format(table))
except IMPALA_CONNECTION_EXCEPTION as e:
assert "Failed to load metadata for table: %s. "\
"Running 'invalidate metadata %s' may resolve this problem." \
% (table, table) in str(e)
self.run_hive_metastore()
res = self.client.execute("describe {0}".format(table))
assert res.data == ["i\tint\t"]
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog --catalog_topic_mode=minimal',
catalogd_args='--catalog_topic_mode=minimal',
disable_log_buffering=True)
def test_hms_client_retries(self):
"""Test that a running query will trigger the retry logic in
RetryingMetaStoreClient."""
# Force the tables to be uncached and then kill the hive metastore.
tbl_name = "functional.alltypes"
self.client.execute("invalidate metadata %s" % tbl_name)
kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
check_call([kill_cmd, '-only_metastore'], close_fds=True)
# Run a query asynchronously.
query = "select * from {0} limit 1".format(tbl_name)
thread = threading.Thread(target=lambda:
self.execute_query_expect_success(self.client, query))
thread.start()
# Wait 1 second for the catalogd to start contacting HMS, then start HMS.
time.sleep(1)
self.run_hive_metastore()
# Wait for the query to complete, assert that the HMS client retried the connection.
thread.join()
self.assert_catalogd_log_contains("INFO",
"MetaStoreClient lost connection. Attempting to reconnect", expected_count=-1)
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog',
catalogd_args='--catalog_topic_mode=minimal')
def test_event_processor_tolerate_hms_restart(self):
"""IMPALA-12561: Test that event-processor won't go into ERROR state when there are
connection issues with HMS (mocked by a restart on HMS)"""
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
self.run_hive_metastore()
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
@SkipIf.is_test_jdk
class TestCatalogHMSFailures(CustomClusterTestSuite):
"""Test Catalog behavior when HMS is not present."""
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('These tests only run in exhaustive')
super(TestCatalogHMSFailures, cls).setup_class()
@classmethod
def run_hive_metastore(cls, if_not_running=False):
script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh')
run_cmd = [script, '-only_metastore']
if if_not_running:
run_cmd.append('-if_not_running')
check_call(run_cmd, close_fds=True)
@classmethod
def cleanup_process(cls, proc):
try:
proc.kill()
except Exception:
pass
try:
proc.wait()
except Exception:
pass
@classmethod
def teardown_class(cls):
# Make sure the metastore is running even if the test aborts somewhere unexpected
# before restarting the metastore itself.
cls.run_hive_metastore(if_not_running=True)
super(TestCatalogHMSFailures, cls).teardown_class()
@classmethod
def reload_metadata(cls, client):
client.execute('invalidate metadata')
client.execute('show databases')
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog --catalog_topic_mode=minimal',
catalogd_args='--initial_hms_cnxn_timeout_s=120 --catalog_topic_mode=minimal')
def test_kill_hms_after_catalog_init(self):
"""IMPALA-4278: If HMS dies after catalogd initialization, SQL statements that force
metadata load should fail quickly. After HMS restart, metadata load should work
again"""
# Make sure that catalogd is connected to HMS
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_hs2_client()
self.reload_metadata(client)
# Kill HMS
kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
check_call([kill_cmd, '-only_metastore'], close_fds=True)
# Metadata load should fail quickly
start = time.time()
try:
self.reload_metadata(client)
except IMPALA_CONNECTION_EXCEPTION as e:
assert "Connection refused" in str(e)
else:
assert False, "Metadata load should have failed"
end = time.time()
assert end - start < 30, "Metadata load hasn't failed quickly enough"
# Start HMS
self.run_hive_metastore()
# Metadata load should work now
self.reload_metadata(client)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog --catalog_topic_mode=minimal',
catalogd_args='--initial_hms_cnxn_timeout_s=120 --catalog_topic_mode=minimal')
def test_start_catalog_before_hms(self):
"""IMPALA-4278: If catalogd is started with initial_hms_cnxn_timeout_s set to a value
greater than HMS startup time, it will manage to establish connection to HMS even if
HMS is started a little later"""
# Make sure that catalogd is connected to HMS
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_hs2_client()
self.reload_metadata(client)
# Kill HMS
kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
check_call([kill_cmd, '-only_metastore'], close_fds=True)
# Kill the catalogd.
catalogd = self.cluster.catalogd
catalogd.kill()
# The statestore should detect the catalog service has gone down.
statestored = self.cluster.statestored
statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, timeout=60)
try:
# Start the catalog service asynchronously.
catalogd.start(wait_until_ready=False)
# Wait 10s to be sure that the catalogd is in the 'trying to connect' phase of its
# startup.
time.sleep(10)
# Start HMS and wait for catalogd to come up
self.run_hive_metastore()
statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
impalad.service.wait_for_metric_value('catalog.ready', True, timeout=60)
# Metadata load should work now
self.reload_metadata(client)
finally:
# Make sure to clean up the catalogd process that we started
self.cleanup_process(catalogd)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog --catalog_topic_mode=minimal',
catalogd_args='--initial_hms_cnxn_timeout_s=30 --catalog_topic_mode=minimal')
def test_catalogd_fails_if_hms_started_late(self):
"""IMPALA-4278: If the HMS is not started within initial_hms_cnxn_timeout_s, then the
catalogd fails"""
# Make sure that catalogd is connected to HMS
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_hs2_client()
self.reload_metadata(client)
# Kill HMS
kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
check_call([kill_cmd, '-only_metastore'], close_fds=True)
# Kill the catalogd.
catalogd = self.cluster.catalogd
catalogd.kill()
# The statestore should detect the catalog service has gone down.
statestored = self.cluster.statestored
statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, timeout=60)
try:
# Start the catalog service asynchronously.
catalogd.start(wait_until_ready=False)
# Wait 40s to be sure that the catalogd has been trying to connect to HMS longer
# than initial_hms_cnxn_timeout_s.
time.sleep(40)
# Start HMS
self.run_hive_metastore()
# catalogd has terminated by now
assert not catalogd.get_pid(), "catalogd should have terminated"
finally:
# Make sure to clean up the catalogd process that we started
self.cleanup_process(catalogd)
try:
# Start the catalog service again and wait for it to come up.
catalogd.start()
statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
impalad.service.wait_for_metric_value('catalog.ready', True, timeout=60)
# Metadata load should work now
self.reload_metadata(client)
finally:
# Make sure to clean up the catalogd process that we started
self.cleanup_process(catalogd)