Files
impala/tests/custom_cluster/test_data_cache.py
Riza Suminto f28a32fbc3 IMPALA-13916: Change BaseTestSuite.default_test_protocol to HS2
This is the final patch to move all Impala e2e and custom cluster tests
to use HS2 protocol by default. Only beeswax-specific test remains
testing against beeswax protocol by default. We can remove them once
Impala officially remove beeswax support.

HS2 error message formatting in impala-hs2-server.cc is adjusted a bit
to match with formatting in impala-beeswax-server.cc.

Move TestWebPageAndCloseSession from webserver/test_web_pages.py to
custom_cluster/test_web_pages.py to disable glog log buffering.

Testing:
- Pass exhaustive tests, except for some known and unrelated flaky
  tests.

Change-Id: I42e9ceccbba1e6853f37e68f106265d163ccae28
Reviewed-on: http://gerrit.cloudera.org:8080/22845
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Jason Fehr <jfehr@cloudera.com>
2025-05-20 14:32:10 +00:00

322 lines
15 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from signal import SIGRTMIN
from time import sleep
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
from tests.common.test_vector import HS2
@SkipIf.is_buggy_el6_kernel
@SkipIfNotHdfsMinicluster.scheduling
class TestDataCache(CustomClusterTestSuite):
""" This test enables the data cache and verfies that cache hit and miss counts
in the runtime profile and metrics are as expected. Run on non-EC HDFS only as
this test checks the number of data cache hit counts, which implicitly relies
on the scheduler's behavior and number of HDFS blocks.
"""
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestDataCache, cls).setup_class()
def get_impalad_args(eviction_policy, high_write_concurrency=True,
force_single_shard=True, keep_across_restarts=False):
impalad_args = ["--always_use_data_cache=true"]
if (high_write_concurrency):
impalad_args.append("--data_cache_write_concurrency=64")
if (force_single_shard):
impalad_args.append("--cache_force_single_shard")
if (keep_across_restarts):
impalad_args.append("--data_cache_keep_across_restarts=true")
impalad_args.append("--shutdown_grace_period_s=1")
impalad_args.append("--data_cache_eviction_policy={0}".format(eviction_policy))
return " ".join(impalad_args)
def get_data_cache_metric(self, suffix):
return self.get_metric('impala-server.io-mgr.remote-data-cache-' + suffix)
CACHE_START_ARGS = "--data_cache_dir=/tmp --data_cache_size=500MB"
def __test_data_cache_deterministic(self, vector, unique_database):
""" This test creates a temporary table from another table, overwrites it with
some other data and verifies that no stale data is read from the cache. Runs with
a single node to make it easier to verify the runtime profile. Also enables higher
write concurrency and uses a single shard to avoid non-determinism.
"""
self.run_test_case('QueryTest/data-cache', vector, unique_database)
assert self.get_data_cache_metric('dropped-bytes') >= 0
assert self.get_data_cache_metric('dropped-entries') >= 0
assert self.get_data_cache_metric('instant-evictions') >= 0
assert self.get_data_cache_metric('hit-bytes') > 0
assert self.get_data_cache_metric('hit-count') > 0
assert self.get_data_cache_metric('miss-bytes') > 0
assert self.get_data_cache_metric('miss-count') > 0
assert self.get_data_cache_metric('total-bytes') > 0
assert self.get_data_cache_metric('num-entries') > 0
assert self.get_data_cache_metric('num-writes') > 0
# Expect all cache hits results in no opened files.
opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count'
baseline = self.get_metric(opened_file_handles_metric)
self.execute_query("select count(distinct l_orderkey) from {0}.test_parquet".format(
unique_database))
assert self.get_metric(opened_file_handles_metric) == baseline
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU"),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_deterministic_lru(self, vector, unique_database):
self.__test_data_cache_deterministic(vector, unique_database)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS"),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_deterministic_lirs(self, vector, unique_database):
self.__test_data_cache_deterministic(vector, unique_database)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU") + " --max_cached_file_handles=0",
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_deterministic_no_file_handle_cache(self, vector, unique_database):
self.__test_data_cache_deterministic(vector, unique_database)
def __test_data_cache(self):
""" This test scans the same table twice and verifies the cache hit count metrics
are correct. The exact number of bytes hit is non-deterministic between runs due
to different mtime of files and multiple shards in the cache.
"""
QUERY = "select * from tpch_parquet.lineitem"
# Do a first run to warm up the cache. Expect no hits.
self.execute_query(QUERY)
assert self.get_data_cache_metric('hit-bytes') == 0
assert self.get_data_cache_metric('hit-count') == 0
assert self.get_data_cache_metric('miss-bytes') > 0
assert self.get_data_cache_metric('miss-count') > 0
assert self.get_data_cache_metric('total-bytes') > 0
assert self.get_data_cache_metric('num-entries') > 0
assert self.get_data_cache_metric('num-writes') > 0
# Do a second run. Expect some hits.
self.execute_query(QUERY)
assert self.get_data_cache_metric('hit-bytes') > 0
assert self.get_data_cache_metric('hit-count') > 0
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU", high_write_concurrency=False,
force_single_shard=False),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_lru(self):
self.__test_data_cache()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS", high_write_concurrency=False,
force_single_shard=False),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_lirs(self):
self.__test_data_cache()
def __test_data_cache_disablement(self, vector):
# Verifies that the cache metrics are all zero.
assert self.get_data_cache_metric('hit-bytes') == 0
assert self.get_data_cache_metric('hit-count') == 0
assert self.get_data_cache_metric('miss-bytes') == 0
assert self.get_data_cache_metric('miss-count') == 0
assert self.get_data_cache_metric('total-bytes') == 0
assert self.get_data_cache_metric('num-entries') == 0
assert self.get_data_cache_metric('num-writes') == 0
# Runs a query with the cache disabled and then enabled against multiple file formats.
# Verifies that the metrics stay at zero when the cache is disabled.
for disable_cache in [True, False]:
vector.get_value('exec_option')['disable_data_cache'] = int(disable_cache)
for file_format in ['text_gzip', 'parquet', 'avro', 'seq', 'rc']:
QUERY = "select * from functional_{0}.alltypes".format(file_format)
self.execute_query(QUERY, vector.get_value('exec_option'))
assert disable_cache == (self.get_data_cache_metric('miss-bytes') == 0)
assert disable_cache == (self.get_data_cache_metric('miss-count') == 0)
assert disable_cache == (self.get_data_cache_metric('total-bytes') == 0)
assert disable_cache == (self.get_data_cache_metric('num-entries') == 0)
assert disable_cache == (self.get_data_cache_metric('num-writes') == 0)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU"),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_disablement_lru(self, vector):
self.__test_data_cache_disablement(vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS"),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_disablement_lirs(self, vector):
self.__test_data_cache_disablement(vector)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS", high_write_concurrency=False),
start_args="--data_cache_dir=/tmp --data_cache_size=9MB",
cluster_size=1)
def test_data_cache_lirs_instant_evictions(self):
# The setup for this test is intricate. For Allocate() to succeed, the request
# needs to be smaller than the protected size (95% of the cache). For Insert() to
# fail, the request needs to be larger than the unprotected size (5% of the cache).
# So, for an 8MB cache store to fail, the cache needs to be > 8.4MB (8MB / 0.95)
# and less than 160MB (8MB / 0.05). This sets it to 9MB, which should result in
# 8MB cache inserts to be instantly evicted.
QUERY = "select count(*) from tpch.lineitem"
self.execute_query(QUERY)
assert self.get_data_cache_metric('miss-bytes') > 0
assert self.get_data_cache_metric('miss-count') > 0
assert self.get_data_cache_metric('total-bytes') >= 0
assert self.get_data_cache_metric('num-entries') >= 0
assert self.get_data_cache_metric('num-writes') >= 0
assert self.get_data_cache_metric('instant-evictions') > 0
# Run the query multiple times and verify that none of the counters go negative
instant_evictions_before = \
self.get_data_cache_metric('instant-evictions')
for i in range(10):
self.execute_query(QUERY)
instant_evictions_after = \
self.get_data_cache_metric('instant-evictions')
assert instant_evictions_after - instant_evictions_before > 0
# All the counters remain positive
assert self.get_data_cache_metric('num-entries') >= 0
assert self.get_data_cache_metric('num-writes') >= 0
assert self.get_data_cache_metric('total-bytes') >= 0
def __test_data_cache_keep_across_restarts(self, test_reduce_size=False):
QUERY = "select * from tpch_parquet.lineitem"
# Execute a query, record the total bytes and the number of entries of cache before
# cache dump.
with self.create_impala_client(protocol=HS2) as client1:
client1.execute(QUERY)
assert self.get_data_cache_metric('hit-bytes') == 0
assert self.get_data_cache_metric('hit-count') == 0
total_bytes = self.get_data_cache_metric('total-bytes')
num_entries = self.get_data_cache_metric('num-entries')
# Do graceful restart and, if necessary, reduce the cache size by 1/5.
impalad = self.cluster.impalads[0]
impalad.kill_and_wait_for_exit(SIGRTMIN)
new_size = 4 * total_bytes // 5
if test_reduce_size:
impalad.modify_argument('-data_cache', '/tmp/impala-datacache-0:' + str(new_size))
impalad.start()
impalad.service.wait_for_num_known_live_backends(1)
# After the restart, we expect the cache to have the same total bytes
# and number of entries as before the restart, and if the cache size is reduced,
# the metrics should be reduced accordingly.
if test_reduce_size:
assert self.get_data_cache_metric('total-bytes') <= new_size
assert self.get_data_cache_metric('num-entries') < num_entries
else:
assert self.get_data_cache_metric('total-bytes') == total_bytes
assert self.get_data_cache_metric('num-entries') == num_entries
# Reconnect to the service and execute the query, expecting some cache hits.
with self.create_impala_client(protocol=HS2) as client2:
client2.execute(QUERY)
assert self.get_data_cache_metric('hit-bytes') > 0
assert self.get_data_cache_metric('hit-count') > 0
if test_reduce_size:
assert self.get_data_cache_metric('miss-bytes') > 0
assert self.get_data_cache_metric('miss-count') > 0
else:
assert self.get_data_cache_metric('miss-bytes') == 0
assert self.get_data_cache_metric('miss-count') == 0
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_keep_across_restarts_lru(self):
self.__test_data_cache_keep_across_restarts()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_keep_across_restarts_lirs(self):
self.__test_data_cache_keep_across_restarts()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_reduce_size_restarts_lru(self):
self.__test_data_cache_keep_across_restarts(test_reduce_size=True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
start_args=CACHE_START_ARGS, cluster_size=1)
def test_data_cache_reduce_size_restarts_lirs(self):
self.__test_data_cache_keep_across_restarts(test_reduce_size=True)
def __test_data_cache_readonly(self):
QUERY = "select * from tpch_parquet.lineitem"
# Execute the query asynchronously, wait a short while, and do gracefully shutdown
# immediately to test the race between cache writes and setting cache read-only.
with self.create_impala_client(protocol=HS2) as client1:
handle = client1.execute_async(QUERY)
sleep(1)
impalad = self.cluster.impalads[0]
impalad.kill(SIGRTMIN)
client1.fetch(QUERY, handle)
client1.close_query(handle)
impalad.wait_for_exit()
impalad.start()
impalad.service.wait_for_num_known_live_backends(1)
# We hope that in this case, the cache is still properly dumped and loaded,
# and then the same query is executed to expect some cache hits.
self.assert_impalad_log_contains('INFO', 'Partition 0 load successfully.')
with self.create_impala_client(protocol=HS2) as client2:
client2.execute(QUERY)
assert self.get_data_cache_metric('hit-bytes') > 0
assert self.get_data_cache_metric('hit-count') > 0
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
def test_data_cache_readonly_lru(self):
self.__test_data_cache_readonly()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
def test_data_cache_readonly_lirs(self):
self.__test_data_cache_readonly()