This change adds get_workload() to ImpalaTestSuite and removes it from all test suites that already returned 'functional-query'. get_workload() is also removed from CustomClusterTestSuite, which used to return 'tpch'. All other changes besides impala_test_suite.py and custom_cluster_test_suite.py are just mass removals of get_workload() functions.

The behavior only changes in custom cluster tests that didn't override get_workload(). By returning 'functional-query' instead of 'tpch', exploration_strategy() will no longer return 'core' in 'exhaustive' test runs. See IMPALA-3947 for why the workload affected exploration_strategy(). An example of an affected test is TestCatalogHMSFailures, which was skipped in both core and exhaustive runs before this change.

get_workload() functions that return a workload other than 'functional-query' are not changed - it is possible that some of these also don't handle exploration_strategy() as expected, but individually checking these tests is out of scope for this patch.

Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115
Reviewed-on: http://gerrit.cloudera.org:8080/22726
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
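A rough sketch of the mechanism described above (a simplified, assumed rendering based on this description and IMPALA-3947, not the verbatim ImpalaTestSuite code):

# Hypothetical sketch, for illustration only.
class ImpalaTestSuite(object):
  @classmethod
  def get_workload(cls):
    # New default added by this change; suites that need a different
    # workload (e.g. 'tpch' below) still override this method.
    return 'functional-query'

  @classmethod
  def exploration_strategy(cls):
    # Per IMPALA-3947, the effective strategy is resolved per workload:
    # before this change, a suite whose workload was not 'functional-query'
    # could resolve to 'core' even in an 'exhaustive' run, so tests gated
    # on exploration_strategy() == 'exhaustive' were silently skipped.
    ...

With this default in place, a suite such as TestCatalogHMSFailures no longer needs its own get_workload() and is picked up by exhaustive runs.
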
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Tests for HDFS caching: cached-read metrics, caching DDL, and the cached-read
# fallback path.

from __future__ import absolute_import, division, print_function
from builtins import range
import pytest
import re
import time
from subprocess import check_call

from tests.common.environ import build_flavor_timeout, IS_DOCKERIZED_TEST_CLUSTER
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.common.skip import SkipIfFS, SkipIfDockerizedCluster
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.util.filesystem_utils import get_fs_path, IS_EC
from tests.util.shell_util import exec_process


# End-to-end test that HDFS caching is working.
@SkipIfFS.hdfs_caching  # missing coverage: verify SET CACHED gives error
class TestHdfsCaching(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestHdfsCaching, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('exec_option')['batch_size'] == 0)
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == "text")

  # The tpch nation table is cached as part of data loading. We'll issue a query
  # against this table and verify the metric is updated correctly.
  @pytest.mark.execute_serially
  def test_table_is_cached(self, vector):
    cached_read_metric = "impala-server.io-mgr.cached-bytes-read"
    query_string = "select count(*) from tpch.nation"
    expected_bytes_delta = 2199
    impala_cluster = ImpalaCluster.get_e2e_test_cluster()

    # Collect the cached-read metric on all the impalads before running the query.
    cached_bytes_before = list()
    for impalad in impala_cluster.impalads:
      cached_bytes_before.append(impalad.service.get_metric_value(cached_read_metric))

    # Execute the query.
    result = self.execute_query_using_vector(query_string, vector)
    assert(len(result.data) == 1)
    assert(result.data[0] == '25')

    # Read the metrics again.
    cached_bytes_after = list()
    for impalad in impala_cluster.impalads:
      cached_bytes_after.append(impalad.service.get_metric_value(cached_read_metric))

    # Verify that the cached bytes increased by the expected number on exactly one of
    # the impalads.
    num_metrics_increased = 0
    assert(len(cached_bytes_before) == len(cached_bytes_after))
    for i in range(0, len(cached_bytes_before)):
      assert (cached_bytes_before[i] == cached_bytes_after[i]
              or cached_bytes_before[i] + expected_bytes_delta == cached_bytes_after[i])
      if cached_bytes_after[i] > cached_bytes_before[i]:
        num_metrics_increased = num_metrics_increased + 1

    if IS_DOCKERIZED_TEST_CLUSTER:
      assert num_metrics_increased == 0, "HDFS caching is disabled in dockerized cluster."
    elif IS_EC:
      assert num_metrics_increased == 0, "HDFS caching is disabled with erasure coding."
    elif num_metrics_increased != 1:
      # Test failed; print the metrics.
      for i in range(0, len(cached_bytes_before)):
        print("%d %d" % (cached_bytes_before[i], cached_bytes_after[i]))
      assert(False)

  def test_cache_cancellation(self, vector):
    """This query runs on some mix of cached and not cached tables. The query has
    a limit so it exercises the cancellation paths. Regression test for
    IMPALA-1019."""
    num_iters = 100
    query_string = """
      with t1 as (select int_col x, bigint_col y from functional.alltypes limit 2),
           t2 as (select int_col x, bigint_col y from functional.alltypestiny limit 2),
           t3 as (select int_col x, bigint_col y from functional.alltypessmall limit 2)
      select * from t1, t2, t3 where t1.x = t2.x and t2.x = t3.x """

    # Run this query for some iterations since it is timing dependent.
    for x in range(1, num_iters):
      result = self.execute_query_using_vector(query_string, vector)
      assert(len(result.data) == 2)


# A separate class has been created for "test_hdfs_caching_fallback_path" to make it
# run as a part of exhaustive tests, which require the workload to be 'functional-query'.
# TODO: Move this to TestHdfsCaching once we make exhaustive tests run for other workloads.
@SkipIfFS.hdfs_caching
class TestHdfsCachingFallbackPath(ImpalaTestSuite):
  @SkipIfFS.hdfs_encryption
  def test_hdfs_caching_fallback_path(self, vector, unique_database, testid_checksum):
    """Tests the code path of query execution where the HDFS cache read fails and
    the execution falls back to the normal read path. To reproduce this situation we
    rely on IMPALA-3679, where zero-copy reads (ZCRs) are not supported with encryption
    zones. This makes sure ReadFromCache() fails and falls back to ReadRange() to read
    the scan range."""

    if self.exploration_strategy() != 'exhaustive' or\
        vector.get_value('table_format').file_format != 'text':
      pytest.skip()

    # Create a new encryption zone and copy the tpch.nation table data into it.
    encrypted_table_dir = get_fs_path("/test-warehouse/" + testid_checksum)
    create_query_sql = "CREATE EXTERNAL TABLE %s.cached_nation like tpch.nation "\
        "LOCATION '%s'" % (unique_database, encrypted_table_dir)
    check_call(["hdfs", "dfs", "-mkdir", encrypted_table_dir], shell=False)
    check_call(["hdfs", "crypto", "-createZone", "-keyName", "testKey1", "-path",
                encrypted_table_dir], shell=False)
    check_call(["hdfs", "dfs", "-cp", get_fs_path("/test-warehouse/tpch.nation/*.tbl"),
                encrypted_table_dir], shell=False)
    # Reduce the scan range size to force the query to have multiple scan ranges.
    exec_options = vector.get_value('exec_option')
    exec_options['max_scan_range_length'] = 1024
    try:
      self.execute_query_expect_success(self.client, create_query_sql)
      # Cache the table data.
      self.execute_query_expect_success(self.client, "ALTER TABLE %s.cached_nation set "
          "cached in 'testPool'" % unique_database)
      # Wait till the whole path is cached. We set a deadline of 20 seconds for the path
      # to be cached to make sure this doesn't loop forever in case of caching errors.
      caching_deadline = time.time() + 20
      while not is_path_fully_cached(encrypted_table_dir):
        if time.time() > caching_deadline:
          pytest.fail("Timed out caching path: " + encrypted_table_dir)
        time.sleep(2)
      self.execute_query_expect_success(self.client, "invalidate metadata "
          "%s.cached_nation" % unique_database)
      result = self.execute_query_expect_success(self.client, "select count(*) from "
          "%s.cached_nation" % unique_database, exec_options)
      assert(len(result.data) == 1)
      assert(result.data[0] == '25')
    except Exception as e:
      pytest.fail("Failure in test_hdfs_caching_fallback_path: " + str(e))
    finally:
      check_call(["hdfs", "dfs", "-rm", "-r", "-f", "-skipTrash", encrypted_table_dir],
                 shell=False)


@SkipIfFS.hdfs_caching
class TestHdfsCachingDdl(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestHdfsCachingDdl, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'text'
        and v.get_value('table_format').compression_codec == 'none')

  def setup_method(self, method):
    super(TestHdfsCachingDdl, self).setup_method(method)
    self.cleanup_db("cachedb")
    self.client.execute("create database cachedb")

  def teardown_method(self, method):
    self.cleanup_db("cachedb")
    super(TestHdfsCachingDdl, self).teardown_method(method)

  @pytest.mark.execute_serially
  @SkipIfDockerizedCluster.accesses_host_filesystem
  def test_caching_ddl(self, vector):
    # Get the number of cache requests before starting the test.
    num_entries_pre = get_num_cache_requests()
    self.run_test_case('QueryTest/hdfs-caching', vector)

    # After running this test case we should be left with 10 new cache requests:
    # 1 for each table, 7 more for the cached partitions, and 1 for the table with
    # partitions on both HDFS and the local file system.
    assert num_entries_pre == get_num_cache_requests() - 10

    self.execute_query_using_vector("drop table cachedb.cached_tbl_part", vector)
    self.execute_query_using_vector("drop table cachedb.cached_tbl_nopart", vector)
    self.execute_query_using_vector("drop table cachedb.cached_tbl_local", vector)
    self.execute_query_using_vector("drop table cachedb.cached_tbl_ttl", vector)

    # Dropping the tables should clean up their cache entries, leaving us with the
    # same total number of entries.
    assert num_entries_pre == get_num_cache_requests()

  @pytest.mark.execute_serially
  def test_caching_ddl_drop_database(self, vector):
    """IMPALA-2518: DROP DATABASE CASCADE should properly drop all impacted cache
    directives."""
    with self.create_impala_client_from_vector(vector) as client:
      self.__run_caching_ddl_drop_database(client)

  def __run_caching_ddl_drop_database(self, client):
    num_entries_pre = get_num_cache_requests()
    # Populates the `cachedb` database with some cached tables and partitions.
    client.execute("use cachedb")
    client.execute("create table cached_tbl_nopart (i int) cached in 'testPool'")
    client.execute("insert into cached_tbl_nopart select 1")
    client.execute("create table cached_tbl_part (i int) partitioned by (j int) "
        "cached in 'testPool'")
    client.execute("insert into cached_tbl_part (i,j) select 1, 2")
    # We expect the number of cached entities to grow.
    assert num_entries_pre < get_num_cache_requests()
    client.execute("use default")
    client.execute("drop database cachedb cascade")
    # We want to see the number of cached entities return to the original count.
    assert num_entries_pre == get_num_cache_requests()

  @pytest.mark.execute_serially
  def test_cache_reload_validation(self, vector):
    """This is a set of tests asserting that cache directives modified
    outside of Impala are picked up after reload, cf. IMPALA-1645."""

    num_entries_pre = get_num_cache_requests()
    create_table = ("create table cachedb.cached_tbl_reload "
        "(id int) cached in 'testPool' with replication = 8")
    self.execute_query_using_vector(create_table, vector)

    # Access the table once to load the metadata.
    self.execute_query_using_vector(
        "select count(*) from cachedb.cached_tbl_reload", vector)

    create_table = ("create table cachedb.cached_tbl_reload_part (i int) "
        "partitioned by (j int) cached in 'testPool' with replication = 8")
    self.execute_query_using_vector(create_table, vector)

    # Add two partitions.
    self.execute_query_using_vector(
        "alter table cachedb.cached_tbl_reload_part add partition (j=1)", vector)
    self.execute_query_using_vector(
        "alter table cachedb.cached_tbl_reload_part add partition (j=2)", vector)

    assert num_entries_pre + 4 == get_num_cache_requests(), \
        "Adding the tables should be reflected by the number of cache directives."

    # Modify the cache directives outside of Impala and reload the tables to verify
    # that the changes are visible.
    drop_cache_directives_for_path("/test-warehouse/cachedb.db/cached_tbl_reload")
    drop_cache_directives_for_path("/test-warehouse/cachedb.db/cached_tbl_reload_part")
    drop_cache_directives_for_path(
        "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=1")
    change_cache_directive_repl_for_path(
        "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2", 3)

    # Create a bogus cached table abusing an existing cache directive ID, IMPALA-1750.
    dirid = get_cache_directive_for_path(
        "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")
    self.execute_query_using_vector(
        ("create table cachedb.no_replication_factor (id int) "
         "tblproperties(\"cache_directive_id\"=\"{}\")").format(dirid), vector)
    self.run_test_case('QueryTest/hdfs-caching-validation', vector)
    # Temp fix for IMPALA-2510. Due to IMPALA-2518, when the test database is dropped,
    # the cache directives are not removed for table 'cached_tbl_reload_part'.
    drop_cache_directives_for_path(
        "/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")

  @pytest.mark.execute_serially
  def test_external_drop(self):
    """IMPALA-3040: Tests that dropping a partition in Hive leads to the removal of
    the cache directive after a refresh statement in Impala."""
    num_entries_pre = get_num_cache_requests()
    self.client.execute("use cachedb")
    self.client.execute("create table test_external_drop_tbl (i int) partitioned by "
        "(j int) cached in 'testPool'")
    self.client.execute("insert into test_external_drop_tbl (i,j) select 1, 2")
    # 1 directive for the table and 1 directive for the partition.
    assert num_entries_pre + 2 == get_num_cache_requests()
    self.hive_client.drop_partition("cachedb", "test_external_drop_tbl", ["2"], True)
    self.client.execute("refresh test_external_drop_tbl")
    # The directive on the partition is removed.
    assert num_entries_pre + 1 == get_num_cache_requests()
    self.client.execute("drop table test_external_drop_tbl")
    # We want to see the number of cached entities return to the original count.
    assert num_entries_pre == get_num_cache_requests()


def drop_cache_directives_for_path(path):
  """Drops all cache directives for a given path."""
  rc, stdout, stderr = exec_process("hdfs cacheadmin -removeDirectives -path %s" % path)
  assert rc == 0, \
      "Error removing cache directive for path %s (%s, %s)" % (path, stdout, stderr)


def is_path_fully_cached(path):
  """Returns True if all the bytes of the path are cached, False otherwise."""
  rc, stdout, stderr = exec_process(
      "hdfs cacheadmin -listDirectives -stats -path %s" % path)
  assert rc == 0
  caching_stats = stdout.strip("\n").split("\n")[-1].split()
  # Compare BYTES_NEEDED and BYTES_CACHED; the output format is as follows:
  # "ID POOL REPL EXPIRY PATH BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED"
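  # For illustration, a directive line might look like (hypothetical values):
  #   " 1 testPool 3 never /test-warehouse/foo 2199 2199 1 1"
  # so after split(), indexes 5 and 6 are BYTES_NEEDED and BYTES_CACHED.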
  return len(caching_stats) > 0 and caching_stats[5] == caching_stats[6]


def get_cache_directive_for_path(path):
  rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -path %s" % path)
  assert rc == 0
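  # Parse the directive ID (the leading numeric column) out of the 'testPool'
  # directive line; e.g. a line like "  25 testPool 3 never /path ..." would
  # yield "25" (hypothetical sample values).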
  dirid = re.search(r'^\s+?(\d+)\s+?testPool\s+?.*?$', stdout, re.MULTILINE).group(1)
  return dirid


def change_cache_directive_repl_for_path(path, repl):
  """Changes the replication factor of the cache directive for a given path."""
  dirid = get_cache_directive_for_path(path)
  rc, stdout, stderr = exec_process(
      "hdfs cacheadmin -modifyDirective -id %s -replication %s" % (dirid, repl))
  assert rc == 0, \
      "Error modifying cache directive for path %s (%s, %s)" % (path, stdout, stderr)


def get_num_cache_requests():
  """Returns the number of outstanding cache requests. Due to race conditions in the
  way cache requests are added/dropped/reported (see IMPALA-3040), this function tries
  to return a stable result by making several attempts to stabilize it within a
  reasonable timeout."""
  def get_num_cache_requests_util():
    rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -stats")
    assert rc == 0, 'Error executing hdfs cacheadmin: %s %s' % (stdout, stderr)
    # Remove blank lines from the output before counting.
    lines = [line for line in stdout.split('\n') if line.strip()]
    count = None
    for line in lines:
      if line.startswith("Found "):
        # The line should say "Found <int> entries"; if we find this line, we
        # parse the number of entries from it.
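        # e.g. "Found 7 entries" (hypothetical sample value).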
        count = int(re.search(r'\d+', line).group())
        break
    # If a count is available, return it; otherwise fall back to the total number
    # of output lines.
    if count is not None:
      return count
    else:
      return len(stdout.split('\n'))

  # IMPALA-3040: This can take time, especially under slow builds like ASAN.
  wait_time_in_sec = build_flavor_timeout(5, slow_build_timeout=20)
  num_stabilization_attempts = 0
  max_num_stabilization_attempts = 10
  num_requests = None
  LOG.info("{0} Entered get_num_cache_requests()".format(time.time()))
  while num_stabilization_attempts < max_num_stabilization_attempts:
    new_requests = get_num_cache_requests_util()
    if new_requests == num_requests: break
    LOG.info("{0} Waiting to stabilize: num_requests={1} new_requests={2}".format(
        time.time(), num_requests, new_requests))
    num_requests = new_requests
    num_stabilization_attempts = num_stabilization_attempts + 1
    time.sleep(wait_time_in_sec)
  LOG.info("{0} Final num requests: {1}".format(time.time(), num_requests))
  return num_requests