impala/docker/monitor.py
Philip Zeyliger 2896b8d127 IMPALA-6070: Expose using Docker to run tests faster.
Allows running the tests that make up the "core" suite in about 2 hours.
By comparison, https://jenkins.impala.io/job/ubuntu-16.04-from-scratch/buildTimeTrend
tends to run in about 3.5 hours.

This commit:
* Adds "echo" statements in a few places, to facilitate timing.
* Adds --skip-parallel/--skip-serial flags to run-tests.py,
  and exposes them in run-all-tests.sh.
* Marks TestRuntimeFilters as a serial test. This test runs
  queries that need > 1GB of memory, and, combined with
  other tests running in parallel, can kill the parallel test
  suite.
* Adds "test-with-docker.py", which runs a full build, data load,
  and executes tests inside of Docker containers, generating
  a timeline at the end. In short, one container is used
  to do the build and data load, and then this container is
  re-used to run various tests in parallel. All logs are
  left on the host system.

Besides the obvious win of getting test results more quickly, this
commit serves as an example of how to get various bits of Impala
development working inside of Docker containers. For example, Kudu
relies on atomic rename of directories, which isn't available in most
Docker filesystems, and entrypoint.sh works around it.

In addition, the timeline generated by the build suggests where further
optimizations can be made. Most obviously, data load eats up a precious
~30-50 minutes on a largely idle machine.

This work is significantly CPU and memory hungry. It was developed on a
32-core, 120GB RAM Google Compute Engine machine. I've worked out
parallelism configurations such that it runs nicely on 60GB of RAM
(c4.8xlarge) and over 100GB (e.g., m4.10xlarge, which has 160GB). There is
some simple logic to guess sensible defaults for these knobs, and the
knobs can also be set explicitly. By and
large, EC2 and GCE price machines linearly, so, if CPU usage can be kept
up, it's not wasteful to run on bigger machines.

Change-Id: I82052ef31979564968effef13a3c6af0d5c62767
Reviewed-on: http://gerrit.cloudera.org:8080/9085
Reviewed-by: Philip Zeyliger <philip@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2018-04-06 06:40:07 +00:00


#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Monitors Docker containers for CPU and memory usage, and
# prepares an HTML timeline based on said monitoring.
#
# Usage example:
#
#   mon = monitor.ContainerMonitor("monitoring.txt")
#   mon.start()
#   # container1 is an object with attributes id, name, and logfile.
#   mon.add(container1)
#   mon.add(container2)
#   mon.stop()
#   timeline = monitor.Timeline("monitoring.txt",
#                               [container1, container2],
#                               re.compile(">>> "))
#   timeline.create("output.html")

import datetime
import json
import logging
import os
import shutil
import subprocess
import threading
import time

# Unit for reporting user/system CPU seconds in cpuacct.stat.
# See https://www.kernel.org/doc/Documentation/cgroup-v1/cpuacct.txt and time(7).
USER_HZ = os.sysconf(os.sysconf_names['SC_CLK_TCK'])


def total_memory():
  """Returns total RAM on system, in GB."""
  return _memory()[0]


def used_memory():
  """Returns total used RAM on system, in GB."""
  return _memory()[1]


def _memory():
  """Returns (total, used) memory on system, in GB.

  Used is computed as total - available.

  Calls "free" and parses output. Sample output for reference:

                total        used        free      shared     buffers       cache   available
  Mem:   126747197440 26363965440 56618553344    31678464  2091614208 41673064448 99384889344
  Swap:             0           0           0
  """
  free_lines = subprocess.check_output(["free", "-b", "-w"]).split('\n')
  free_grid = [x.split() for x in free_lines]
  # Identify columns for "total" and "available" in the header row.
  total_idx = free_grid[0].index("total")
  available_idx = free_grid[0].index("available")
  # The data row carries a leading "Mem:" label that the header row lacks,
  # so header column indexes are shifted by one.
  total = int(free_grid[1][1 + total_idx])
  available = int(free_grid[1][1 + available_idx])
  used = total - available
  total_gb = total / (1024.0 * 1024.0 * 1024.0)
  used_gb = used / (1024.0 * 1024.0 * 1024.0)
  return (total_gb, used_gb)


def datetime_to_seconds_since_epoch(dt):
  """Converts a Python datetime to seconds since the epoch."""
  return time.mktime(dt.timetuple())


def split_timestamp(line):
  """Parses the timestamp at the beginning of a line.

  Returns a tuple of seconds since the epoch and the rest
  of the line. Raises ValueError if the line does not start
  with a well-formed timestamp.
  """
  LENGTH = 26
  FORMAT = "%Y-%m-%d %H:%M:%S.%f"
  t = line[:LENGTH]
  return (datetime_to_seconds_since_epoch(datetime.datetime.strptime(t, FORMAT)),
          line[LENGTH + 1:])
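
# A usage sketch for split_timestamp (hypothetical input line; the format
# matches what ContainerMonitor writes below):
#   split_timestamp("2018-02-02 09:01:37.143591 d8f64098 cpu user 2 system 5")
# returns the timestamp converted to seconds since the epoch (via
# time.mktime, so interpreted in local time), paired with the remainder,
# here "d8f64098 cpu user 2 system 5".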


class ContainerMonitor(object):
  """Monitors Docker containers.

  Monitoring data is written to a file. An example is:

  2018-02-02 09:01:37.143591 d8f640989524be3939a70557a7bf7c015ba62ea5a105a64c94472d4ebca93c50 cpu user 2 system 5
  2018-02-02 09:01:37.143591 d8f640989524be3939a70557a7bf7c015ba62ea5a105a64c94472d4ebca93c50 memory cache 11481088 rss 4009984 rss_huge 0 mapped_file 8605696 dirty 24576 writeback 0 pgpgin 4406 pgpgout 624 pgfault 3739 pgmajfault 99 inactive_anon 0 active_anon 3891200 inactive_file 7614464 active_file 3747840 unevictable 0 hierarchical_memory_limit 9223372036854771712 total_cache 11481088 total_rss 4009984 total_rss_huge 0 total_mapped_file 8605696 total_dirty 24576 total_writeback 0 total_pgpgin 4406 total_pgpgout 624 total_pgfault 3739 total_pgmajfault 99 total_inactive_anon 0 total_active_anon 3891200 total_inactive_file 7614464 total_active_file 3747840 total_unevictable 0

  That is, the format is:

  <timestamp> <container> cpu user <usercpu> system <systemcpu>
  <timestamp> <container> memory <contents of memory.stat without newlines>

  <usercpu> and <systemcpu> are in the units of USER_HZ.

  See https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt for documentation
  on memory.stat; it's in the "memory" cgroup, often mounted at
  /sys/fs/cgroup/memory/<cgroup>/memory.stat.

  This format is parsed back by the Timeline class below and should
  not be considered an API.
  """

  def __init__(self, output_path, frequency_seconds=1):
    """frequency_seconds is how often metrics are gathered."""
    self.containers = []
    self.output_path = output_path
    self.keep_monitoring = None
    self.monitor_thread = None
    self.frequency_seconds = frequency_seconds

  def start(self):
    self.keep_monitoring = True
    self.monitor_thread = threading.Thread(target=self._monitor)
    self.monitor_thread.setDaemon(True)
    self.monitor_thread.start()

  def stop(self):
    self.keep_monitoring = False
    self.monitor_thread.join()

  def add(self, container):
    """Adds monitoring for container, which is an object with property 'id'."""
    self.containers.append(container)

  @staticmethod
  def _metrics_from_stat_file(root, container, stat):
    """Returns metrics stat file contents.

    root: a cgroups root (a path as a string)
    container: an object with string attribute id
    stat: a string filename

    Returns contents of <root>/<container.id>/<stat>
    with newlines replaced with spaces.
    Returns None on errors.
    """
    dirname = os.path.join(root, "docker", container.id)
    if not os.path.isdir(dirname):
      # Container may no longer exist.
      return None
    try:
      statcontents = open(os.path.join(dirname, stat)).read()
      return statcontents.replace("\n", " ").strip()
    except IOError as e:
      # Ignore errors; cgroup can disappear on us.
      logging.warning("Ignoring exception reading cgroup. "
                      "This can happen if container just exited. " + str(e))
      return None

  def _monitor(self):
    """Monitors CPU usage of containers.

    Output is stored in self.output_path.
    Also, keeps track of minimum and maximum memory usage (for the machine).
    """
    # Ubuntu systems typically mount the cpuacct cgroup at /sys/fs/cgroup/cpu,cpuacct,
    # but this can vary by OS distribution.
    all_cgroups = subprocess.check_output(
        "findmnt -n -o TARGET -t cgroup --source cgroup".split()
    ).split("\n")
    cpuacct_root = [c for c in all_cgroups if "cpuacct" in c][0]
    memory_root = [c for c in all_cgroups if "memory" in c][0]
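    # Illustrative assumption: on a typical Ubuntu 16.04 host, findmnt
    # prints one cgroup mount target per line, e.g.:
    #   /sys/fs/cgroup/systemd
    #   /sys/fs/cgroup/cpu,cpuacct
    #   /sys/fs/cgroup/memory
    # (exact mount points vary by distribution). The substring matches
    # above pick out the cpuacct and memory hierarchies.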
logging.info("Using cgroups: cpuacct %s, memory %s", cpuacct_root, memory_root)
self.min_memory_usage_gb = None
self.max_memory_usage_gb = None
with file(self.output_path, "w") as output:
while self.keep_monitoring:
# Use a single timestamp for a given round of monitoring.
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
for c in self.containers:
cpu = self._metrics_from_stat_file(cpuacct_root, c, "cpuacct.stat")
memory = self._metrics_from_stat_file(memory_root, c, "memory.stat")
if cpu:
output.write("%s %s cpu %s\n" % (now, c.id, cpu))
if memory:
output.write("%s %s memory %s\n" % (now, c.id, memory))
output.flush()
# Machine-wide memory usage
m = used_memory()
if self.min_memory_usage_gb is None:
self.min_memory_usage_gb, self.max_memory_usage_gb = m, m
else:
self.min_memory_usage_gb = min(self.min_memory_usage_gb, m)
self.max_memory_usage_gb = max(self.max_memory_usage_gb, m)
time.sleep(self.frequency_seconds)


class Timeline(object):
  """Given metric and log data for containers, creates a timeline report.

  This is a standalone HTML file with a timeline for the log files and CPU charts for
  the containers. The HTML uses https://developers.google.com/chart/ for rendering
  the charts, which happens in the browser.
  """

  def __init__(self, monitor_file, containers, interesting_re):
    self.monitor_file = monitor_file
    self.containers = containers
    self.interesting_re = interesting_re

  def logfile_timeline(self, container):
    """Returns a list of (name, timestamp, line) tuples for interesting lines in
    the container's logfile. container is expected to have name and logfile attributes.
    """
    interesting_lines = [
        line.strip()
        for line in open(container.logfile)
        if self.interesting_re.search(line)]
    return [(container.name,) + split_timestamp(line) for line in interesting_lines]

  @staticmethod
  def parse_metrics(f):
    """Parses timestamped metric lines.

    Given metrics lines like:

    2017-10-25 10:08:30.961510 87d5562a5fe0ea075ebb2efb0300d10d23bfa474645bb464d222976ed872df2a cpu user 33 system 15

    Returns an iterable of (ts, container, user_cpu, system_cpu) tuples,
    where the CPU values are rates since the previous sample, in cores.
    """
    prev_by_container = {}
    for line in f:
      ts, rest = split_timestamp(line.rstrip())
      try:
        container, metric_type, rest2 = rest.split(" ", 2)
        if metric_type != "cpu":
          continue
        _, user_cpu_s, _, system_cpu_s = rest2.split(" ", 3)
      except ValueError:
        logging.warning("Skipping metric line: %s", line)
        continue
      prev_ts, prev_user, prev_system = prev_by_container.get(
          container, (None, None, None))
      user_cpu = int(user_cpu_s)
      system_cpu = int(system_cpu_s)
      if prev_ts is not None:
        # Timestamps are seconds since the epoch and are floats.
        dt = ts - prev_ts
        assert type(dt) == float
        if dt != 0:
          yield (ts, container,
                 (user_cpu - prev_user) / dt / USER_HZ,
                 (system_cpu - prev_system) / dt / USER_HZ)
      prev_by_container[container] = ts, user_cpu, system_cpu

  def create(self, output):
    # Read logfiles
    timelines = []
    for c in self.containers:
      if not os.path.exists(c.logfile):
        logging.warning("Missing log file: %s", c.logfile)
        continue
      timelines.append(self.logfile_timeline(c))

    # Convert timelines to JSON
    min_ts = None
    timeline_json = []
    for timeline in timelines:
      for current_line, next_line in zip(timeline, timeline[1:]):
        name, ts_current, msg = current_line
        _, ts_next, _ = next_line
        timeline_json.append([name, msg, ts_current, ts_next])
    if not timeline_json:
      logging.warning("No timeline data; skipping timeline")
      return
    min_ts = min(x[2] for x in timeline_json)
    for row in timeline_json:
      row[2] = row[2] - min_ts
      row[3] = row[3] - min_ts
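
    # At this point each row is [container name, message, start, end], with
    # start/end as offsets in seconds from the first timeline message,
    # e.g. (hypothetical values):
    #   ["build", ">>> Building Impala", 0.0, 1523.7]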

    # metrics_by_container: container name -> list of (ts, user, system) tuples
    metrics_by_container = dict()
    max_metric_ts = 0
    container_by_id = dict()
    for c in self.containers:
      container_by_id[c.id] = c
    for ts, container_id, user, system in self.parse_metrics(open(self.monitor_file)):
      container = container_by_id.get(container_id)
      if not container:
        continue
      if ts > max_metric_ts:
        max_metric_ts = ts
      if ts < min_ts:
        # We ignore metrics that show up before the timeline's
        # first messages. This largely avoids a bug in the
        # Google Charts visualization code wherein one of the series seems
        # to wrap around.
        continue
      metrics_by_container.setdefault(
          container.name, []).append((ts - min_ts, user, system))

    with open(output, "w") as o:
      template_path = os.path.join(os.path.dirname(__file__), "timeline.html.template")
      shutil.copyfileobj(open(template_path), o)
      o.write("\n<script>\nvar data = \n")
      json.dump(dict(timeline=timeline_json, metrics=metrics_by_container,
                     max_ts=(max_metric_ts - min_ts)), o, indent=2)
      o.write("</script>")