There are three built-in case conversion string functions: upper(), lower(),
and initcap(). Previously they only converted English alphabetic
characters. This patch adds support for Unicode characters.
There are many corner cases in case conversion, depending on the locale
and context. For example:
1) Case conversion is locale-sensitive.
Turkish has four letter "I"s, while English has only two: a lowercase
dotted i and an uppercase dotless I. Turkish has lowercase and uppercase
forms of both the dotted and the dotless I, so simply converting "i" to
"I" in upper case is wrong in Turkish:
+-------+--------+---------+
| | Dotted | Dotless |
+-------+--------+---------+
| Upper | İ | I |
+-------+--------+---------+
| Lower | i | ı |
+-------+--------+---------+
2) Case conversion may change a string's length.
The German word "grüßen" should be converted to "GRÜSSEN" in upper case:
the letter "ß" should be converted to "SS".
3) Case conversion is context-sensitive.
The Greek word "ὈΔΥΣΣΕΎΣ" should be converted to "ὀδυσσεύς", where the
Greek letter "Σ" is converted to "σ" or to "ς", depending on its
position in the word.
The above cases will be the focus of follow-up JIRAs. This patch adds the
initial implementation of UTF-8 aware case conversion functions.
--------
Implementation:
In UTF-8 mode (turned on by set UTF8_MODE=true), these functions convert
the bytes of a string to wide characters using std::mbrtowc(). Each wide
character (wchar_t) is then converted using std::towupper() or
std::towlower() accordingly, and the results are converted back to
multi-byte sequences using std::wcrtomb().
Note that these builtins are locale-aware. If impalad is launched without
a UTF-8 aware locale, e.g. LC_ALL="C", they cannot recognize non-ASCII
characters and will return unexpected results. Thus we modify our Docker
images to set LC_ALL="C.UTF-8" instead of "C". This patch also logs the
current locale when launching the Impala daemons to aid debugging.
Customized locales will be supported in IMPALA-11080.
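In outline, the per-string conversion is the following wide-character round
trip. This is a minimal, self-contained C++ sketch of the approach described
above, not the actual Impala code (the helper name Utf8ToUpper is made up,
and the real builtins handle errors and buffers differently); it assumes the
process runs under a UTF-8 locale:

  // Sketch only: decode UTF-8 bytes, convert case per wide character, re-encode.
  #include <climits>
  #include <clocale>
  #include <cstddef>
  #include <cwchar>
  #include <cwctype>
  #include <string>

  std::string Utf8ToUpper(const std::string& in) {
    std::string out;
    std::mbstate_t decode = std::mbstate_t();
    std::mbstate_t encode = std::mbstate_t();
    const char* p = in.data();
    size_t remaining = in.size();
    while (remaining > 0) {
      wchar_t wc;
      // Decode one multi-byte (UTF-8) sequence into a wide character.
      size_t consumed = std::mbrtowc(&wc, p, remaining, &decode);
      if (consumed == (size_t)-1 || consumed == (size_t)-2) {
        // Invalid or truncated sequence: copy the byte through and resync.
        out.push_back(*p++);
        --remaining;
        decode = std::mbstate_t();
        continue;
      }
      if (consumed == 0) consumed = 1;  // embedded NUL byte
      // Convert the single wide character and re-encode it as multi-byte.
      wchar_t upper = (wchar_t)std::towupper((wint_t)wc);
      char buf[MB_LEN_MAX];
      size_t written = std::wcrtomb(buf, upper, &encode);
      if (written == (size_t)-1) {
        out.append(p, consumed);  // cannot re-encode; keep the original bytes
      } else {
        out.append(buf, written);
      }
      p += consumed;
      remaining -= consumed;
    }
    return out;
  }

  int main() {
    // The builtins are locale-aware: without a UTF-8 locale, mbrtowc()
    // cannot decode multi-byte characters.
    std::setlocale(LC_ALL, "C.UTF-8");
    std::string s = Utf8ToUpper("grüßen");
    // Note: towupper() maps one character to one character, so "ß" -> "SS"
    // and the other corner cases above are left to the follow-up JIRAs.
    return 0;
  }
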
Test:
- Add BE unit tests and e2e tests.
Change-Id: I443e89d46f4638ce85664b021666bc4f03ee8abd
Reviewed-on: http://gerrit.cloudera.org:8080/17785
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# We do not use Impala's python environment here, nor do we depend on
# non-standard python libraries to avoid needing extra build steps before
# triggering this.
import argparse
import datetime
import itertools
import logging
import multiprocessing
import multiprocessing.pool
import os
import re
import subprocess
import sys
import tempfile
import threading
import time

CLI_HELP = """\
Runs tests inside of docker containers, parallelizing different types of
tests. This script first creates a docker container, checks out this repo
into it, bootstraps the container with Impala dependencies, and builds Impala
and its test data. Then, it saves the resulting container, and launches new
containers to run tests in parallel. An HTML, visual timeline is generated
as part of the build, in logs/docker/*/timeline.html.
"""

# To execute, run:
#   docker/test-with-docker.py
# After waiting for that to finish, inspect results in logs/docker.
#
# Visually, the timeline looks as follows, produced on a 32-core, 100GB RAM
# machine:
# .......   1h05m  Checkout, setup machine, build (8m with ccache),
#                  generate testdata (52m); missing ccache
#                  adds about 7 minutes (very sensitive to number
#                  of available cores)
# ...         11m  Commit the Docker container
# .           10m  FE tests
# .           10m  JDBC tests
# ....        45m  serial EE tests
# ......    1h02m  cluster tests
# ...         31m  BE (C++) tests
# ....        36m  parallel EE tests
# Total time: 2h25m.
#
# CPU usage is sustained high for the parallel EE tests and for
# the C++ compile (when it's not ccache'd), but is otherwise low.
# Because every parallel track consumes memory (a cluster),
# increasing parallelism and memory must be balanced.
#
# Memory usage is thorny. The minicluster memory can
# be tweaked somewhat by how much memory to give to the JVM
# and what to set --mem_limit to. Furthermore, parallel
# cluster tests use more memory when more parallelism happens.
#
# The code that runs inside of the containers is in entrypoint.sh,
# whereas the code that invokes docker is here.
#
# We avoid using Dockerfile and "docker build": they make it hard or impossible
# to cross-mount host directories into containers or use --privileged, and using
# them would require generating them dynamically. They're more trouble than
# they're worth for this use case.
#
# In practice, the containers are about 100GB (with 45GB
# being test data and ~40GB being the tests).
#
# Requirements:
#  * Docker
#    This has been tested on Ubuntu 16.04 with Docker
#    from the Ubuntu repos, i.e., Docker 1.13.1.
#  * About 150 GB of disk space available to Docker.
#  * 75GB of RAM.
#
# This script tries to clean up images and containers created by this process, though
# this can be disabled for debugging.
#
# To clean up containers and images manually, you can use:
#   for x in $(docker ps -aq --filter label=pwd=$IMPALA_HOME); do
#       docker stop $x; docker rm $x; done
#   for x in $(docker images -q --filter label=pwd=$IMPALA_HOME); do docker rmi $x; done
#
# Core dumps:
# On an Ubuntu host, core dumps and Docker don't mix by default, because apport is not
# running inside of the container. See https://github.com/moby/moby/issues/11740
# To enable core dumps, run the following command on the host:
#   $ echo 'core.%e.%p' | sudo tee /proc/sys/kernel/core_pattern
#
# TODOs:
#  - Support for executing other flavors, like exhaustive, or file systems,
#    like S3.
#
# Suggested speed improvement TODOs:
#   - Speed up testdata generation
#   - Skip generating test data for variants not being run
#   - Make container image smaller
#   - Analyze .xml junit files to find slow tests; eradicate
#     or move to different suite.
#   - Run BE tests earlier (during data load)

if __name__ == '__main__' and __package__ is None:
  sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  import monitor

base = os.path.dirname(os.path.abspath(__file__))

LOG_FORMAT="%(asctime)s %(threadName)s: %(message)s"


def main():
  logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

  default_parallel_test_concurrency, default_suite_concurrency, default_memlimit_gb = \
      _compute_defaults()
  parser = argparse.ArgumentParser(
      description=CLI_HELP, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  group = parser.add_mutually_exclusive_group()
  group.add_argument('--cleanup-containers', dest="cleanup_containers",
                     action='store_true', default=True,
                     help='Removes containers when finished.')
  group.add_argument('--no-cleanup-containers',
                     dest="cleanup_containers", action='store_false')
  group = parser.add_mutually_exclusive_group()
  parser.add_argument(
      '--parallel-test-concurrency', type=int,
      default=default_parallel_test_concurrency,
      help='For the ee-test-parallel suite, how many tests to run concurrently.')
  parser.add_argument(
      '--suite-concurrency', type=int, default=default_suite_concurrency,
      help='Number of concurrent suites to run in parallel.')
  parser.add_argument(
      '--impalad-mem-limit-bytes', type=int, default=default_memlimit_gb,
      help='Memlimit to pass to impalad for miniclusters.')
  group.add_argument(
      '--cleanup-image', dest="cleanup_image",
      action='store_true', default=True,
      help="Whether to remove image when done.")
  group.add_argument('--no-cleanup-image', dest="cleanup_image", action='store_false')
  parser.add_argument('--base-image', dest="base_image", default="ubuntu:16.04",
                      help="Base OS image to use. ubuntu:16.04 and centos:6 are known to work.")
  parser.add_argument(
      '--build-image', metavar='IMAGE',
      help='Skip building, and run tests on pre-existing image.')

  suite_group = parser.add_mutually_exclusive_group()
  suite_group.add_argument(
      '--suite', metavar='VARIANT', action='append',
      help="""
      Run specific test suites; can be specified multiple times.
      Test-with-docker may shard some suites to improve parallelism.
      If not specified, default tests are run.
      Default: %s, All Choices: %s
      """ % (",".join([ s.name for s in DEFAULT_SUITES]),
             ",".join([ s.name for s in ALL_SUITES ])))
  suite_group.add_argument('--all-suites', action='store_true', default=False,
                           help="If set, run all available suites.")
  parser.add_argument(
      '--name', metavar='NAME',
      help="Use a specific name for the test run. The name is used " +
           "as a prefix for the container and image names, and " +
           "as part of the log directory naming. Defaults to include a timestamp.",
      default=datetime.datetime.now().strftime("i-%Y%m%d-%H%M%S"))
  parser.add_argument('--ccache-dir', metavar='DIR',
                      help="CCache directory to use",
                      default=os.path.expanduser("~/.ccache"))
  parser.add_argument('--tail', action="store_true",
                      help="Run tail on all container log files.")
  parser.add_argument('--env', metavar='K=V', default=[], action='append',
                      help="""Passes given environment variables (expressed as KEY=VALUE)
                      through containers.
                      """)
  parser.add_argument('--test', action="store_true")
  args = parser.parse_args()

  if not args.suite:
    if args.all_suites:
      # Ignore "NOOP" tasks, as they are just for testing.
      args.suite = [ s.name for s in ALL_SUITES if not s.name.startswith("NOOP") ]
    else:
      args.suite = [ s.name for s in DEFAULT_SUITES ]
  t = TestWithDocker(
      build_image=args.build_image, suite_names=args.suite,
      name=args.name, cleanup_containers=args.cleanup_containers,
      cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
      parallel_test_concurrency=args.parallel_test_concurrency,
      suite_concurrency=args.suite_concurrency,
      impalad_mem_limit_bytes=args.impalad_mem_limit_bytes,
      tail=args.tail,
      env=args.env, base_image=args.base_image)

  fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
  fh.setFormatter(logging.Formatter(LOG_FORMAT))
  logging.getLogger('').addHandler(fh)

  logging.info("Arguments: %s", args)

  ret = t.run()
  t.create_timeline()
  t.log_summary()

  if not ret:
    sys.exit(1)


def _compute_defaults():
  """Compute default config options based on memory.

  The goal is to work reasonably on machines with
  about 60GB of memory, like Amazon's c4.8xlarge (36 CPUs, 60GB)
  or c5.9xlarge (36 CPUs, 72GB) or m4.4xlarge (16 CPUs, 64 GB).

  Based on some experiments, we set up defaults for different
  machine sizes based on memory, with an eye towards
  having reasonable runtimes as well.

  Experiments on memory usage:

  suite              parallelism    usage
                                    Xmx     memlimit
  ee-test-parallel   4GB   8        5GB     33GB
  ee-test-parallel   4GB   16       7GB     37GB
  ee-test-serial     4GB   -        5GB     18GB
  cluster-test       4GB   -        -       13GB
  be-test            4GB   -        10GB    19GB
  fe-test            4GB   -        10GB    9GB
  """
  total_memory_gb = monitor.total_memory()
  cpus = multiprocessing.cpu_count()
  logging.info("CPUs: %s Memory (GB): %s", cpus, total_memory_gb)

  parallel_test_concurrency = min(cpus, 8)
  memlimit_gb = 8

  if total_memory_gb >= 140:
    suite_concurrency = 6
    memlimit_gb = 11
    parallel_test_concurrency = min(cpus, 12)
  elif total_memory_gb >= 95:
    suite_concurrency = 4
    memlimit_gb = 11
    parallel_test_concurrency = min(cpus, 12)
  elif total_memory_gb >= 65:
    suite_concurrency = 3
  elif total_memory_gb >= 35:
    suite_concurrency = 2
  else:
    logging.warning("This tool should be run on a machine with more memory.")
    suite_concurrency = 1

  return parallel_test_concurrency, suite_concurrency, memlimit_gb * 1024 * 1024 * 1024

class Suite(object):
  """Encapsulates a test suite.

  A test suite is a named thing that the user can select to run,
  and it runs in its own container, in parallel with other suites.
  The actual running happens from entrypoint.sh and is controlled
  mostly by environment variables. When complexity is easier
  to handle in Python (with its richer data types), we prefer
  it here.
  """
  def __init__(self, name, **envs):
    """Create suite with given name and environment."""
    self.name = name
    self.envs = dict(
        FE_TEST="false",
        BE_TEST="false",
        EE_TEST="false",
        JDBC_TEST="false",
        CLUSTER_TEST="false")
    # If set, this suite is sharded past a certain suite concurrency threshold.
    self.shard_at_concurrency = None
    # Variable to which to append --shard_tests
    self.sharding_variable = None
    self.envs[name] = "true"
    self.envs.update(envs)
    self.timeout_minutes = 120

  def copy(self, name, **envs):
    """Duplicates current suite allowing for environment updates."""
    v = dict()
    v.update(self.envs)
    v.update(envs)
    ret = Suite(name, **v)
    ret.shard_at_concurrency = self.shard_at_concurrency
    ret.sharding_variable = self.sharding_variable
    ret.timeout_minutes = self.timeout_minutes
    return ret

  def exhaustive(self):
    """Returns an "exhaustive" copy of the suite."""
    r = self.copy(self.name + "_EXHAUSTIVE", EXPLORATION_STRATEGY="exhaustive")
    r.timeout_minutes = 240
    return r

  def asan(self):
    """Returns an ASAN copy of this suite."""
    r = self.copy(self.name + "_ASAN", REBUILD_ASAN="true")
    r.timeout_minutes = self.timeout_minutes * 2.0 + 10
    return r

  def sharded(self, shards):
    """Returns a list of sharded copies of the suite.

    self.sharding_variable is the name of the environment variable to which
    "--shard_tests=..." needs to be appended.
    """
    # RUN_TESTS_ARGS
    ret = []
    for i in range(1, shards + 1):
      s = self.copy("%s_%d_of_%d" % (self.name, i, shards))
      s.envs[self.sharding_variable] = self.envs.get(self.sharding_variable, "") \
          + " --shard_tests=%s/%s" % (i, shards)
      ret.append(s)
    return ret

# Definitions of all known suites:
be_test = Suite("BE_TEST")
ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
    RUN_TESTS_ARGS="--skip-parallel --skip-stress")
ee_test_serial.shard_at_concurrency = 4
ee_test_serial.sharding_variable = "RUN_TESTS_ARGS"
ee_test_serial_exhaustive = ee_test_serial.exhaustive()
ee_test_parallel = Suite("EE_TEST_PARALLEL", EE_TEST="true",
    RUN_TESTS_ARGS="--skip-serial")
ee_test_parallel_exhaustive = ee_test_parallel.exhaustive()
cluster_test = Suite("CLUSTER_TEST")
cluster_test.shard_at_concurrency = 4
cluster_test.sharding_variable = "RUN_CUSTOM_CLUSTER_TESTS_ARGS"
cluster_test_exhaustive = cluster_test.exhaustive()

# Default supported suites. These are organized slowest-to-fastest, so that,
# when parallelism is limited, the total time is least impacted.
DEFAULT_SUITES = [
    ee_test_serial,
    ee_test_parallel,
    cluster_test,
    Suite("FE_TEST"),
    Suite("BE_TEST"),
    Suite("JDBC_TEST")
]

OTHER_SUITES = [
    ee_test_parallel_exhaustive,
    ee_test_serial_exhaustive,
    cluster_test_exhaustive,

    # ASAN
    be_test.asan(),
    cluster_test.asan(),
    ee_test_parallel.asan(),
    ee_test_serial.asan(),

    Suite("RAT_CHECK"),
    # These are used for testing this script
    Suite("NOOP"),
    Suite("NOOP_FAIL"),
    Suite("NOOP_SLEEP_FOREVER")
]
ALL_SUITES = DEFAULT_SUITES + OTHER_SUITES

def _call(args, check=True):
  """Wrapper for calling a subprocess.

  args is the first argument of subprocess.Popen, typically
  an array, e.g., ["echo", "hi"].

  If check is set, raise an exception on failure.
  """
  logging.info("Calling: %s", args)
  if check:
    subprocess.check_call(args, stdin=None)
  else:
    return subprocess.call(args, stdin=None)


def _check_output(*args, **kwargs):
  """Wrapper for subprocess.check_output, with logging."""
  logging.info("Running: %s, %s; cmdline: %s.", args, kwargs, " ".join(*args))
  return subprocess.check_output(*args, **kwargs)


def _make_dir_if_not_exist(*parts):
  d = os.path.join(*parts)
  if not os.path.exists(d):
    os.makedirs(d)
  return d


class Container(object):
  """Encapsulates a container, with some metadata."""

  def __init__(self, id_, name, logfile, exitcode=None, running=None):
    self.id = id_
    self.name = name
    self.logfile = logfile
    self.exitcode = exitcode
    self.running = running
    self.start = None
    self.end = None
    self.removed = False

    # Protects multiple calls to "docker rm <self.id>"
    self.lock = threading.Lock()

    # Updated by Timeline class
    self.total_user_cpu = -1
    self.total_system_cpu = -1
    self.peak_total_rss = -1

  def runtime_seconds(self):
    if self.start and self.end:
      return self.end - self.start

  def __str__(self):
    return "Container<" + \
        ",".join(["%s=%s" % (k, v) for k, v in self.__dict__.items()]) \
        + ">"


class TestWithDocker(object):
  """Tests Impala using Docker containers for parallelism."""

  def __init__(self, build_image, suite_names, name, cleanup_containers,
               cleanup_image, ccache_dir, test_mode,
               suite_concurrency, parallel_test_concurrency,
               impalad_mem_limit_bytes, tail,
               env, base_image):
    self.build_image = build_image
    self.name = name
    self.containers = []
    self.git_root = _check_output(["git", "rev-parse", "--show-toplevel"]).strip()
    # Protects multiple concurrent calls to "docker create"
    self.docker_lock = threading.Lock()

    # If using worktrees, we need to find $GIT_COMMON_DIR; rev-parse
    # supports finding it as of version 2.5.0; for older versions, we
    # use $GIT_DIR.
    git_common_dir = _check_output(["git", "rev-parse", "--git-common-dir"]).strip()
    if git_common_dir == "--git-common-dir":
      git_common_dir = _check_output(["git", "rev-parse", "--git-dir"]).strip()
    self.git_common_dir = os.path.realpath(git_common_dir)
    assert os.path.exists(self.git_common_dir)

    self.git_head_rev = _check_output(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"]).strip()
    assert self.git_head_rev, \
        "Could not get reference to HEAD using git rev-parse --abbrev-ref HEAD."
    self.cleanup_containers = cleanup_containers
    self.cleanup_image = cleanup_image
    self.image = None
    if build_image and cleanup_image:
      # Refuse to clean up external image.
      raise Exception("cleanup_image and build_image cannot be both specified")
    self.ccache_dir = ccache_dir
    self.log_dir = os.path.join(self.git_root, "logs", "docker", self.name)
    self.monitoring_output_file = os.path.join(self.log_dir, "metrics.txt")
    self.monitor = monitor.ContainerMonitor(self.monitoring_output_file)
    self.test_mode = test_mode
    self.suite_concurrency = suite_concurrency
    self.parallel_test_concurrency = parallel_test_concurrency
    self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
    self.tail = tail
    self.env = env
    self.base_image = base_image

    # Map suites back into objects; we ignore case for this mapping.
    suites = []
    suites_by_name = {}
    for suite in ALL_SUITES:
      suites_by_name[suite.name.lower()] = suite
    for suite_name in suite_names:
      suites.append(suites_by_name[suite_name.lower()])

    # If we have enough concurrency, shard some suites into two halves.
    suites2 = []
    for suite in suites:
      if suite.shard_at_concurrency is not None and \
          suite_concurrency >= suite.shard_at_concurrency:
        suites2.extend(suite.sharded(2))
      else:
        suites2.append(suite)
    suites = suites2

    self.suite_runners = [TestSuiteRunner(self, suite) for suite in suites]

  def _create_container(self, image, name, logdir, logname, entrypoint, extras=None):
    """Returns a new container.

    logdir - subdirectory to create under self.log_dir,
      which will get mounted to /logs
    logname - name of file in logdir that will be created
    extras - extra arguments to pass to docker
    entrypoint - entrypoint arguments, as a list.
    """
    if extras is None:
      extras = []
    if self.test_mode:
      extras = ["-e", "TEST_TEST_WITH_DOCKER=true"] + extras

    # According to localtime(5), /etc/localtime is supposed
    # to be a symlink to somewhere inside /usr/share/zoneinfo.
    # Note that sometimes the symlink tree may be
    # complicated, e.g.:
    #  /etc/localtime ->
    #    /usr/share/zoneinfo/America/Los_Angeles -> (readlink)
    #    ../US/Pacific-New (realpath)
    # Using both readlink and realpath should work, but we've
    # encountered one scenario (centos:6) where the Java tzdata
    # database doesn't have US/Pacific-New, but has America/Los_Angeles.
    # This is deemed sufficient to tip the scales to using readlink.
    assert os.path.islink("/etc/localtime")
    localtime_link_target = os.readlink("/etc/localtime")
    assert localtime_link_target.startswith("/usr/share/zoneinfo")

    # Workaround for what appears to be https://github.com/moby/moby/issues/13885
    # Namely, if we run too many "docker create" at the same time, one
    # of them hangs forever. To avoid the issue, we serialize the invocations
    # of "docker create".
    with self.docker_lock:
      container_id = _check_output([
          "docker", "create",
          # Required for some of the ntp handling in bootstrap and Kudu;
          # requirement may be lifted in newer Docker versions.
          "--privileged",
          "--name", name,
          # Whereas the container names vary across containers, we use the same
          # hostname repeatedly, so that the build container and the test
          # containers have the same hostnames. Kudu errors out with "Remote
          # error: Service unavailable: Timed out: could not wait for desired
          # snapshot timestamp to be consistent: Tablet is lagging too much to be
          # able to serve snapshot scan." if reading with READ_AT_SNAPSHOT
          # if the hostnames change underneath it.
          "--hostname", self.name,
          # Label with the git root directory for easier cleanup
          "--label=pwd=" + self.git_root,
          # Consistent locales
          "-e", "LC_ALL=C.UTF-8",
          "-e", "IMPALAD_MEM_LIMIT_BYTES=" +
          str(self.impalad_mem_limit_bytes),
          # Mount the git directory so that clones can be local.
          # We use /repo to have access to certain scripts,
          # and we use /git_common_dir to have local clones,
          # even when "git worktree" is being used.
          "-v", self.git_root + ":/repo:ro",
          "-v", self.git_common_dir + ":/git_common_dir:ro",
          "-e", "GIT_HEAD_REV=" + self.git_head_rev,
          # Share timezone between host and container
          "-e", "LOCALTIME_LINK_TARGET=" + localtime_link_target,
          "-v", self.ccache_dir + ":/ccache",
          "-e", "CCACHE_TEMPDIR=" + "/ccache/" + name,
          "-v", _make_dir_if_not_exist(self.log_dir,
                                       logdir) + ":/logs",
          "-v", base + ":/mnt/base:ro"]
          + list(itertools.chain(*[["-e", env] for env in self.env]))
          + extras
          + [image]
          + entrypoint).strip()
    ctr = Container(name=name, id_=container_id,
                    logfile=os.path.join(self.log_dir, logdir, logname))
    logging.info("Created container %s", ctr)
    return ctr

  def _run_container(self, container):
    """Runs container, and returns True if the container had a successful exit value.

    This blocks while the container is running. The container output is
    run through annotate.py to add timestamps and saved into the container's log file.
    """
    container.running = True
    tailer = None

    with open(container.logfile, "aw") as log_output:
      if self.tail:
        tailer = subprocess.Popen(
            ["tail", "-f", "--pid", str(os.getpid()), "-v", container.logfile])

      container.start = time.time()
      # Sets up a "docker start ... | annotate.py > logfile" pipeline using
      # subprocess.
      annotate = subprocess.Popen(
          [os.path.join(self.git_root, "docker", "annotate.py")],
          stdin=subprocess.PIPE,
          stdout=log_output,
          stderr=log_output)

      logging.info("Starting container %s; logging to %s", container.name,
                   container.logfile)
      docker = subprocess.Popen(["docker", "start", "--attach", container.id],
                                stdin=None, stdout=annotate.stdin, stderr=annotate.stdin)

      ret = docker.wait()
      annotate.stdin.close()
      annotate.wait()

      logging.info("Container %s returned %s", container, ret)
      container.exitcode = ret
      container.running = False
      container.end = time.time()
      if tailer:
        tailer.kill()
      return ret == 0

  @staticmethod
  def _stop_container(container):
    """Stops container. Ignores errors (e.g., if it's already exited)."""
    if container.running:
      _call(["docker", "stop", container.id], check=False)
      container.end = time.time()
      container.running = False

  @staticmethod
  def _rm_container(container):
    """Removes container."""
    # We can have multiple threads trying to call "docker rm" on a container.
    # Docker will fail one of those with "already running", but we actually
    # want to block until it's removed. Using a lock to serialize the
    # "docker rm" calls handles that.
    with container.lock:
      if not container.removed:
        _call(["docker", "rm", container.id], check=False)
        container.removed = True

  def _create_build_image(self):
    """Creates the "build image", with Impala compiled and data loaded."""
    container = self._create_container(
        image=self.base_image, name=self.name + "-build",
        logdir="build",
        logname="log-build.txt",
        # entrypoint.sh will create a user with our uid; this
        # allows the shared file systems to work seamlessly
        entrypoint=["/mnt/base/entrypoint.sh", "build", str(os.getuid())])
    self.containers.append(container)
    self.monitor.add(container)
    try:
      logging.info("Docker container for build: %s", container)
      _check_output(["docker", "start", container.id])
      if not self._run_container(container):
        raise Exception("Build container failed.")
      logging.info("Committing docker container.")
      self.image = _check_output(
          ["docker", "commit",
           "-c", "LABEL pwd=" + self.git_root,
           "-c", "USER impdev",
           "-c", "WORKDIR /home/impdev/Impala",
           "-c", 'CMD ["/home/impdev/Impala/docker/entrypoint.sh", "shell"]',
           container.id, "impala:built-" + self.name]).strip()
      logging.info("Committed docker image: %s", self.image)
    finally:
      if self.cleanup_containers:
        self._stop_container(container)
        self._rm_container(container)

  def _run_tests(self):
    pool = multiprocessing.pool.ThreadPool(processes=self.suite_concurrency)
    outstanding_suites = []
    for suite in self.suite_runners:
      suite.task = pool.apply_async(suite.run)
      outstanding_suites.append(suite)

    ret = True
    try:
      while len(outstanding_suites) > 0:
        for suite in list(outstanding_suites):
          if suite.timed_out():
            msg = "Task %s not finished within timeout %s" % (suite.name,
                suite.suite.timeout_minutes,)
            logging.error(msg)
            raise Exception(msg)
          task = suite.task
          if task.ready():
            this_task_ret = task.get()
            outstanding_suites.remove(suite)
            if this_task_ret:
              logging.info("Suite %s succeeded.", suite.name)
            else:
              logging.info("Suite %s failed.", suite.name)
              ret = False
        time.sleep(5)
    except KeyboardInterrupt:
      logging.info("\n\nDetected KeyboardInterrupt; shutting down!\n\n")
      raise
    finally:
      pool.terminate()
    return ret

  def run(self):
    # Create logs directories and ccache dir.
    _make_dir_if_not_exist(self.ccache_dir)
    _make_dir_if_not_exist(self.log_dir)

    self.monitor.start()
    try:
      if not self.build_image:
        self._create_build_image()
      else:
        self.image = self.build_image
      ret = self._run_tests()
      return ret
    finally:
      self.monitor.stop()
      if self.cleanup_containers:
        for c in self.containers:
          self._stop_container(c)
          self._rm_container(c)
      if self.cleanup_image and self.image:
        _call(["docker", "rmi", self.image], check=False)
      logging.info("Memory usage: %s GB min, %s GB max",
                   self.monitor.min_memory_usage_gb,
                   self.monitor.max_memory_usage_gb)

  # Strings (really, regular expressions) pulled out into the visual timeline.
  _INTERESTING_STRINGS = [
      ">>> ",
  ]
  _INTERESTING_RE = re.compile("|".join("(%s)" % (s,) for s in _INTERESTING_STRINGS))

  def create_timeline(self):
    """Creates timeline into log directory."""
    timeline = monitor.Timeline(
        monitor_file=self.monitoring_output_file,
        containers=self.containers,
        interesting_re=self._INTERESTING_RE,
        buildname=self.name)
    timeline.create(os.path.join(self.log_dir, "timeline.html"))

  def log_summary(self):
    logging.info("Containers:")
    def to_success_string(exitcode):
      if exitcode == 0:
        return "SUCCESS"
      return "FAILURE"

    for c in self.containers:
      logging.info("%s %s %s %0.1fm wall, %0.1fm user, %0.1fm system, " +
                   "%0.1fx parallelism, %0.1f GB peak RSS",
                   to_success_string(c.exitcode), c.name, c.logfile,
                   c.runtime_seconds() / 60.0,
                   c.total_user_cpu / 60.0,
                   c.total_system_cpu / 60.0,
                   (c.total_user_cpu + c.total_system_cpu) / max(c.runtime_seconds(), 0.0001),
                   c.peak_total_rss / 1024.0 / 1024.0 / 1024.0)


class TestSuiteRunner(object):
  """Runs a single test suite."""

  def __init__(self, test_with_docker, suite):
    self.test_with_docker = test_with_docker
    self.suite = suite
    self.task = None
    self.name = suite.name.lower()
    # Set at the beginning of run and facilitates enforcing timeouts
    # for individual suites.
    self.deadline = None

  def timed_out(self):
    return self.deadline is not None and time.time() > self.deadline

  def run(self):
    """Runs given test. Returns true on success, based on exit code."""
    self.deadline = time.time() + self.suite.timeout_minutes * 60
    test_with_docker = self.test_with_docker
    suite = self.suite
    envs = ["-e", "NUM_CONCURRENT_TESTS=" + str(test_with_docker.parallel_test_concurrency)]
    for k, v in sorted(suite.envs.iteritems()):
      envs.append("-e")
      envs.append("%s=%s" % (k, v))

    self.start = time.time()

    # io-file-mgr-test expects a real-ish file system at /tmp;
    # we mount a temporary directory into the container to appease it.
    tmpdir = tempfile.mkdtemp(prefix=test_with_docker.name + "-" + self.name)
    os.chmod(tmpdir, 01777)
    # Container names are sometimes used as hostnames, and DNS names shouldn't
    # have underscores.
    container_name = test_with_docker.name + "-" + self.name.replace("_", "-")

    container = test_with_docker._create_container(
        image=test_with_docker.image,
        name=container_name,
        extras=[
            "-v", tmpdir + ":/tmp",
            "-u", str(os.getuid())
        ] + envs,
        logdir=self.name,
        logname="log-test-" + self.suite.name + ".txt",
        entrypoint=["/mnt/base/entrypoint.sh", "test_suite", suite.name])

    test_with_docker.containers.append(container)
    test_with_docker.monitor.add(container)
    try:
      return test_with_docker._run_container(container)
    except:
      return False
    finally:
      logging.info("Cleaning up containers for %s" % (suite.name,))
      test_with_docker._stop_container(container)
      if test_with_docker.cleanup_containers:
        test_with_docker._rm_container(container)


if __name__ == "__main__":
  main()