Impala 4 decided to drop Sentry support in favor of Ranger. This removes Sentry
support and related tests. It retires startup flags related to Sentry and does the
first round of removing obsolete code. This does not adjust documentation to remove
references to Sentry, and other dead code will be removed separately.

Some issues came up when implementing this. Here is a summary of how this patch
resolves them:

1. authorization_provider currently defaults to "sentry", but "ranger" requires extra
   parameters to be set. This changes the default value of authorization_provider to
   "", which translates internally to the noop policy that does no authorization.
2. These flags are Sentry specific and are now retired:
   - authorization_policy_provider_class
   - sentry_catalog_polling_frequency_s
   - sentry_config
3. The authorization_factory_class flag may be obsolete now that there is only one
   authorization policy, but this leaves it in place.
4. Sentry is the last component using CDH_COMPONENTS_HOME, so that is removed. There
   are still Maven dependencies coming from the CDH_BUILD_NUMBER repository, so that
   is not removed.
5. To make the transition easier, testdata/bin/kill-sentry-service.sh is not removed
   and it is still called from testdata/bin/kill-all.sh.

Testing:
- Core job passes

Change-Id: I8e99c15936d6d250cf258e3a1dcba11d3eb4661e
Reviewed-on: http://gerrit.cloudera.org:8080/15833
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
295 lines
12 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Superclass for all tests that need a custom cluster.
# TODO: Configure cluster size and other parameters.

import logging
import os
import os.path
import pipes
import pytest
import subprocess
from subprocess import check_call
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import IS_LOCAL
from time import sleep

IMPALA_HOME = os.environ['IMPALA_HOME']
DEFAULT_CLUSTER_SIZE = 3
NUM_COORDINATORS = DEFAULT_CLUSTER_SIZE

# Additional args passed to the respective daemon command line.
IMPALAD_ARGS = 'impalad_args'
STATESTORED_ARGS = 'state_store_args'
CATALOGD_ARGS = 'catalogd_args'
KUDU_ARGS = 'kudu_args'
# Additional args passed to the start-impala-cluster script.
START_ARGS = 'start_args'
HIVE_CONF_DIR = 'hive_conf_dir'
CLUSTER_SIZE = 'cluster_size'
# Default query options passed to the impala daemon command line. Handled separately
# from other impala daemon arguments to allow merging multiple defaults into a single
# list.
DEFAULT_QUERY_OPTIONS = 'default_query_options'
IMPALA_LOG_DIR = 'impala_log_dir'
NUM_EXCLUSIVE_COORDINATORS = 'num_exclusive_coordinators'
STATESTORED_TIMEOUT_S = 'statestored_timeout_s'
IMPALAD_TIMEOUT_S = 'impalad_timeout_s'

# Run with fast topic updates by default to reduce the time to the first running query.
DEFAULT_STATESTORE_ARGS = ('--statestore_update_frequency_ms=50 '
                           '--statestore_priority_update_frequency_ms=50 '
                           '--statestore_heartbeat_frequency_ms=50')


class CustomClusterTestSuite(ImpalaTestSuite):
  """Every test in a test suite deriving from this class gets its own Impala cluster.
  Custom arguments may be passed to the cluster by using the @with_args decorator."""
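
  # Arguments recorded by the @with_args decorator below are stored in each test
  # method's func_dict and read back by setup_method(), which starts a fresh cluster
  # for every test.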

  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(CustomClusterTestSuite, cls).add_test_dimensions()
    cls.add_custom_cluster_constraints()

  @classmethod
  def add_custom_cluster_constraints(cls):
    # Defines constraints for custom cluster tests, called by add_test_dimensions.
    # By default, custom cluster tests only run on text/none and with a limited set of
    # exec options. Subclasses may override this to relax these default constraints.
    super(CustomClusterTestSuite, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'text' and
        v.get_value('table_format').compression_codec == 'none')
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('exec_option')['batch_size'] == 0 and
        not v.get_value('exec_option')['disable_codegen'] and
        v.get_value('exec_option')['num_nodes'] == 0)

  @classmethod
  def setup_class(cls):
    # Explicit override of ImpalaTestSuite.setup_class(). For custom cluster, the
    # ImpalaTestSuite.setup_class() procedure needs to happen on a per-method basis.
    # IMPALA-3614: @SkipIfLocal.multiple_impalad workaround.
    # IMPALA-2943 TODO: when pytest is upgraded, see if this explicit skip can be
    # removed in favor of the class-level @SkipIfLocal.multiple_impalad decorator.
    if IS_LOCAL:
      pytest.skip("multiple impalads needed")

  @classmethod
  def teardown_class(cls):
    # Explicit override of ImpalaTestSuite.teardown_class(). For custom cluster, the
    # ImpalaTestSuite.teardown_class() procedure needs to happen on a per-method basis.
    pass

  @staticmethod
  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
      start_args=None, default_query_options=None,
      impala_log_dir=None, hive_conf_dir=None, cluster_size=None,
      num_exclusive_coordinators=None, kudu_args=None, statestored_timeout_s=None,
      impalad_timeout_s=None):
    """Records arguments to be passed to a cluster by adding them to the decorated
    method's func_dict."""
    def decorate(func):
      if impalad_args is not None:
        func.func_dict[IMPALAD_ARGS] = impalad_args
      if statestored_args is not None:
        func.func_dict[STATESTORED_ARGS] = statestored_args
      if catalogd_args is not None:
        func.func_dict[CATALOGD_ARGS] = catalogd_args
      if start_args is not None:
        func.func_dict[START_ARGS] = start_args.split()
      if hive_conf_dir is not None:
        func.func_dict[HIVE_CONF_DIR] = hive_conf_dir
      if kudu_args is not None:
        func.func_dict[KUDU_ARGS] = kudu_args
      if default_query_options is not None:
        func.func_dict[DEFAULT_QUERY_OPTIONS] = default_query_options
      if impala_log_dir is not None:
        func.func_dict[IMPALA_LOG_DIR] = impala_log_dir
      if cluster_size is not None:
        func.func_dict[CLUSTER_SIZE] = cluster_size
      if num_exclusive_coordinators is not None:
        func.func_dict[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
      if statestored_timeout_s is not None:
        func.func_dict[STATESTORED_TIMEOUT_S] = statestored_timeout_s
      if impalad_timeout_s is not None:
        func.func_dict[IMPALAD_TIMEOUT_S] = impalad_timeout_s
      return func
    return decorate
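
  # A minimal usage sketch of the decorator above (the test name and flag value are
  # hypothetical, for illustration only):
  #
  #   @CustomClusterTestSuite.with_args(
  #       impalad_args="--abort_on_config_error=false", cluster_size=1)
  #   def test_my_startup_flag(self):
  #     ...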
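
  # Starts a fresh cluster before each test method, applying any per-method arguments
  # recorded by @with_args.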
  def setup_method(self, method):
    cluster_args = list()
    for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS]:
      if arg in method.func_dict:
        cluster_args.append("--%s=%s " % (arg, method.func_dict[arg]))
    if START_ARGS in method.func_dict:
      cluster_args.extend(method.func_dict[START_ARGS])

    if HIVE_CONF_DIR in method.func_dict:
      self._start_hive_service(method.func_dict[HIVE_CONF_DIR])
      # Let Impala pick up the same hive-site.xml. The only way is to prepend it to the
      # CLASSPATH, because there is already a hive-site.xml in the default CLASSPATH
      # (see bin/set-classpath.sh).
      cluster_args.append(
          '--env_vars=CUSTOM_CLASSPATH=%s ' % method.func_dict[HIVE_CONF_DIR])

    if KUDU_ARGS in method.func_dict:
      self._restart_kudu_service(method.func_dict[KUDU_ARGS])

    cluster_size = DEFAULT_CLUSTER_SIZE
    if CLUSTER_SIZE in method.func_dict:
      cluster_size = method.func_dict[CLUSTER_SIZE]

    use_exclusive_coordinators = False
    num_coordinators = cluster_size
    if NUM_EXCLUSIVE_COORDINATORS in method.func_dict:
      num_coordinators = method.func_dict[NUM_EXCLUSIVE_COORDINATORS]
      use_exclusive_coordinators = True

    # Start a clean new cluster before each test.
    kwargs = {
      "cluster_size": cluster_size,
      "num_coordinators": num_coordinators,
      "expected_num_executors": cluster_size,
      "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
      "use_exclusive_coordinators": use_exclusive_coordinators,
    }
    if IMPALA_LOG_DIR in method.func_dict:
      kwargs["impala_log_dir"] = method.func_dict[IMPALA_LOG_DIR]
    if STATESTORED_TIMEOUT_S in method.func_dict:
      kwargs["statestored_timeout_s"] = method.func_dict[STATESTORED_TIMEOUT_S]
    if IMPALAD_TIMEOUT_S in method.func_dict:
      kwargs["impalad_timeout_s"] = method.func_dict[IMPALAD_TIMEOUT_S]
    self._start_impala_cluster(cluster_args, **kwargs)
    super(CustomClusterTestSuite, self).setup_class()

  def teardown_method(self, method):
    if HIVE_CONF_DIR in method.func_dict:
      # Restart the Hive service using the default configs.
      self._start_hive_service(None)
    super(CustomClusterTestSuite, self).teardown_class()

  @classmethod
  def _stop_impala_cluster(cls):
    # TODO: Figure out a better way to handle the case where processes are just
    # starting up / cleaning up so that these sleeps are not needed.
    sleep(2)
    check_call([os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'), '--kill_only'])
    sleep(2)
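
  # Restarts the Kudu service in the test minicluster. Any kudu_args are handed to the
  # testdata/cluster/admin script through the IMPALA_KUDU_STARTUP_FLAGS environment
  # variable.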
  @classmethod
  def _restart_kudu_service(cls, kudu_args=None):
    kudu_env = dict(os.environ)
    if kudu_args is not None:
      kudu_env["IMPALA_KUDU_STARTUP_FLAGS"] = kudu_args
    call = subprocess.Popen(
        ['/bin/bash', '-c', os.path.join(IMPALA_HOME,
                                         'testdata/cluster/admin restart kudu')],
        env=kudu_env)
    call.wait()
    if call.returncode != 0:
      raise RuntimeError("Unable to restart Kudu")
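
  # (Re)starts the Hive service via testdata/bin/run-hive-server.sh. A non-None
  # hive_conf_dir is exported as HIVE_CONF_DIR so Hive picks up those configs; passing
  # None restarts Hive with the default configs.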
  @classmethod
  def _start_hive_service(cls, hive_conf_dir):
    hive_env = dict(os.environ)
    if hive_conf_dir is not None:
      hive_env['HIVE_CONF_DIR'] = hive_conf_dir
    call = subprocess.Popen(
        ['/bin/bash', '-c', os.path.join(IMPALA_HOME, 'testdata/bin/run-hive-server.sh')],
        env=hive_env)
    call.wait()
    if call.returncode != 0:
      raise RuntimeError("Unable to start Hive")

  @classmethod
  def _stop_hive_service(cls):
    subprocess.check_call([os.path.join(IMPALA_HOME,
                                        "testdata/bin/kill-hive-server.sh")],
                          close_fds=True)
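
  # Starts a new Impala cluster with the given startup options, then waits until the
  # statestore reports the expected number of live subscribers and every impalad sees
  # the expected number of live backends.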
  @classmethod
  def _start_impala_cluster(cls, options,
                            impala_log_dir=os.getenv('LOG_DIR', "/tmp/"),
                            cluster_size=DEFAULT_CLUSTER_SIZE,
                            num_coordinators=NUM_COORDINATORS,
                            use_exclusive_coordinators=False,
                            add_executors=False,
                            log_level=1,
                            expected_num_executors=DEFAULT_CLUSTER_SIZE,
                            expected_subscribers=0,
                            default_query_options=None,
                            statestored_timeout_s=60,
                            impalad_timeout_s=60):
    cls.impala_log_dir = impala_log_dir
    # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test
    # that certain custom startup arguments work, and we want to keep them independent
    # of dev environments.
    cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
           '--state_store_args=%s' % DEFAULT_STATESTORE_ARGS,
           '--cluster_size=%d' % cluster_size,
           '--num_coordinators=%d' % num_coordinators,
           '--log_dir=%s' % impala_log_dir,
           '--log_level=%s' % log_level]

    if use_exclusive_coordinators:
      cmd.append("--use_exclusive_coordinators")

    if add_executors:
      cmd.append("--add_executors")

    if pytest.config.option.use_local_catalog:
      cmd.append("--impalad_args=--use_local_catalog=1")
      cmd.append("--catalogd_args=--catalog_topic_mode=minimal")

    default_query_option_kvs = []
    # Put any defaults first, then any arguments after that, so the arguments can
    # override the defaults.
    if os.environ.get("ERASURE_CODING") == "true":
      default_query_option_kvs.append(("allow_erasure_coded_files", "true"))
    if default_query_options is not None:
      default_query_option_kvs.extend(default_query_options)
    # Add the default query options after any arguments. This will override any default
    # options set in --impalad_args by design, to force tests to pass
    # default_query_options into this function directly.
    options.append("--impalad_args=--default_query_options={0}".format(
        ','.join(["{0}={1}".format(k, v) for k, v in default_query_option_kvs])))

    logging.info("Starting cluster with command: %s" %
                 " ".join(pipes.quote(arg) for arg in cmd + options))
    try:
      check_call(cmd + options, close_fds=True)
    finally:
      # Failure tests expect the cluster to be initialised even if
      # start-impala-cluster fails.
      cls.cluster = ImpalaCluster.get_e2e_test_cluster()

    statestored = cls.cluster.statestored
    if statestored is None:
      raise Exception("statestored was not found")

    # The number of statestore subscribers is cluster_size (# of impalads) plus 1 for
    # the catalogd.
    if expected_subscribers == 0:
      expected_subscribers = expected_num_executors + 1

    statestored.service.wait_for_live_subscribers(expected_subscribers,
                                                  timeout=statestored_timeout_s)
    for impalad in cls.cluster.impalads:
      impalad.service.wait_for_num_known_live_backends(expected_num_executors,
                                                       timeout=impalad_timeout_s)