mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Largely, the changes involve conditionalizing some invocations to
account for differences between RH and Ubuntu. The trickiest bits were
timezone-related test errors (see below), postgresql permissions (need
to accept md5 passwords from localhost) and default ulimits (1024 user
processes/threads is not enough).
To test this, I built using test-with-docker. In additional to the
ulimit issue, I ran into the fact that /tmp needed 1777 permissions for
the postgresql socket, and entrypoint.sh had a few places that needed
special cases. At the moment, the data load ran fine, as did most of the
tests. I observed a test that relied on a python2.7-ism fail, which is
part of the point of this.
In the course of development, I encountered a handful of tests fail with
"Encounter parse error: failed to open /usr/share/zoneinfo/GMT-08:00 -
No such file or directory.", which was reproduced as follows:
[localhost:21000] default> use functional_orc_def; select * from alltypes;
...
WARNINGS: Encounter parse error: failed to open /usr/share/zoneinfo/GMT-08:00 - No such file or directory.
With Quanlong's help, I learned what was happening. test-with-docker was
translating my time zone (America/Los_Angeles) to US/Pacific-New,
because realpath(/etc/localtime) = US/Pacific-New. This timezone exists
in centos:6, so that wasn't a problem. However, this timezone does not
exist in the package "tzdata-java", which is the copy of the timezone
information used by Java. (There are bugs here that may have been fixed
in centos:7.) As a result, when ORC asks (by using
TimeZone.getDefault().getID()) the JDK
(src/solaris/native/java/util/TimeZone_md.c) for the default timezone,
it can't find the same name as /etc/localtime points to in its
repository and defaults to "GMT-08:00". This string then gets written
into the ORC files generated by Hive as part of data load, and then the
C++ library can't read them. This is fixed by changing "realpath"
to "readlink" in test-with-docker.py.
centos:7 is not addressed by this change. The move to systemd makes
"service sshd start" (and the same for postgresql) not work, and
additional care needs to be done to work around that.
This change is a joint effort with Laszlo Gaal.
Change-Id: Id54294d7607f51de87a9de373dcfc4a33f4bedf5
Reviewed-on: http://gerrit.cloudera.org:8080/11731
Reviewed-by: Philip Zeyliger <philip@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
815 lines
30 KiB
Python
Executable File
815 lines
30 KiB
Python
Executable File
#!/usr/bin/python
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
# We do not use Impala's python environment here, nor do we depend on
|
|
# non-standard python libraries to avoid needing extra build steps before
|
|
# triggering this.
|
|
import argparse
|
|
import datetime
|
|
import itertools
|
|
import logging
|
|
import multiprocessing
|
|
import multiprocessing.pool
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
|
|
CLI_HELP = """\
|
|
Runs tests inside of docker containers, parallelizing different types of
|
|
tests. This script first creates a docker container, checks out this repo
|
|
into it, bootstraps the container with Impala dependencies, and builds Impala
|
|
and its test data. Then, it saves the resulting container, and launches new
|
|
containers to run tests in parallel. An HTML, visual timeline is generated
|
|
as part of the build, in logs/docker/*/timeline.html.
|
|
"""
|
|
|
|
# To execute run:
|
|
# docker/test-with-docker.py
|
|
# After waiting for that to finish, inspect results in logs/docker.
|
|
#
|
|
# Visually, the timeline looks as follows, produced on a 32-core, 100GB RAM
|
|
# machine:
|
|
# ....... 1h05m Checkout, setup machine, build (8m with ccache),
|
|
# generate testdata (52m); missing ccache
|
|
# adds about 7 minutes (very sensitive to number
|
|
# of available cores)
|
|
# ... 11m Commit the Docker container
|
|
# . 10m FE tests
|
|
# . 10m JDBC tests
|
|
# .... 45m serial EE tests
|
|
# ...... 1h02m cluster tests
|
|
# ... 31m BE (C++) tests
|
|
# .... 36m parallel EE tests
|
|
# Total time: 2h25m.
|
|
#
|
|
# CPU usage is sustained high for the parallel EE tests and for
|
|
# the C++ compile (when it's not ccache'd), but is otherwise low.
|
|
# Because every parallel track consumes memory (a cluster),
|
|
# increasing parallelism and memory must be balanced.
|
|
#
|
|
# Memory usage is thorny. The minicluster memory can
|
|
# be tweaked somewhat by how much memory to give to the JVM
|
|
# and what to set --mem_limit too. Furthermore, parallel
|
|
# cluster tests use more memory when more parallelism happens.
|
|
#
|
|
# The code that runs inside of the containers is in entrypoint.sh,
|
|
# whereas the code that invokes docker is here.
|
|
#
|
|
# We avoid using Dockerfile and "docker build": they make it hard or impossible
|
|
# to cross-mount host directories into containers or use --privileged, and using
|
|
# them would require generating them dynamically. They're more trouble than
|
|
# they're worth for this use case.
|
|
#
|
|
# In practice, the containers are about 100GB (with 45GB
|
|
# being test data and ~40GB being the tests).
|
|
#
|
|
# Requirements:
|
|
# * Docker
|
|
# This has been tested on Ubuntu16.04 with Docker
|
|
# from the Ubuntu repos, i.e., Docker 1.13.1.
|
|
# * About 150 GB of disk space available to Docker.
|
|
# * 75GB of RAM.
|
|
#
|
|
# This script tries to clean up images and containers created by this process, though
|
|
# this can be disabled for debugging.
|
|
#
|
|
# To clean up containers and images manually, you can use:
|
|
# for x in $(docker ps -aq --filter label=pwd=$IMPALA_HOME); do
|
|
# docker stop $x; docker rm $x; done
|
|
# for x in $(docker images -q --filter label=pwd=$IMPALA_HOME); do docker rmi $x; done
|
|
#
|
|
# Core dumps:
|
|
# On an Ubuntu host, core dumps and Docker don't mix by default, because apport is not
|
|
# running inside of the container. See https://github.com/moby/moby/issues/11740
|
|
# To enable core dumps, run the following command on the host:
|
|
# $echo 'core.%e.%p' | sudo tee /proc/sys/kernel/core_pattern
|
|
#
|
|
# TODOs:
|
|
# - Support for executing other flavors, like exhaustive, or file systems,
|
|
# like S3.
|
|
#
|
|
# Suggested speed improvement TODOs:
|
|
# - Speed up testdata generation
|
|
# - Skip generating test data for variants not being run
|
|
# - Make container image smaller
|
|
# - Analyze .xml junit files to find slow tests; eradicate
|
|
# or move to different suite.
|
|
# - Run BE tests earlier (during data load)
|
|
|
|
if __name__ == '__main__' and __package__ is None:
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
import monitor
|
|
|
|
base = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"
|
|
|
|
|
|
def main():
|
|
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
|
|
|
default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
|
|
_compute_defaults()
|
|
parser = argparse.ArgumentParser(
|
|
description=CLI_HELP, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
group = parser.add_mutually_exclusive_group()
|
|
group.add_argument('--cleanup-containers', dest="cleanup_containers",
|
|
action='store_true', default=True,
|
|
help='Removes containers when finished.')
|
|
group.add_argument('--no-cleanup-containers',
|
|
dest="cleanup_containers", action='store_false')
|
|
group = parser.add_mutually_exclusive_group()
|
|
parser.add_argument(
|
|
'--parallel-test-concurrency', type=int,
|
|
default=default_parallel_test_concurrency,
|
|
help='For the ee-test-parallel suite, how many tests to run concurrently.')
|
|
parser.add_argument(
|
|
'--suite-concurrency', type=int, default=default_suite_concurrency,
|
|
help='Number of concurrent suites to run in parallel.')
|
|
parser.add_argument(
|
|
'--impalad-mem-limit-bytes', type=int, default=default_memlimit_gb,
|
|
help='Memlimit to pass to impalad for miniclusters.')
|
|
group.add_argument(
|
|
'--cleanup-image', dest="cleanup_image",
|
|
action='store_true', default=True,
|
|
help="Whether to remove image when done.")
|
|
group.add_argument('--no-cleanup-image', dest="cleanup_image", action='store_false')
|
|
parser.add_argument('--base-image', dest="base_image", default="ubuntu:16.04",
|
|
help="Base OS image to use. ubuntu:16.04 and centos:6 are known to work.")
|
|
parser.add_argument(
|
|
'--build-image', metavar='IMAGE',
|
|
help='Skip building, and run tests on pre-existing image.')
|
|
|
|
suite_group = parser.add_mutually_exclusive_group()
|
|
suite_group.add_argument(
|
|
'--suite', metavar='VARIANT', action='append',
|
|
help="""
|
|
Run specific test suites; can be specified multiple times.
|
|
Test-with-docker may shard some suites to improve parallelism.
|
|
If not specified, default tests are run.
|
|
Default: %s, All Choices: %s
|
|
""" % (",".join([ s.name for s in DEFAULT_SUITES]),
|
|
",".join([ s.name for s in ALL_SUITES ])))
|
|
suite_group.add_argument('--all-suites', action='store_true', default=False,
|
|
help="If set, run all available suites.")
|
|
parser.add_argument(
|
|
'--name', metavar='NAME',
|
|
help="Use a specific name for the test run. The name is used " +
|
|
"as a prefix for the container and image names, and " +
|
|
"as part of the log directory naming. Defaults to include a timestamp.",
|
|
default=datetime.datetime.now().strftime("i-%Y%m%d-%H%M%S"))
|
|
parser.add_argument('--ccache-dir', metavar='DIR',
|
|
help="CCache directory to use",
|
|
default=os.path.expanduser("~/.ccache"))
|
|
parser.add_argument('--tail', action="store_true",
|
|
help="Run tail on all container log files.")
|
|
parser.add_argument('--env', metavar='K=V', default=[], action='append',
|
|
help="""Passes given environment variables (expressed as KEY=VALUE)
|
|
through containers.
|
|
""")
|
|
parser.add_argument('--test', action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
if not args.suite:
|
|
if args.all_suites:
|
|
# Ignore "NOOP" tasks, as they are just for testing.
|
|
args.suite = [ s.name for s in ALL_SUITES if not s.name.startswith("NOOP") ]
|
|
else:
|
|
args.suite = [ s.name for s in DEFAULT_SUITES ]
|
|
t = TestWithDocker(
|
|
build_image=args.build_image, suite_names=args.suite,
|
|
name=args.name, cleanup_containers=args.cleanup_containers,
|
|
cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
|
|
parallel_test_concurrency=args.parallel_test_concurrency,
|
|
suite_concurrency=args.suite_concurrency,
|
|
impalad_mem_limit_bytes=args.impalad_mem_limit_bytes,
|
|
tail=args.tail,
|
|
env=args.env, base_image=args.base_image)
|
|
|
|
fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
|
|
fh.setFormatter(logging.Formatter(LOG_FORMAT))
|
|
logging.getLogger('').addHandler(fh)
|
|
|
|
logging.info("Arguments: %s", args)
|
|
|
|
ret = t.run()
|
|
t.create_timeline()
|
|
t.log_summary()
|
|
|
|
if not ret:
|
|
sys.exit(1)
|
|
|
|
|
|
def _compute_defaults():
|
|
"""Compute default config options based on memory.
|
|
|
|
The goal is to work reasonably on machines with
|
|
about 60GB of memory, like Amazon's c4.8xlarge (36 CPUs, 60GB)
|
|
or c5.9xlarge (36 CPUs, 72GB) or m4.4xlarge (16 CPUs, 64 GB).
|
|
|
|
Based on some experiments, we set up defaults for different
|
|
machine sizes based on memory, with an eye towards
|
|
having reasonable runtimes as well.
|
|
|
|
Experiments on memory usage:
|
|
|
|
suite parallelism usage
|
|
Xmx memlimit
|
|
ee-test-parallel 4GB 8 5GB 33GB
|
|
ee-test-parallel 4GB 16 7GB 37GB
|
|
ee-test-serial 4GB - 5GB 18GB
|
|
cluster-test 4GB - - 13GB
|
|
be-test 4GB - 10GB 19GB
|
|
fe-test 4GB - 10GB 9GB
|
|
"""
|
|
total_memory_gb = monitor.total_memory()
|
|
cpus = multiprocessing.cpu_count()
|
|
logging.info("CPUs: %s Memory (GB): %s", cpus, total_memory_gb)
|
|
|
|
parallel_test_concurrency = min(cpus, 8)
|
|
memlimit_gb = 8
|
|
|
|
if total_memory_gb >= 95:
|
|
suite_concurrency = 4
|
|
memlimit_gb = 11
|
|
parallel_test_concurrency = min(cpus, 12)
|
|
elif total_memory_gb >= 65:
|
|
suite_concurrency = 3
|
|
elif total_memory_gb >= 35:
|
|
suite_concurrency = 2
|
|
else:
|
|
logging.warning("This tool should be run on a machine with more memory.")
|
|
suite_concurrency = 1
|
|
|
|
return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024
|
|
|
|
class Suite(object):
|
|
"""Encapsulates a test suite.
|
|
|
|
A test suite is a named thing that the user can select to run,
|
|
and it runs in its own container, in parallel with other suites.
|
|
The actual running happens from entrypoint.sh and is controlled
|
|
mostly by environment variables. When complexity is easier
|
|
to handle in Python (with its richer data types), we prefer
|
|
it here.
|
|
"""
|
|
def __init__(self, name, **envs):
|
|
"""Create suite with given name and environment."""
|
|
self.name = name
|
|
self.envs = dict(
|
|
FE_TEST="false",
|
|
BE_TEST="false",
|
|
EE_TEST="false",
|
|
JDBC_TEST="false",
|
|
CLUSTER_TEST="false")
|
|
# If set, this suite is sharded past a certain suite concurrency threshold.
|
|
self.shard_at_concurrency = None
|
|
# Variable to which to append --shard_tests
|
|
self.sharding_variable = None
|
|
self.envs[name] = "true"
|
|
self.envs.update(envs)
|
|
self.timeout_minutes = 120
|
|
|
|
def copy(self, name, **envs):
|
|
"""Duplicates current suite allowing for environment updates."""
|
|
v = dict()
|
|
v.update(self.envs)
|
|
v.update(envs)
|
|
ret = Suite(name, **v)
|
|
ret.shard_at_concurrency = self.shard_at_concurrency
|
|
ret.sharding_variable = self.sharding_variable
|
|
ret.timeout_minutes = self.timeout_minutes
|
|
return ret
|
|
|
|
def exhaustive(self):
|
|
"""Returns an "exhaustive" copy of the suite."""
|
|
r = self.copy(self.name + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
|
|
r.timeout_minutes = 240
|
|
return r
|
|
|
|
def asan(self):
|
|
"""Returns an ASAN copy of this suite."""
|
|
r = self.copy(self.name + "_ASAN", REBUILD_ASAN="true")
|
|
r.timeout_minutes = self.timeout_minutes * 2.0 + 10
|
|
return r
|
|
|
|
def sharded(self, shards):
|
|
"""Returns a list of sharded copies of the list.
|
|
|
|
key is the name of the variable which needs to be appended with "--shard-tests=..."
|
|
"""
|
|
# RUN_TESTS_ARGS
|
|
ret = []
|
|
for i in range(1, shards + 1):
|
|
s = self.copy("%s_%d_of_%d" % (self.name, i, shards))
|
|
s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
|
|
+ " --shard_tests=%s/%s" % (i, shards)
|
|
ret.append(s)
|
|
return ret
|
|
|
|
# Definitions of all known suites:
|
|
be_test = Suite("BE_TEST")
|
|
ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
|
|
RUN_TESTS_ARGS="--skip-parallel --skip-stress")
|
|
ee_test_serial.shard_at_concurrency = 4
|
|
ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
|
|
ee_test_serial_exhaustive = ee_test_serial.exhaustive()
|
|
ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
|
|
RUN_TESTS_ARGS="--skip-serial")
|
|
ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
|
|
cluster_test = Suite("CLUSTER_TEST")
|
|
cluster_test.shard_at_concurrency = 4
|
|
cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
|
|
cluster_test_exhaustive = cluster_test.exhaustive()
|
|
|
|
# Default supported suites. These are organized slowest-to-fastest, so that,
|
|
# when parallelism is limited, the total time is least impacted.
|
|
DEFAULT_SUITES = [
|
|
ee_test_serial,
|
|
ee_test_parallel,
|
|
cluster_test,
|
|
Suite("BE_TEST"),
|
|
Suite("FE_TEST"),
|
|
Suite("JDBC_TEST")
|
|
]
|
|
|
|
OTHER_SUITES = [
|
|
ee_test_parallel_exhaustive,
|
|
ee_test_serial_exhaustive,
|
|
cluster_test_exhaustive,
|
|
|
|
# ASAN
|
|
be_test.asan(),
|
|
cluster_test.asan(),
|
|
ee_test_parallel.asan(),
|
|
ee_test_serial.asan(),
|
|
|
|
Suite("RAT_CHECK"),
|
|
# These are used for testing this script
|
|
Suite("NOOP"),
|
|
Suite("NOOP_FAIL"),
|
|
Suite("NOOP_SLEEP_FOREVER")
|
|
]
|
|
ALL_SUITES = DEFAULT_SUITES + OTHER_SUITES
|
|
|
|
def _call(args, check=True):
|
|
"""Wrapper for calling a subprocess.
|
|
|
|
args is the first argument of subprocess.Popen, typically
|
|
an array, e.g., ["echo", "hi"].
|
|
|
|
If check is set, raise an exception on failure.
|
|
"""
|
|
logging.info("Calling: %s", args)
|
|
if check:
|
|
subprocess.check_call(args, stdin=None)
|
|
else:
|
|
return subprocess.call(args, stdin=None)
|
|
|
|
|
|
def _check_output(*args, **kwargs):
|
|
"""Wrapper for subprocess.check_output, with logging."""
|
|
logging.info("Running: %s, %s; cmdline: %s.", args, kwargs, " ".join(*args))
|
|
return subprocess.check_output(*args, **kwargs)
|
|
|
|
|
|
def _make_dir_if_not_exist(*parts):
|
|
d = os.path.join(*parts)
|
|
if not os.path.exists(d):
|
|
os.makedirs(d)
|
|
return d
|
|
|
|
|
|
class Container(object):
|
|
"""Encapsulates a container, with some metadata."""
|
|
|
|
def __init__(self, id_, name, logfile, exitcode=None, running=None):
|
|
self.id = id_
|
|
self.name = name
|
|
self.logfile = logfile
|
|
self.exitcode = exitcode
|
|
self.running = running
|
|
self.start = None
|
|
self.end = None
|
|
self.removed = False
|
|
|
|
# Protects multiple calls to "docker rm <self.id>"
|
|
self.lock = threading.Lock()
|
|
|
|
# Updated by Timeline class
|
|
self.total_user_cpu = -1
|
|
self.total_system_cpu = -1
|
|
self.peak_total_rss = -1
|
|
|
|
def runtime_seconds(self):
|
|
if self.start and self.end:
|
|
return self.end - self.start
|
|
|
|
def __str__(self):
|
|
return "Container<" + \
|
|
",".join(["%s=%s" % (k, v) for k, v in self.__dict__.items()]) \
|
|
+ ">"
|
|
|
|
|
|
class TestWithDocker(object):
|
|
"""Tests Impala using Docker containers for parallelism."""
|
|
|
|
def __init__(self, build_image, suite_names, name, cleanup_containers,
|
|
cleanup_image, ccache_dir, test_mode,
|
|
suite_concurrency, parallel_test_concurrency,
|
|
impalad_mem_limit_bytes, tail, env, base_image):
|
|
self.build_image = build_image
|
|
self.name = name
|
|
self.containers = []
|
|
self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()
|
|
# Protects multiple concurrent calls to "docker create"
|
|
self.docker_lock = threading.Lock()
|
|
|
|
# If using worktrees, we need to find $GIT_COMMON_DIR; rev-parse
|
|
# supports finding it as of vesion 2.5.0; for older versions, we
|
|
# use $GIT_DIR.
|
|
git_common_dir = _check_output(["git", "rev-parse", "--git-common-dir"]).strip()
|
|
if git_common_dir == "--git-common-dir":
|
|
git_common_dir = _check_output(["git", "rev-parse", "--git-dir"]).strip()
|
|
self.git_common_dir = os.path.realpath(git_common_dir)
|
|
assert os.path.exists(self.git_common_dir)
|
|
|
|
self.git_head_rev = _check_output(
|
|
["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip()
|
|
assert self.git_head_rev, \
|
|
"Could not get reference to HEAD using git rev-parse --abbrev-ref HEAD."
|
|
self.cleanup_containers = cleanup_containers
|
|
self.cleanup_image = cleanup_image
|
|
self.image = None
|
|
if build_image and cleanup_image:
|
|
# Refuse to clean up external image.
|
|
raise Exception("cleanup_image and build_image cannot be both specified")
|
|
self.ccache_dir = ccache_dir
|
|
self.log_dir = os.path.join(self.git_root, "logs", "docker", self.name)
|
|
self.monitoring_output_file = os.path.join(self.log_dir, "metrics.txt")
|
|
self.monitor = monitor.ContainerMonitor(self.monitoring_output_file)
|
|
self.test_mode = test_mode
|
|
self.suite_concurrency = suite_concurrency
|
|
self.parallel_test_concurrency = parallel_test_concurrency
|
|
self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
|
|
self.tail = tail
|
|
self.env = env
|
|
self.base_image = base_image
|
|
|
|
# Map suites back into objects; we ignore case for this mapping.
|
|
suites = []
|
|
suites_by_name = {}
|
|
for suite in ALL_SUITES:
|
|
suites_by_name[suite.name.lower()] = suite
|
|
for suite_name in suite_names:
|
|
suites.append(suites_by_name[suite_name.lower()])
|
|
|
|
# If we have enough concurrency, shard some suites into two halves.
|
|
suites2 = []
|
|
for suite in suites:
|
|
if suite.shard_at_concurrency is not None and \
|
|
suite_concurrency >= suite.shard_at_concurrency:
|
|
suites2.extend(suite.sharded(2))
|
|
else:
|
|
suites2.append(suite)
|
|
suites = suites2
|
|
|
|
self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]
|
|
|
|
def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
|
|
"""Returns a new container.
|
|
|
|
logdir - subdirectory to create under self.log_dir,
|
|
which will get mounted to /logs
|
|
logname - name of file in logdir that will be created
|
|
extras - extra arguments to pass to docker
|
|
entrypoint - entrypoint arguments, as a list.
|
|
"""
|
|
if extras is None:
|
|
extras = []
|
|
if self.test_mode:
|
|
extras = ["-e", "TEST_TEST_WITH_DOCKER=true"] + extras
|
|
|
|
# According to localtime(5), /etc/localtime is supposed
|
|
# to be a symlink to somewhere inside /usr/share/zoneinfo.
|
|
# Note that sometimes the symlink tree may be
|
|
# complicated, e.g.:
|
|
# /etc/localtime ->
|
|
# /usr/share/zoneinfo/America/Los_Angeles -> (readlink)
|
|
# ../US/Pacific-New (realpath)
|
|
# Using both readlink and realpath should work, but we've
|
|
# encountered one scenario (centos:6) where the Java tzdata
|
|
# database doesn't have US/Pacific-New, but has America/Los_Angeles.
|
|
# This is deemed sufficient to tip the scales to using readlink.
|
|
assert os.path.islink("/etc/localtime")
|
|
localtime_link_target = os.readlink("/etc/localtime")
|
|
assert localtime_link_target.startswith("/usr/share/zoneinfo")
|
|
|
|
# Workaround for what appears to be https://github.com/moby/moby/issues/13885
|
|
# Namely, if we run too many "docker create" at the same time, one
|
|
# of them hangs forever. To avoid the issue, we serialize the invocations
|
|
# of "docker create".
|
|
with self.docker_lock:
|
|
container_id = _check_output([
|
|
"docker", "create",
|
|
# Required for some of the ntp handling in bootstrap and Kudu;
|
|
# requirement may be lifted in newer Docker versions.
|
|
"--privileged",
|
|
"--name", name,
|
|
# Whereas the container names vary across containers, we use the same
|
|
# hostname repeatedly, so that the build container and the test
|
|
# containers have the same hostnames. Kudu errors out with "Remote
|
|
# error: Service unavailable: Timed out: could not wait for desired
|
|
# snapshot timestamp to be consistent: Tablet is lagging too much to be
|
|
# able to serve snapshot scan." if reading with READ_AT_SNAPSHOT
|
|
# if the hostnames change underneath it.
|
|
"--hostname", self.name,
|
|
# Label with the git root directory for easier cleanup
|
|
"--label=pwd=" + self.git_root,
|
|
# Consistent locales
|
|
"-e", "LC_ALL=C",
|
|
"-e", "IMPALAD_MEM_LIMIT_BYTES=" +
|
|
str(self.impalad_mem_limit_bytes),
|
|
# Mount the git directory so that clones can be local.
|
|
# We use /repo to have access to certain scripts,
|
|
# and we use /git_common_dir to have local clones,
|
|
# even when "git worktree" is being used.
|
|
"-v", self.git_root + ":/repo:ro",
|
|
"-v", self.git_common_dir + ":/git_common_dir:ro",
|
|
"-e", "GIT_HEAD_REV=" + self.git_head_rev,
|
|
# Share timezone between host and container
|
|
"-e", "LOCALTIME_LINK_TARGET=" + localtime_link_target,
|
|
"-v", self.ccache_dir + ":/ccache",
|
|
"-v", _make_dir_if_not_exist(self.log_dir,
|
|
logdir) + ":/logs",
|
|
"-v", base + ":/mnt/base:ro"]
|
|
+ list(itertools.chain(*[["-e", env] for env in self.env]))
|
|
+ extras
|
|
+ [image]
|
|
+ entrypoint).strip()
|
|
ctr = Container(name=name, id_=container_id,
|
|
logfile=os.path.join(self.log_dir, logdir, logname))
|
|
logging.info("Created container %s", ctr)
|
|
return ctr
|
|
|
|
def _run_container(self, container):
|
|
"""Runs container, and returns True if the container had a successful exit value.
|
|
|
|
This blocks while the container is running. The container output is
|
|
run through annotate.py to add timestamps and saved into the container's log file.
|
|
"""
|
|
container.running = True
|
|
tailer = None
|
|
|
|
with open(container.logfile, "aw") as log_output:
|
|
if self.tail:
|
|
tailer = subprocess.Popen(
|
|
["tail", "-f", "--pid", str(os.getpid()), "-v", container.logfile])
|
|
|
|
container.start = time.time()
|
|
# Sets up a "docker start ... | annotate.py > logfile" pipeline using
|
|
# subprocess.
|
|
annotate = subprocess.Popen(
|
|
[os.path.join(self.git_root, "docker", "annotate.py")],
|
|
stdin=subprocess.PIPE,
|
|
stdout=log_output,
|
|
stderr=log_output)
|
|
|
|
logging.info("Starting container %s; logging to %s", container.name,
|
|
container.logfile)
|
|
docker = subprocess.Popen(["docker", "start", "--attach", container.id],
|
|
stdin=None, stdout=annotate.stdin, stderr=annotate.stdin)
|
|
|
|
ret = docker.wait()
|
|
annotate.stdin.close()
|
|
annotate.wait()
|
|
|
|
logging.info("Container %s returned %s", container, ret)
|
|
container.exitcode = ret
|
|
container.running = False
|
|
container.end = time.time()
|
|
if tailer:
|
|
tailer.kill()
|
|
return ret == 0
|
|
|
|
@staticmethod
|
|
def _stop_container(container):
|
|
"""Stops container. Ignores errors (e.g., if it's already exited)."""
|
|
if container.running:
|
|
_call(["docker", "stop", container.id], check=False)
|
|
container.end = time.time()
|
|
container.running = False
|
|
|
|
@staticmethod
|
|
def _rm_container(container):
|
|
"""Removes container."""
|
|
# We can have multiple threads trying to call "docker rm" on a container.
|
|
# Docker will fail one of those with "already running", but we actually
|
|
# want to block until it's removed. Using a lock to serialize the "docker # rm"
|
|
# calls handles that.
|
|
with container.lock:
|
|
if not container.removed:
|
|
_call(["docker", "rm", container.id], check=False)
|
|
container.removed = True
|
|
|
|
def _create_build_image(self):
|
|
"""Creates the "build image", with Impala compiled and data loaded."""
|
|
container = self._create_container(
|
|
image=self.base_image, name=self.name + "-build",
|
|
logdir="build",
|
|
logname="log-build.txt",
|
|
# entrypoint.sh will create a user with our uid; this
|
|
# allows the shared file systems to work seamlessly
|
|
entrypoint=["/mnt/base/entrypoint.sh", "build", str(os.getuid())])
|
|
self.containers.append(container)
|
|
self.monitor.add(container)
|
|
try:
|
|
logging.info("Docker container for build: %s", container)
|
|
_check_output(["docker", "start", container.id])
|
|
if not self._run_container(container):
|
|
raise Exception("Build container failed.")
|
|
logging.info("Committing docker container.")
|
|
self.image = _check_output(
|
|
["docker", "commit",
|
|
"-c", "LABEL pwd=" + self.git_root,
|
|
container.id, "impala:built-" + self.name]).strip()
|
|
logging.info("Committed docker image: %s", self.image)
|
|
finally:
|
|
if self.cleanup_containers:
|
|
self._stop_container(container)
|
|
self._rm_container(container)
|
|
|
|
def _run_tests(self):
|
|
pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
|
|
outstanding_suites = []
|
|
for suite in self.suite_runners:
|
|
suite.task = pool.apply_async(suite.run)
|
|
outstanding_suites.append(suite)
|
|
|
|
ret = True
|
|
try:
|
|
while len(outstanding_suites) > 0:
|
|
for suite in list(outstanding_suites):
|
|
if suite.timed_out():
|
|
msg = "Task %s not finished within timeout %s" % (suite.name,
|
|
suite.suite.timeout_minutes,)
|
|
logging.error(msg)
|
|
raise Exception(msg)
|
|
task = suite.task
|
|
if task.ready():
|
|
this_task_ret = task.get()
|
|
outstanding_suites.remove(suite)
|
|
if this_task_ret:
|
|
logging.info("Suite %s succeeded.", suite.name)
|
|
else:
|
|
logging.info("Suite %s failed.", suite.name)
|
|
ret = False
|
|
time.sleep(5)
|
|
except KeyboardInterrupt:
|
|
logging.info("\n\nDetected KeyboardInterrupt; shutting down!\n\n")
|
|
raise
|
|
finally:
|
|
pool.terminate()
|
|
return ret
|
|
|
|
def run(self):
|
|
# Create logs directories and ccache dir.
|
|
_make_dir_if_not_exist(self.ccache_dir)
|
|
_make_dir_if_not_exist(self.log_dir)
|
|
|
|
self.monitor.start()
|
|
try:
|
|
if not self.build_image:
|
|
self._create_build_image()
|
|
else:
|
|
self.image = self.build_image
|
|
ret = self._run_tests()
|
|
return ret
|
|
finally:
|
|
self.monitor.stop()
|
|
if self.cleanup_containers:
|
|
for c in self.containers:
|
|
self._stop_container(c)
|
|
self._rm_container(c)
|
|
if self.cleanup_image and self.image:
|
|
_call(["docker", "rmi", self.image], check=False)
|
|
logging.info("Memory usage: %s GB min, %s GB max",
|
|
self.monitor.min_memory_usage_gb,
|
|
self.monitor.max_memory_usage_gb)
|
|
|
|
# Strings (really, regular expressions) pulled out into to the visual timeline.
|
|
_INTERESTING_STRINGS = [
|
|
">>> ",
|
|
]
|
|
_INTERESTING_RE = re.compile("|".join("(%s)" % (s,) for s in _INTERESTING_STRINGS))
|
|
|
|
def create_timeline(self):
|
|
"""Creates timeline into log directory."""
|
|
timeline = monitor.Timeline(
|
|
monitor_file=self.monitoring_output_file,
|
|
containers=self.containers,
|
|
interesting_re=self._INTERESTING_RE,
|
|
buildname=self.name)
|
|
timeline.create(os.path.join(self.log_dir, "timeline.html"))
|
|
|
|
def log_summary(self):
|
|
logging.info("Containers:")
|
|
def to_success_string(exitcode):
|
|
if exitcode == 0:
|
|
return "SUCCESS"
|
|
return "FAILURE"
|
|
|
|
for c in self.containers:
|
|
logging.info("%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
|
|
"%0.1fx parallelism, %0.1f GB peak RSS",
|
|
to_success_string(c.exitcode), c.name, c.logfile,
|
|
c.runtime_seconds() / 60.0,
|
|
c.total_user_cpu / 60.0,
|
|
c.total_system_cpu / 60.0,
|
|
(c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
|
|
c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)
|
|
|
|
|
|
class TestSuiteRunner(object):
|
|
"""Runs a single test suite."""
|
|
|
|
def __init__(self, test_with_docker, suite):
|
|
self.test_with_docker = test_with_docker
|
|
self.suite = suite
|
|
self.task = None
|
|
self.name = suite.name.lower()
|
|
# Set at the beginning of run and facilitates enforcing timeouts
|
|
# for individual suites.
|
|
self.deadline = None
|
|
|
|
def timed_out(self):
|
|
return self.deadline is not None and time.time() > self.deadline
|
|
|
|
def run(self):
|
|
"""Runs given test. Returns true on success, based on exit code."""
|
|
self.deadline = time.time() + self.suite.timeout_minutes * 60
|
|
test_with_docker = self.test_with_docker
|
|
suite = self.suite
|
|
envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
|
|
for k, v in sorted(suite.envs.iteritems()):
|
|
envs.append("-e")
|
|
envs.append("%s=%s" % (k, v))
|
|
|
|
self.start = time.time()
|
|
|
|
# io-file-mgr-test expects a real-ish file system at /tmp;
|
|
# we mount a temporary directory into the container to appease it.
|
|
tmpdir = tempfile.mkdtemp(prefix=test_with_docker.name + "-" + self.name)
|
|
os.chmod(tmpdir, 01777)
|
|
# Container names are sometimes used as hostnames, and DNS names shouldn't
|
|
# have underscores.
|
|
container_name = test_with_docker.name + "-" + self.name.replace("_", "-")
|
|
|
|
container = test_with_docker._create_container(
|
|
image=test_with_docker.image,
|
|
name=container_name,
|
|
extras=[
|
|
"-v", tmpdir + ":/tmp",
|
|
"-u", str(os.getuid())
|
|
] + envs,
|
|
logdir=self.name,
|
|
logname="log-test-" + self.suite.name + ".txt",
|
|
entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite.name])
|
|
|
|
test_with_docker.containers.append(container)
|
|
test_with_docker.monitor.add(container)
|
|
try:
|
|
return test_with_docker._run_container(container)
|
|
except:
|
|
return False
|
|
finally:
|
|
logging.info("Cleaning up containers for %s" % (suite.name,))
|
|
test_with_docker._stop_container(container)
|
|
if test_with_docker.cleanup_containers:
|
|
test_with_docker._rm_container(container)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|