Mirror of https://github.com/apache/impala.git (last synced 2025-12-19 18:12:08 -05:00)
Latest commit:
- Fix a bug in which rsync --chown doesn't work on CentOS 7.
- Update HOST_TESTDATA_EXTERNAL_VOLUME_PATH (for the minicluster data): most runs now
  are on EC2 etc., and they already need a large volume for docker images, so just
  keep the cluster data there, too.
- Reduce extremely verbose logging.
- Default to a database that's part of dataload (tpch_kudu).
- Change some of the controller variables to my preferred defaults.

Change-Id: I169f60dad53d2e4980ed6bd1f350fb0dcf274306
Testing: Regular downstream runs for months.
Reviewed-on: http://gerrit.cloudera.org:8080/12386
Reviewed-by: David Knupp <dknupp@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from time import sleep, localtime, strftime
from tests.comparison.query_profile import DefaultProfile, ImpalaNestedTypesProfile
from schedule_item import ScheduleItem
from threading import Thread
import os
import pickle
import time
import logging

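# Local working directories shared between this controller, schedule-item producers
# such as front_end.py and, presumably, the jobs themselves: the schedule queue, the
# query generator reports, the completed-job markers and the controller log.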
PATH_TO_SCHEDULE = '/tmp/query_gen/schedule'
PATH_TO_REPORTS = '/tmp/query_gen/reports'
PATH_TO_FINISHED_JOBS = '/tmp/query_gen/completed_jobs'
PATH_TO_LOG = '/tmp/query_gen/log'
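
# Timing and concurrency knobs. Each run is limited to RUN_TIME_LIMIT seconds (12
# hours). A new schedule item is generated automatically once GENERATION_FREQUENCY
# seconds have passed since the last one was generated, i.e. roughly five minutes
# after the previous run's time limit has elapsed. At most MAX_CONCURRENCY jobs run
# at a time, and the main loop polls every SLEEP_LENGTH seconds.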
RUN_TIME_LIMIT = 12 * 3600
GENERATION_FREQUENCY = 300 + RUN_TIME_LIMIT
MAX_CONCURRENCY = 1
DEFAULT_RUN_NAME = 'AUTO_RUN'
SLEEP_LENGTH = 3

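# Per-run defaults. NESTED_TYPES_MODE selects the nested-types query profile and
# DELETE_SCHEDULE_ITEMS_ON_STARTUP clears stale items from the schedule directory on
# startup. The SHOULD_* flags and the database names are not read in this file; they
# are presumably consumed when a schedule item is turned into a job.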
NESTED_TYPES_MODE = False
DELETE_SCHEDULE_ITEMS_ON_STARTUP = True
SHOULD_BUILD_IMPALA = True
SHOULD_LOAD_DATA = False
SHOULD_PULL_DOCKER_IMAGE = False
DATABASE_NAME = 'tpch_kudu'
POSTGRES_DATABASE_NAME = 'tpch_kudu'

LOG = logging.getLogger('Controller')


class Controller(object):
  '''This class controls the query generator. It generates new schedule items
  regularly and places them into the schedule directory. Schedule items can also be
  created by other means (for example front_end.py), so the controller checks the
  schedule directory regularly and starts running new jobs. It seemed easier and more
  convenient to implement the scheduling mechanism this way than to use Jenkins. (A
  sketch of an external producer is given right after this docstring.)

  This is intended to run on a machine dedicated to the query generator. The
  TARGET_HOST environment variable should be set to the address of the host that will
  be running Impala. The target machine should have Docker installed and configured.
  Each job is run in a separate Docker container. The Docker image can be specified
  by setting the DOCKER_IMAGE_NAME environment variable; the image needs to have
  Postgres installed and the appropriate data loaded. (See the example invocation at
  the bottom of this file.)

  Attributes:
    schedule_items: Keeps track of active job threads. Maps a job id to the thread
      that is running it.
    time_last_generated: Stores the time when a schedule item was last generated
      automatically. Used to control the rate at which new schedule items are
      generated.
  '''

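  # A minimal sketch (an assumption, not code from this module) of how an external
  # producer such as front_end.py could enqueue work: build a ScheduleItem and
  # persist it so that start_new_jobs() picks it up from the schedule directory. The
  # constructor arguments mirror generate_schedule_item() below; the run name here is
  # a hypothetical example.
  #
  #   item = ScheduleItem(run_name='MANUAL_RUN',
  #                       query_profile=DefaultProfile(),
  #                       time_limit_sec=3600)
  #   item.save_pickle()
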
  def __init__(self):
    self.check_env_vars()
    self.make_local_dirs()

    self.schedule_items = {}
    self.time_last_generated = 0

  def make_local_dirs(self):
    '''Create directories for the schedule, log and results.'''
    if not os.path.exists(PATH_TO_SCHEDULE):
      os.makedirs(PATH_TO_SCHEDULE)
    if DELETE_SCHEDULE_ITEMS_ON_STARTUP:
      for job_id in os.listdir(PATH_TO_SCHEDULE):
        os.remove(os.path.join(PATH_TO_SCHEDULE, job_id))
    if not os.path.exists(PATH_TO_FINISHED_JOBS):
      os.makedirs(PATH_TO_FINISHED_JOBS)
    if not os.path.exists(PATH_TO_REPORTS):
      os.makedirs(PATH_TO_REPORTS)
    try:
      os.remove(PATH_TO_LOG)
    except OSError:
      # The log file most likely could not be removed because it does not exist, so
      # this exception can be ignored.
      pass

  def check_env_vars(self):
    '''Check that all necessary environment variables have been set.'''
    if 'DOCKER_PASSWORD' not in os.environ:
      exit('DOCKER_PASSWORD environment variable not set')
    if 'TARGET_HOST' not in os.environ:
      exit('TARGET_HOST environment variable not set')
    if 'TARGET_HOST_USERNAME' not in os.environ:
      exit('TARGET_HOST_USERNAME environment variable not set')
    if 'DOCKER_IMAGE_NAME' not in os.environ:
      exit('DOCKER_IMAGE_NAME environment variable not set')

  def start_new_jobs(self):
    '''Check the schedule directory for new items. If a new item is present and the
    maximum concurrency level has not been reached, start a new job. Each job gets
    its own thread.
    '''

    finished_jobs = set(os.listdir(PATH_TO_FINISHED_JOBS))
    for job_id in os.listdir(PATH_TO_SCHEDULE):
      # If the schedule item is not already running and has not finished, start
      # running it.
      if job_id not in self.schedule_items and job_id not in finished_jobs and len(
          self.schedule_items) < MAX_CONCURRENCY:
        with open(os.path.join(PATH_TO_SCHEDULE, job_id), 'r') as f:
          schedule_item = pickle.load(f)
        job = schedule_item.generate_job()
        thread = Thread(target=job.start, name=job_id)
        thread.daemon = True
        LOG.info('Created Job Thread: {0}'.format(job_id))
        self.schedule_items[job_id] = thread
        thread.start()
        sleep(SLEEP_LENGTH)

  def generate_schedule_item(self):
    '''Generate a default schedule item. This is called on every iteration of the
    main loop, but a new item is only created every few hours (see
    GENERATION_FREQUENCY).
    '''
    if self.should_generate_new_item():
      profile = ImpalaNestedTypesProfile() if NESTED_TYPES_MODE else DefaultProfile()
      schedule_item = ScheduleItem(
          run_name='{0}-{1}'.format(
              strftime("%Y-%b-%d-%H:%M:%S", localtime()), DEFAULT_RUN_NAME),
          query_profile=profile,
          time_limit_sec=RUN_TIME_LIMIT)
      schedule_item.save_pickle()
      self.time_last_generated = time.time()
      LOG.info('Generated Schedule Item')
      sleep(2)

  def should_generate_new_item(self):
    '''Return True if a new schedule item should be generated.'''
    return time.time() - self.time_last_generated > GENERATION_FREQUENCY

  def run(self):
    '''Main method of the Controller class. Keeps track of how many job threads are
    alive, generates new schedule items and starts running new jobs.
    '''
    while True:
      # Drop threads that have finished so that new jobs may be started.
      self.schedule_items = dict(
          (job_id, thread) for job_id, thread in self.schedule_items.items()
          if thread.is_alive())
      LOG.info('Number of Active Threads: {0}'.format(len(self.schedule_items)))
      self.generate_schedule_item()
      self.start_new_jobs()
      sleep(SLEEP_LENGTH)

if __name__ == '__main__':
  controller = Controller()
  logging.basicConfig(
      level=logging.INFO,
      filename=PATH_TO_LOG,
      format='%(asctime)s %(threadName)s:%(module)s[%(lineno)s]:%(message)s',
      datefmt='%H:%M:%S')
  controller.run()
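
# Example invocation on a dedicated controller machine (a sketch: the values are
# placeholders and the script name is hypothetical; only the variable names are
# checked by check_env_vars()):
#
#   export TARGET_HOST=<host-that-will-run-impala>
#   export TARGET_HOST_USERNAME=<ssh-user-on-that-host>
#   export DOCKER_PASSWORD=<docker-password>
#   export DOCKER_IMAGE_NAME=<image-with-postgres-and-test-data>
#   ./controller.py
#
# Progress is logged to PATH_TO_LOG; the schedule, reports and completed-job
# directories are created under /tmp/query_gen.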