This is the final patch to move all Impala e2e and custom cluster tests to use
the HS2 protocol by default. Only beeswax-specific tests remain running against
the beeswax protocol by default; we can remove them once Impala officially
removes beeswax support. HS2 error message formatting in impala-hs2-server.cc
is adjusted slightly to match the formatting in impala-beeswax-server.cc.

Move TestWebPageAndCloseSession from webserver/test_web_pages.py to
custom_cluster/test_web_pages.py to disable glog log buffering.

Testing:
- Pass exhaustive tests, except for some known and unrelated flaky tests.

Change-Id: I42e9ceccbba1e6853f37e68f106265d163ccae28
Reviewed-on: http://gerrit.cloudera.org:8080/22845
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Jason Fehr <jfehr@cloudera.com>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from signal import SIGRTMIN
from time import sleep

import pytest

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
from tests.common.test_vector import HS2


@SkipIf.is_buggy_el6_kernel
@SkipIfNotHdfsMinicluster.scheduling
class TestDataCache(CustomClusterTestSuite):
""" This test enables the data cache and verfies that cache hit and miss counts
|
|
in the runtime profile and metrics are as expected. Run on non-EC HDFS only as
|
|
this test checks the number of data cache hit counts, which implicitly relies
|
|
on the scheduler's behavior and number of HDFS blocks.
|
|
"""
|
|
  @classmethod
  def setup_class(cls):
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestDataCache, cls).setup_class()

  def get_impalad_args(eviction_policy, high_write_concurrency=True,
                       force_single_shard=True, keep_across_restarts=False):
    impalad_args = ["--always_use_data_cache=true"]
    if high_write_concurrency:
      impalad_args.append("--data_cache_write_concurrency=64")
    if force_single_shard:
      impalad_args.append("--cache_force_single_shard")
    if keep_across_restarts:
      impalad_args.append("--data_cache_keep_across_restarts=true")
      impalad_args.append("--shutdown_grace_period_s=1")
    impalad_args.append("--data_cache_eviction_policy={0}".format(eviction_policy))
    return " ".join(impalad_args)

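  # For illustration (derived from the defaults above), get_impalad_args("LRU")
  # returns: "--always_use_data_cache=true --data_cache_write_concurrency=64
  # --cache_force_single_shard --data_cache_eviction_policy=LRU".
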
  def get_data_cache_metric(self, suffix):
    return self.get_metric('impala-server.io-mgr.remote-data-cache-' + suffix)

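  # e.g. get_data_cache_metric('hit-bytes') reads the metric
  # 'impala-server.io-mgr.remote-data-cache-hit-bytes'.
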
  CACHE_START_ARGS = "--data_cache_dir=/tmp --data_cache_size=500MB"

  def __test_data_cache_deterministic(self, vector, unique_database):
    """This test creates a temporary table from another table, overwrites it with
    some other data, and verifies that no stale data is read from the cache. Runs
    on a single node to make it easier to verify the runtime profile. Also enables
    higher write concurrency and uses a single shard to avoid non-determinism.
    """
    self.run_test_case('QueryTest/data-cache', vector, unique_database)
    assert self.get_data_cache_metric('dropped-bytes') >= 0
    assert self.get_data_cache_metric('dropped-entries') >= 0
    assert self.get_data_cache_metric('instant-evictions') >= 0
    assert self.get_data_cache_metric('hit-bytes') > 0
    assert self.get_data_cache_metric('hit-count') > 0
    assert self.get_data_cache_metric('miss-bytes') > 0
    assert self.get_data_cache_metric('miss-count') > 0
    assert self.get_data_cache_metric('total-bytes') > 0
    assert self.get_data_cache_metric('num-entries') > 0
    assert self.get_data_cache_metric('num-writes') > 0

    # Expect the scan to be served entirely from the cache, resulting in no newly
    # opened file handles.
    opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count'
    baseline = self.get_metric(opened_file_handles_metric)
    self.execute_query("select count(distinct l_orderkey) from {0}.test_parquet".format(
        unique_database))
    assert self.get_metric(opened_file_handles_metric) == baseline

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_deterministic_lru(self, vector, unique_database):
    self.__test_data_cache_deterministic(vector, unique_database)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_deterministic_lirs(self, vector, unique_database):
    self.__test_data_cache_deterministic(vector, unique_database)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU") + " --max_cached_file_handles=0",
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_deterministic_no_file_handle_cache(self, vector, unique_database):
    self.__test_data_cache_deterministic(vector, unique_database)

  def __test_data_cache(self):
    """This test scans the same table twice and verifies the cache hit count metrics
    are correct. The exact number of bytes hit is non-deterministic between runs due
    to differing file mtimes and multiple shards in the cache.
    """
    QUERY = "select * from tpch_parquet.lineitem"
    # Do a first run to warm up the cache. Expect no hits.
    self.execute_query(QUERY)
    assert self.get_data_cache_metric('hit-bytes') == 0
    assert self.get_data_cache_metric('hit-count') == 0
    assert self.get_data_cache_metric('miss-bytes') > 0
    assert self.get_data_cache_metric('miss-count') > 0
    assert self.get_data_cache_metric('total-bytes') > 0
    assert self.get_data_cache_metric('num-entries') > 0
    assert self.get_data_cache_metric('num-writes') > 0

    # Do a second run. Expect some hits.
    self.execute_query(QUERY)
    assert self.get_data_cache_metric('hit-bytes') > 0
    assert self.get_data_cache_metric('hit-count') > 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", high_write_concurrency=False,
                                    force_single_shard=False),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_lru(self):
    self.__test_data_cache()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", high_write_concurrency=False,
                                    force_single_shard=False),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_lirs(self):
    self.__test_data_cache()

  def __test_data_cache_disablement(self, vector):
    # Verify that the cache metrics are all zero.
    assert self.get_data_cache_metric('hit-bytes') == 0
    assert self.get_data_cache_metric('hit-count') == 0
    assert self.get_data_cache_metric('miss-bytes') == 0
    assert self.get_data_cache_metric('miss-count') == 0
    assert self.get_data_cache_metric('total-bytes') == 0
    assert self.get_data_cache_metric('num-entries') == 0
    assert self.get_data_cache_metric('num-writes') == 0

    # Run a query against multiple file formats with the cache disabled and then
    # enabled. Verify that the metrics stay at zero when the cache is disabled.
    for disable_cache in [True, False]:
      vector.get_value('exec_option')['disable_data_cache'] = int(disable_cache)
      for file_format in ['text_gzip', 'parquet', 'avro', 'seq', 'rc']:
        QUERY = "select * from functional_{0}.alltypes".format(file_format)
        self.execute_query(QUERY, vector.get_value('exec_option'))
        assert disable_cache == (self.get_data_cache_metric('miss-bytes') == 0)
        assert disable_cache == (self.get_data_cache_metric('miss-count') == 0)
        assert disable_cache == (self.get_data_cache_metric('total-bytes') == 0)
        assert disable_cache == (self.get_data_cache_metric('num-entries') == 0)
        assert disable_cache == (self.get_data_cache_metric('num-writes') == 0)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_disablement_lru(self, vector):
    self.__test_data_cache_disablement(vector)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_disablement_lirs(self, vector):
    self.__test_data_cache_disablement(vector)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", high_write_concurrency=False),
      start_args="--data_cache_dir=/tmp --data_cache_size=9MB",
      cluster_size=1)
  def test_data_cache_lirs_instant_evictions(self):
    # The setup for this test is intricate. For Allocate() to succeed, the request
    # needs to be smaller than the protected size (95% of the cache). For Insert()
    # to fail, the request needs to be larger than the unprotected size (5% of the
    # cache). So, for an 8MB cache store to fail, the cache needs to be larger than
    # 8.4MB (8MB / 0.95) and smaller than 160MB (8MB / 0.05). This sets it to 9MB,
    # which should result in 8MB cache inserts being instantly evicted.
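    # Worked numbers for the 9MB setting (illustrative): the protected segment is
    # 0.95 * 9MB = 8.55MB, so an 8MB Allocate() succeeds, while the unprotected
    # segment is 0.05 * 9MB = 0.45MB, so an 8MB Insert() fails and the entry is
    # instantly evicted.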
QUERY = "select count(*) from tpch.lineitem"
|
|
self.execute_query(QUERY)
|
|
assert self.get_data_cache_metric('miss-bytes') > 0
|
|
assert self.get_data_cache_metric('miss-count') > 0
|
|
assert self.get_data_cache_metric('total-bytes') >= 0
|
|
assert self.get_data_cache_metric('num-entries') >= 0
|
|
assert self.get_data_cache_metric('num-writes') >= 0
|
|
assert self.get_data_cache_metric('instant-evictions') > 0
|
|
|
|
# Run the query multiple times and verify that none of the counters go negative
|
|
instant_evictions_before = \
|
|
self.get_data_cache_metric('instant-evictions')
|
|
for i in range(10):
|
|
self.execute_query(QUERY)
|
|
instant_evictions_after = \
|
|
self.get_data_cache_metric('instant-evictions')
|
|
assert instant_evictions_after - instant_evictions_before > 0
|
|
|
|
# All the counters remain positive
|
|
assert self.get_data_cache_metric('num-entries') >= 0
|
|
assert self.get_data_cache_metric('num-writes') >= 0
|
|
assert self.get_data_cache_metric('total-bytes') >= 0
|
|
|
|
  def __test_data_cache_keep_across_restarts(self, test_reduce_size=False):
    QUERY = "select * from tpch_parquet.lineitem"
    # Execute a query and record the cache's total bytes and number of entries
    # before the cache dump.
    with self.create_impala_client(protocol=HS2) as client1:
      client1.execute(QUERY)
      assert self.get_data_cache_metric('hit-bytes') == 0
      assert self.get_data_cache_metric('hit-count') == 0
      total_bytes = self.get_data_cache_metric('total-bytes')
      num_entries = self.get_data_cache_metric('num-entries')

    # Do a graceful restart and, if requested, reduce the cache size by 1/5.
    impalad = self.cluster.impalads[0]
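    # SIGRTMIN triggers impalad's graceful shutdown path; with
    # --data_cache_keep_across_restarts=true the cache contents are dumped to
    # disk before the process exits.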
    impalad.kill_and_wait_for_exit(SIGRTMIN)
    new_size = 4 * total_bytes // 5
    if test_reduce_size:
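      # Assumption about the startup flag format: -data_cache takes a value of
      # the form '<directory>:<capacity>', so this points at the existing cache
      # directory with a smaller capacity quota.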
      impalad.modify_argument('-data_cache', '/tmp/impala-datacache-0:' + str(new_size))
    impalad.start()
    impalad.service.wait_for_num_known_live_backends(1)

    # After the restart, we expect the cache to have the same total bytes and
    # number of entries as before the restart; if the cache size was reduced, the
    # metrics should shrink accordingly.
    if test_reduce_size:
      assert self.get_data_cache_metric('total-bytes') <= new_size
      assert self.get_data_cache_metric('num-entries') < num_entries
    else:
      assert self.get_data_cache_metric('total-bytes') == total_bytes
      assert self.get_data_cache_metric('num-entries') == num_entries

    # Reconnect to the service and execute the query, expecting some cache hits.
    with self.create_impala_client(protocol=HS2) as client2:
      client2.execute(QUERY)
      assert self.get_data_cache_metric('hit-bytes') > 0
      assert self.get_data_cache_metric('hit-count') > 0
      if test_reduce_size:
        assert self.get_data_cache_metric('miss-bytes') > 0
        assert self.get_data_cache_metric('miss-count') > 0
      else:
        assert self.get_data_cache_metric('miss-bytes') == 0
        assert self.get_data_cache_metric('miss-count') == 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_keep_across_restarts_lru(self):
    self.__test_data_cache_keep_across_restarts()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_keep_across_restarts_lirs(self):
    self.__test_data_cache_keep_across_restarts()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_reduce_size_restarts_lru(self):
    self.__test_data_cache_keep_across_restarts(test_reduce_size=True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_reduce_size_restarts_lirs(self):
    self.__test_data_cache_keep_across_restarts(test_reduce_size=True)

  def __test_data_cache_readonly(self):
    QUERY = "select * from tpch_parquet.lineitem"
    # Execute the query asynchronously, wait briefly, then start a graceful
    # shutdown to test the race between cache writes and the cache turning
    # read-only.
    with self.create_impala_client(protocol=HS2) as client1:
      handle = client1.execute_async(QUERY)
      sleep(1)
      impalad = self.cluster.impalads[0]
      impalad.kill(SIGRTMIN)
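      # Graceful shutdown waits for in-flight queries, so the fetch below should
      # still succeed while the daemon is draining.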
      client1.fetch(QUERY, handle)
      client1.close_query(handle)
    impalad.wait_for_exit()
    impalad.start()
    impalad.service.wait_for_num_known_live_backends(1)

    # Even in this case the cache should be dumped and loaded properly; running
    # the same query again should produce some cache hits.
    self.assert_impalad_log_contains('INFO', 'Partition 0 load successfully.')
    with self.create_impala_client(protocol=HS2) as client2:
      client2.execute(QUERY)
      assert self.get_data_cache_metric('hit-bytes') > 0
      assert self.get_data_cache_metric('hit-count') > 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
  def test_data_cache_readonly_lru(self):
    self.__test_data_cache_readonly()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
  def test_data_cache_readonly_lirs(self):
    self.__test_data_cache_readonly()