Mirror of https://github.com/apache/impala.git, synced 2025-12-19 09:58:28 -05:00.
This change adds get_workload() to ImpalaTestSuite and removes it from all test suites that already returned 'functional-query'. get_workload() is also removed from CustomClusterTestSuite which used to return 'tpch'. All other changes besides impala_test_suite.py and custom_cluster_test_suite.py are just mass removals of get_workload() functions. The behavior is only changed in custom cluster tests that didn't override get_workload(). By returning 'functional-query' instead of 'tpch', exploration_strategy() will no longer return 'core' in 'exhaustive' test runs. See IMPALA-3947 on why workload affected exploration_strategy. An example for affected test is TestCatalogHMSFailures which was skipped both in core and exhaustive runs before this change. get_workload() functions that return a different workload than 'functional-query' are not changed - it is possible that some of these also don't handle exploration_strategy() as expected, but individually checking these tests is out of scope in this patch. Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115 Reviewed-on: http://gerrit.cloudera.org:8080/22726 Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Reviewed-by: Daniel Becker <daniel.becker@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
571 lines
24 KiB
Python
571 lines
24 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from builtins import range
|
|
import glob
|
|
import os
|
|
import psutil
|
|
import pytest
|
|
import shutil
|
|
import time
|
|
from resource import setrlimit, RLIMIT_CORE, RLIM_INFINITY
|
|
from signal import SIGSEGV, SIGKILL, SIGUSR1, SIGTERM
|
|
from subprocess import CalledProcessError
|
|
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
|
|
DAEMONS = ['impalad', 'statestored', 'catalogd']
|
|
DAEMON_ARGS = ['impalad_args', 'state_store_args', 'catalogd_args']
|
|
|
|
|
|
class TestBreakpadBase(CustomClusterTestSuite):
  """Base class with utility methods for all breakpad tests."""

  def setup_method(self, method):
    # Override parent.
    # The temporary directory gets removed in teardown_method() after each test.
    self.tmp_dir = self.make_tmp_dir('breakpad')

  def teardown_method(self, method):
    # Override parent.
    # Stop the cluster to prevent future accesses to self.tmp_dir.
    self.kill_cluster(SIGKILL)
    assert len(self.TMP_DIRS) > 0
    self.clear_tmp_dirs()

  @classmethod
  def setup_class(cls):
    super(TestBreakpadBase, cls).setup_class()
    # Disable core dumps for this test: breakpad minidumps are what we assert on,
    # and core files would only waste disk space.
    setrlimit(RLIMIT_CORE, (0, RLIM_INFINITY))

  @classmethod
  def teardown_class(cls):
    # Re-enable core dumps.
    setrlimit(RLIMIT_CORE, (RLIM_INFINITY, RLIM_INFINITY))

  def start_cluster_with_args(self, **kwargs):
    """Start a cluster, passing each keyword argument as a '-key=value' flag to all
    three daemon types (impalad, statestored, catalogd)."""
    cluster_options = []
    for daemon_arg in DAEMON_ARGS:
      daemon_options = " ".join("-{0}={1}".format(k, v) for k, v in kwargs.items())
      cluster_options.append("--{0}={1}".format(daemon_arg, daemon_options))
    self._start_impala_cluster(cluster_options)

  def start_cluster(self):
    """Start a cluster writing minidumps into self.tmp_dir."""
    self.start_cluster_with_args(minidump_path=self.tmp_dir)

  def kill_cluster(self, signal):
    """Send 'signal' to every running daemon. Unless the signal is SIGUSR1 (which only
    triggers minidump creation and leaves the daemons alive), assert that all daemons
    are gone afterwards."""
    self.cluster.refresh()
    processes = self.cluster.impalads + [self.cluster.catalogd, self.cluster.statestored]
    processes = [_f for _f in processes if _f]
    self.kill_processes(processes, signal)
    # Replaced 'signal is SIGUSR1 or self.assert_all_processes_killed()' - using 'or'
    # for control flow obscures intent; an explicit conditional is equivalent.
    if signal is not SIGUSR1:
      self.assert_all_processes_killed()

  def kill_processes(self, processes, signal):
    """Send 'signal' to each process in 'processes' and, unless the signal is SIGUSR1,
    wait for them to die."""
    for process in processes:
      process.kill(signal)
    # SIGUSR1 does not terminate the daemons, so there is nothing to wait for.
    if signal is not SIGUSR1:
      self.wait_for_all_processes_dead(processes)

  def wait_for_all_processes_dead(self, processes, timeout=300):
    """Wait up to 'timeout' seconds for every process in 'processes' to exit.

    Raises RuntimeError if a process is still alive after the timeout."""
    for process in processes:
      try:
        # For every process in the list we might see the original Impala process plus a
        # forked off child that is writing the minidump. We need to catch both.
        for pid in process.get_pids():
          print("Checking pid %s" % pid)
          psutil_process = psutil.Process(pid)
          psutil_process.wait(timeout)
      except psutil.NoSuchProcess:
        # Process has exited in the meantime
        pass
      except psutil.TimeoutExpired:
        raise RuntimeError("Unable to kill %s (pid %d) after %d seconds." %
                           (psutil_process.name, psutil_process.pid, timeout))

  def get_num_processes(self, daemon):
    """Return the number of currently running processes of the given daemon type.

    Raises RuntimeError for an unknown daemon name."""
    self.cluster.refresh()
    if daemon == 'impalad':
      return len(self.cluster.impalads)
    elif daemon == 'catalogd':
      # Replaced the archaic 'x and 1 or 0' idiom with a conditional expression.
      return 1 if self.cluster.catalogd else 0
    elif daemon == 'statestored':
      return 1 if self.cluster.statestored else 0
    raise RuntimeError("Unknown daemon name: %s" % daemon)

  def wait_for_num_processes(self, daemon, num_expected, timeout=30):
    """Poll until exactly 'num_expected' processes of 'daemon' are running or 'timeout'
    seconds have elapsed. Returns the last observed process count (which may differ
    from 'num_expected' on timeout)."""
    end = time.time() + timeout
    self.cluster.refresh()
    num_processes = self.get_num_processes(daemon)
    while num_processes != num_expected and time.time() <= end:
      time.sleep(1)
      num_processes = self.get_num_processes(daemon)
    return num_processes

  def assert_all_processes_killed(self):
    """Assert that no impalad, statestored or catalogd process is running."""
    self.cluster.refresh()
    assert not self.cluster.impalads
    assert not self.cluster.statestored
    assert not self.cluster.catalogd

  def count_minidumps(self, daemon, base_dir=None):
    """Return the number of minidump (*.dmp) files for 'daemon' under 'base_dir'
    (defaults to self.tmp_dir)."""
    base_dir = base_dir or self.tmp_dir
    path = os.path.join(base_dir, daemon)
    return len(glob.glob("%s/*.dmp" % path))

  def count_all_minidumps(self, base_dir=None):
    """Return the total number of minidump files across all daemon types."""
    return sum(self.count_minidumps(daemon, base_dir) for daemon in DAEMONS)

  def assert_num_minidumps_for_all_daemons(self, cluster_size, base_dir=None):
    """Assert that every daemon wrote exactly one minidump: 'cluster_size' for the
    impalads and one each for statestored and catalogd."""
    self.assert_num_logfile_entries(1)
    assert self.count_minidumps('impalad', base_dir) == cluster_size
    assert self.count_minidumps('statestored', base_dir) == 1
    assert self.count_minidumps('catalogd', base_dir) == 1

  def assert_num_logfile_entries(self, expected_count):
    """Assert that the impalad INFO and ERROR logs each contain 'expected_count'
    occurrences of the minidump-written messages."""
    self.assert_impalad_log_contains('INFO', 'Wrote minidump to ',
                                     expected_count=expected_count)
    self.assert_impalad_log_contains('ERROR', 'Wrote minidump to ',
                                     expected_count=expected_count)
    self.assert_impalad_log_contains('INFO', 'Minidump with no thread info available.',
                                     expected_count=expected_count)
    self.assert_impalad_log_contains('ERROR', 'Minidump with no thread info available.',
                                     expected_count=expected_count)
|
|
|
|
|
|
class TestBreakpadCore(TestBreakpadBase):
  """Core tests to check that the breakpad integration into the daemons works as
  expected. This includes writing minidump when the daemons call abort(). Add tests here
  that depend on functionality of Impala other than the breakpad integration itself.
  """
  @pytest.mark.execute_serially
  def test_abort_writes_minidump(self):
    """Check that abort() (e.g. hitting a DCHECK macro) writes a minidump."""
    assert self.count_all_minidumps() == 0
    # Calling with an unresolvable hostname will abort; the startup must fail.
    try:
      self.start_cluster_with_args(minidump_path=self.tmp_dir,
                                   hostname="jhzvlthd")
    except CalledProcessError:
      pass
    else:
      assert False, "cluster startup was expected to fail"
    # Don't check for minidumps until all processes have gone away so that
    # the state of the cluster is not in flux.
    self.wait_for_num_processes('impalad', 0)
    assert self.count_minidumps('impalad') > 0
|
|
|
|
|
|
class TestBreakpadExhaustive(TestBreakpadBase):
  """Exhaustive tests to check that the breakpad integration into the daemons works as
  expected. This includes writing minidump files on unhandled signals and rotating old
  minidumps on startup.
  """
  @classmethod
  def setup_class(cls):
    # These tests restart the cluster many times, so only run them in exhaustive mode.
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('These breakpad tests only run in exhaustive')
    super(TestBreakpadExhaustive, cls).setup_class()

  @pytest.mark.execute_serially
  def test_minidump_creation(self):
    """Check that when a daemon crashes, it writes a minidump file."""
    assert self.count_all_minidumps() == 0
    self.start_cluster()
    assert self.count_all_minidumps() == 0
    cluster_size = self.get_num_processes('impalad')
    # SIGSEGV is unhandled, so breakpad should write one minidump per daemon.
    self.kill_cluster(SIGSEGV)
    self.assert_num_minidumps_for_all_daemons(cluster_size)

  @pytest.mark.execute_serially
  def test_sigusr1_writes_minidump(self):
    """Check that when a daemon receives SIGUSR1, it writes a minidump file."""
    assert self.count_all_minidumps() == 0
    self.start_cluster()
    assert self.count_all_minidumps() == 0
    cluster_size = self.get_num_processes('impalad')
    self.kill_cluster(SIGUSR1)
    # Breakpad forks to write its minidump files, wait for all the clones to terminate.
    assert self.wait_for_num_processes('impalad', cluster_size) == cluster_size
    assert self.wait_for_num_processes('catalogd', 1) == 1
    assert self.wait_for_num_processes('statestored', 1) == 1
    # Make sure impalad still answers queries.
    client = self.create_impala_client()
    self.execute_query_expect_success(client, "SELECT COUNT(*) FROM functional.alltypes")
    # Kill the cluster. Sending SIGKILL will not trigger minidumps to be written.
    self.kill_cluster(SIGKILL)
    self.assert_num_minidumps_for_all_daemons(cluster_size)

  @pytest.mark.execute_serially
  def test_sigusr1_doesnt_kill(self):
    """Check that when minidumps are disabled and a daemon receives SIGUSR1, it does not
    die.
    """
    assert self.count_all_minidumps() == 0
    self.start_cluster_with_args(enable_minidumps=False)
    cluster_size = self.get_num_processes('impalad')
    self.kill_cluster(SIGUSR1)
    # Check that no minidumps have been written.
    self.assert_num_logfile_entries(0)
    assert self.count_all_minidumps() == 0
    # Check that all daemons are still alive.
    assert self.get_num_processes('impalad') == cluster_size
    assert self.get_num_processes('catalogd') == 1
    assert self.get_num_processes('statestored') == 1

  @pytest.mark.execute_serially
  def test_sigterm_no_minidumps(self):
    """Check that when a SIGTERM is caught, no minidump file is written.
    After receiving SIGTERM there should be no impalad/catalogd/statestored
    running.
    """
    assert self.count_all_minidumps() == 0
    self.start_cluster()
    cluster_size = self.get_num_processes('impalad')
    assert self.count_all_minidumps() == 0

    # impalad/catalogd/statestored should be running.
    assert cluster_size > 0
    assert self.get_num_processes('catalogd') == 1
    assert self.get_num_processes('statestored') == 1
    # There should be no SIGTERM message in the log
    # when the system starts.
    self.assert_impalad_log_contains('INFO', 'Caught signal: SIGTERM. Daemon will exit',
                                     expected_count=0)

    self.kill_cluster(SIGTERM)

    # There should be no impalad/catalogd/statestored running.
    # There should be no minidump generated.
    assert self.get_num_processes('impalad') == 0
    assert self.get_num_processes('catalogd') == 0
    assert self.get_num_processes('statestored') == 0
    assert self.count_all_minidumps() == 0
    # There should be a SIGTERM message in the log now
    # since we raised one above.
    log_str = 'Caught signal: SIGTERM. Daemon will exit.'
    self.assert_impalad_log_contains('INFO', log_str, expected_count=1)

  @pytest.mark.execute_serially
  def test_minidump_relative_path(self):
    """Check that setting 'minidump_path' to a relative value results in minidump files
    written to 'log_dir'.
    """
    minidump_base_dir = os.path.join(os.environ.get('LOG_DIR', '/tmp'), 'minidumps')
    # Clean out leftovers from previous runs so the counts below start from zero.
    shutil.rmtree(minidump_base_dir, ignore_errors=True)
    # Omitting minidump_path as a parameter to the cluster will choose the default
    # configuration, which is a FLAGS_log_dir/minidumps.
    self.start_cluster_with_args()
    assert self.count_all_minidumps(minidump_base_dir) == 0
    cluster_size = self.get_num_processes('impalad')
    self.kill_cluster(SIGSEGV)
    self.assert_num_minidumps_for_all_daemons(cluster_size, minidump_base_dir)
    shutil.rmtree(minidump_base_dir)

  @pytest.mark.execute_serially
  def test_minidump_cleanup(self):
    """Check that a limited number of minidumps is preserved during startup."""
    assert self.count_all_minidumps() == 0
    self.start_cluster()
    cluster_size = self.get_num_processes('impalad')
    self.kill_cluster(SIGSEGV)
    self.assert_num_logfile_entries(1)
    # Maximum number of minidumps that the impalads should keep for this test.
    max_minidumps = 2
    # Restart with a low max_minidumps limit; startup should rotate the old dumps.
    self.start_cluster_with_args(minidump_path=self.tmp_dir,
                                 max_minidumps=max_minidumps,
                                 logbufsecs=1)
    # Wait for log maintenance thread to clean up minidumps asynchronously.
    start = time.time()
    expected_impalad_minidumps = min(cluster_size, max_minidumps)
    while (self.count_minidumps('impalad') != expected_impalad_minidumps
           and time.time() - start < 10):
      time.sleep(0.1)
    assert self.count_minidumps('impalad') == expected_impalad_minidumps
    assert self.count_minidumps('statestored') == 1
    assert self.count_minidumps('catalogd') == 1

  @pytest.mark.execute_serially
  def test_minidump_cleanup_thread(self):
    """Check that periodic rotation preserves a limited number of minidumps."""
    assert self.count_all_minidumps() == 0
    # Maximum number of minidumps that the impalads should keep for this test.
    max_minidumps = 2
    # Sleep interval for the log rotation thread.
    rotation_interval = 1
    self.start_cluster_with_args(minidump_path=self.tmp_dir,
                                 max_minidumps=max_minidumps,
                                 logbufsecs=rotation_interval)
    cluster_size = self.get_num_processes('impalad')
    # We trigger several rounds of minidump creation to make sure that all daemons wrote
    # enough files to trigger rotation.
    for i in range(max_minidumps + 1):
      self.kill_cluster(SIGUSR1)
      # Breakpad forks to write its minidump files, sleep briefly to allow the forked
      # processes to start.
      time.sleep(1)
      # Wait for all the clones to terminate.
      assert self.wait_for_num_processes('impalad', cluster_size) == cluster_size
      assert self.wait_for_num_processes('catalogd', 1) == 1
      assert self.wait_for_num_processes('statestored', 1) == 1
      self.assert_num_logfile_entries(i + 1)
    # Sleep long enough for log cleaning to take effect.
    time.sleep(rotation_interval + 1)
    assert self.count_minidumps('impalad') == min(cluster_size, max_minidumps)
    assert self.count_minidumps('statestored') == max_minidumps
    assert self.count_minidumps('catalogd') == max_minidumps

  @pytest.mark.execute_serially
  def test_disable_minidumps(self):
    """Check that setting enable_minidumps to false disables minidump creation."""
    assert self.count_all_minidumps() == 0
    self.start_cluster_with_args(enable_minidumps=False)
    self.kill_cluster(SIGSEGV)
    self.assert_num_logfile_entries(0)

  @pytest.mark.execute_serially
  def test_empty_minidump_path_disables_breakpad(self):
    """Check that setting the minidump_path to an empty value disables minidump creation.
    """
    assert self.count_all_minidumps() == 0
    self.start_cluster_with_args(minidump_path='')
    self.kill_cluster(SIGSEGV)
    self.assert_num_logfile_entries(0)

  def trigger_single_minidump_and_get_size(self):
    """Kill a single impalad with SIGSEGV to make it write a minidump. Kill the rest of
    the cluster. Clean up the single minidump file and return its size.
    """
    self.cluster.refresh()
    assert self.get_num_processes('impalad') > 0
    # Make one impalad write a minidump.
    self.kill_processes(self.cluster.impalads[:1], SIGSEGV)
    # Kill the rest of the cluster.
    self.kill_cluster(SIGKILL)
    assert self.count_minidumps('impalad') == 1
    # Get file size of that minidump.
    path = os.path.join(self.tmp_dir, 'impalad')
    minidump_file = glob.glob("%s/*.dmp" % path)[0]
    minidump_size = os.path.getsize(minidump_file)
    os.remove(minidump_file)
    assert self.count_all_minidumps() == 0
    return minidump_size

  @pytest.mark.execute_serially
  def test_limit_minidump_size(self):
    """Check that setting the 'minidump_size_limit_hint_kb' to a small value will reduce
    the minidump file size.
    """
    assert self.count_all_minidumps() == 0
    # Generate minidump with default settings.
    self.start_cluster()
    full_minidump_size = self.trigger_single_minidump_and_get_size()
    # Start cluster with limited minidump file size, we use a very small value, to ensure
    # the resulting minidump will be as small as possible.
    self.start_cluster_with_args(minidump_path=self.tmp_dir,
                                 minidump_size_limit_hint_kb=1)
    reduced_minidump_size = self.trigger_single_minidump_and_get_size()
    # Check that the minidump file size has been reduced.
    assert reduced_minidump_size < full_minidump_size
|
|
|
|
|
|
class TestLoggingBase(TestBreakpadBase):
  """Base class for tests that exercise log rotation under heavy stderr activity."""

  # Value of -max_log_files passed to the daemons by the tests below.
  _default_max_log_files = 2

  @classmethod
  def setup_class(cls):
    super(TestLoggingBase, cls).setup_class()

  def start_cluster_with_args(self, cluster_size, log_dir, **kwargs):
    """Start 'cluster_size' impalads logging into 'log_dir', passing each keyword
    argument as a '-key=value' flag to all three daemon types."""
    cluster_options = []
    for daemon_arg in DAEMON_ARGS:
      daemon_options = " ".join("-{0}={1}".format(k, v) for k, v in kwargs.items())
      cluster_options.append("--{0}={1}".format(daemon_arg, daemon_options))
    self._start_impala_cluster(cluster_options, cluster_size=cluster_size,
                               expected_num_impalads=cluster_size,
                               impala_log_dir=log_dir,
                               ignore_pid_on_log_rotation=True)

  def assert_logs(self, daemon, max_count, max_bytes, match_pid=True):
    """Assert that there are at most 'max_count' of INFO + ERROR log files for the
    specified daemon and the individual file size does not exceed 'max_bytes'.
    Also assert that stdout/stderr are redirected to correct file on each rotation."""
    log_dir = self.tmp_dir
    log_paths = glob.glob("%s/%s*log.ERROR.*" % (log_dir, daemon)) \
        + glob.glob("%s/%s*log.INFO.*" % (log_dir, daemon))
    assert len(log_paths) <= max_count

    # group log_paths by kind and pid (if match_pid).
    log_group = {}
    for path in sorted(log_paths):
      tok = path.split('.')
      key = tok[-1] + '.' + tok[-3] if match_pid else tok[-3]
      if key in log_group:
        log_group[key].append(path)
      else:
        log_group[key] = [path]

    for key, paths in log_group.items():
      for i in range(0, len(paths)):
        try:
          curr_path = paths[i]
          # check log size
          log_size = os.path.getsize(curr_path)
          assert log_size <= max_bytes, "{} exceed {} bytes".format(curr_path, max_bytes)

          if i < len(paths) - 1:
            # check that we print the next_path in last line of this log file
            next_path = paths[i + 1]
            with open(curr_path, 'r') as f:
              lines = f.readlines()
            # There have been some test failures where the next_path is not in the last
            # line of the output. One theory is that some output could have gone to the
            # file after the next_path has been written. This tolerates having the
            # next path in the last 3 lines. It also preserves the logs if there is a
            # failure.
            found_next_path = False
            NUM_LINES_TO_CHECK = 3
            # BUG FIX: the previous loop was
            #   for i in range(max(NUM_LINES_TO_CHECK, len(lines))): ... lines[-i] ...
            # which (a) scanned the whole file instead of only the last 3 lines,
            # (b) raised IndexError for files shorter than 3 lines, (c) never checked
            # the actual last line since lines[-0] == lines[0], and (d) clobbered the
            # enclosing loop variable 'i'. Iterate j = 1..min(3, len(lines)) instead so
            # lines[-j] walks backwards from the true last line.
            for j in range(1, min(NUM_LINES_TO_CHECK, len(lines)) + 1):
              if next_path in lines[-j]:
                found_next_path = True
                break
            if not found_next_path:
              # These logs are in a temporary directory. To improve debuggability,
              # copy the logs to a location that would be preserved.
              preserved_log_dir = os.getenv("LOG_DIR", "/tmp/")
              preserved_path = os.path.join(preserved_log_dir,
                                            os.path.basename(curr_path))
              shutil.copyfile(curr_path, preserved_path)
              msg = "Did not find {0} in the last {1} lines of {2}.".format(
                  next_path, NUM_LINES_TO_CHECK, curr_path)
              msg += " Preserved the log contents at {0}".format(preserved_path)
              assert False, msg
        except OSError:
          # The daemon might delete the log in the middle of assertion.
          # In that case, do nothing and move on.
          pass

  def silent_remove(self, filename):
    """Remove 'filename', ignoring errors if it does not exist."""
    try:
      os.remove(filename)
    except OSError:
      pass

  def start_excessive_cerr_cluster(self, test_cluster_size=1, remove_symlink=False,
                                   match_pid=True, max_log_count_begin=0,
                                   max_log_count_end=None):
    """Check that impalad log is kept being rotated when most writing activity is coming
    from stderr stream.
    Along with LogFaultInjectionThread in init.cc, this test will fill impalad error logs
    with approximately 128kb error messages per second."""
    test_logbufsecs = 3
    test_max_log_files = self._default_max_log_files
    test_max_log_size = 1  # 1 MB
    test_error_msg = ('123456789abcde_' * 64)  # 1 KB error message
    test_debug_actions = 'LOG_MAINTENANCE_STDERR:FAIL@1.0@' + test_error_msg
    daemon = 'impalad'
    os.chmod(self.tmp_dir, 0o744)

    expected_log_max_bytes = int(1.2 * 1024**2)  # 1.2 MB
    self.assert_logs(daemon, max_log_count_begin, expected_log_max_bytes)
    self.start_cluster_with_args(test_cluster_size, self.tmp_dir,
                                 logbufsecs=test_logbufsecs,
                                 max_log_files=test_max_log_files,
                                 max_log_size=test_max_log_size,
                                 debug_actions=test_debug_actions,
                                 log_rotation_match_pid=('1' if match_pid else '0'))
    self.wait_for_num_processes(daemon, test_cluster_size, 30)
    # Count both INFO and ERROR logs
    if max_log_count_end is None:
      max_log_count_end = test_max_log_files * test_cluster_size * 2
    # Wait for log maintenance thread to flush and rotate the logs asynchronously.
    duration = test_logbufsecs * 10
    start = time.time()
    while (time.time() - start < duration):
      time.sleep(1)
      self.assert_logs(daemon, max_log_count_end, expected_log_max_bytes)
      if (remove_symlink):
        # Repeatedly remove the symlinks to verify rotation keeps working without them.
        pattern = self.tmp_dir + '/' + daemon + '*'
        symlinks = glob.glob(pattern + '.INFO') + glob.glob(pattern + '.ERROR')
        for symlink in symlinks:
          self.silent_remove(symlink)
|
|
|
|
|
|
class TestLogging(TestLoggingBase):
  """Core tests to check that impala log is rolled periodically, obeying
  max_log_size, max_log_files, and log_rotation_match_pid even in the presence of heavy
  stderr writing.
  """

  @classmethod
  def setup_class(cls):
    super(TestLogging, cls).setup_class()

  @pytest.mark.execute_serially
  def test_excessive_cerr_ignore_pid(self):
    """Test excessive cerr activity twice with restart in between and no PID matching."""
    self.start_excessive_cerr_cluster(test_cluster_size=1, remove_symlink=False,
                                      match_pid=False)
    self.kill_cluster(SIGTERM)
    # After SIGTERM there must be no impalad/catalogd/statestored left.
    assert self.get_num_processes('impalad') == 0
    assert self.get_num_processes('catalogd') == 0
    assert self.get_num_processes('statestored') == 0
    # Without PID matching the second run rotates the first run's files too, so the
    # file count stays at the per-run cap (INFO + ERROR).
    log_file_cap = 2 * self._default_max_log_files
    self.start_excessive_cerr_cluster(test_cluster_size=1, remove_symlink=False,
                                      match_pid=False,
                                      max_log_count_begin=log_file_cap,
                                      max_log_count_end=log_file_cap)
|
|
|
|
|
|
class TestLoggingExhaustive(TestLoggingBase):
  """Exhaustive tests to check that impala log is rolled periodically, obeying
  max_log_size, max_log_files, and log_rotation_match_pid even in the presence of heavy
  stderr writing.
  """

  @classmethod
  def setup_class(cls):
    # These tests repeatedly start clusters and wait on log rotation; restrict them to
    # exhaustive runs.
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('TestLoggingExhaustive only run in exhaustive')
    super(TestLoggingExhaustive, cls).setup_class()

  @pytest.mark.execute_serially
  def test_excessive_cerr(self):
    """Test excessive cerr activity with single node cluster."""
    self.start_excessive_cerr_cluster()

  @pytest.mark.execute_serially
  def test_excessive_cerr_no_symlink(self):
    """Test excessive cerr activity with two node cluster and missing log symlinks."""
    # BUG FIX: this previously passed remove_symlink=False, so the symlinks were never
    # removed and the "missing symlink" scenario the docstring describes was not
    # exercised at all.
    self.start_excessive_cerr_cluster(test_cluster_size=2, remove_symlink=True)

  @pytest.mark.execute_serially
  def test_excessive_cerr_match_pid(self):
    """Test excessive cerr activity twice with restart in between and PID matching."""
    self.start_excessive_cerr_cluster(1, remove_symlink=False, match_pid=True)
    self.kill_cluster(SIGTERM)
    # There should be no impalad/catalogd/statestored running.
    assert self.get_num_processes('impalad') == 0
    assert self.get_num_processes('catalogd') == 0
    assert self.get_num_processes('statestored') == 0
    # With PID matching, the second run's rotation does not touch the first run's
    # files, so up to twice as many files may exist at the end.
    max_count_begin = self._default_max_log_files * 2
    max_count_end = max_count_begin * 2
    self.start_excessive_cerr_cluster(test_cluster_size=1, remove_symlink=False,
                                      match_pid=True,
                                      max_log_count_begin=max_count_begin,
                                      max_log_count_end=max_count_end)
|