mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Several tests on catalogd HA failover have a loop of the following pattern: - Do some operations - Kills the active catalogd - Verifies some results - Starts the killed catalogd After starting the killed catalogd, the test gets the new active and standby catalogds and checks their /healthz pages immediately. This could fail if the web pages are not registered yet. The cause is when starting catalogd, we just wait for its 'statestore-subscriber.connected' to be True. This doesn't guarantee that the web pages are initialized. This patch adds a wait for this, i.e. when getting the web pages hits 404 (Not Found) error, wait and retry. Another flaky issue of these failover tests is that cleanup of unique_database could fail due to impalad still using the old active catalogd address even in RPC failure retries (IMPALA-14228). This patch adds a retry on the DROP DATABASE statement to work around this. Sets disable_log_buffering to True so the killed catalogd has complete logs. Sets catalog_client_connection_num_retries to 2 to save time in coordinator retrying RPCs to the killed catalogd. This reduces the duration of test_warmed_up_metadata_failover_catchup from 100s to 50s. Tests: - Ran all (15) failover tests in test_catalogd_ha.py 10 times (each round takes 450s). Change-Id: Iad42a55ed7c357ed98d85c69e16ff705a8cae89d Reviewed-on: http://gerrit.cloudera.org:8080/23235 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Quanlong Huang <huangquanlong@gmail.com>
638 lines
28 KiB
Python
638 lines
28 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Basic object model of a Impala Services (impalad + statestored). Provides a way to
|
|
# programatically interact with the services and perform operations such as querying
|
|
# the debug webpage, getting metric values, or creating client connections.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import socket
|
|
import subprocess
|
|
from time import sleep, time
|
|
|
|
import requests
|
|
from thrift.transport.TSocket import TSocket
|
|
from thrift.transport.TTransport import TBufferedTransport
|
|
|
|
from tests.common.impala_connection import create_connection, create_ldap_connection
|
|
from tests.common.network import to_host_port, CERT_TO_CA_MAP
|
|
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
|
|
|
|
LOG = logging.getLogger('impala_service')
|
|
LOG.setLevel(level=logging.DEBUG)
|
|
|
|
WEBSERVER_USERNAME = os.environ.get('IMPALA_WEBSERVER_USERNAME', None)
|
|
WEBSERVER_PASSWORD = os.environ.get('IMPALA_WEBSERVER_PASSWORD', None)
|
|
|
|
|
|
# Base class for all Impala services
# TODO: Refactor the retry/timeout logic into a common place.
class BaseImpalaService(object):
  """Common functionality for interacting with an Impala daemon's debug
  webserver: fetching debug pages, reading metrics/flags, and waiting for
  metric values with retry."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file):
    self.hostname = hostname
    self.webserver_interface = webserver_interface
    if webserver_interface == "":
      # If no webserver interface was specified, just use the hostname.
      self.webserver_interface = hostname
    self.webserver_port = webserver_port
    self.webserver_certificate_file = webserver_certificate_file
    self.ssl_client_ca_certificate_file = ssl_client_ca_certificate_file
    # Basic-auth credentials for the debug webserver, if configured via env vars.
    self.webserver_username_password = None
    if WEBSERVER_USERNAME is not None and WEBSERVER_PASSWORD is not None:
      self.webserver_username_password = (WEBSERVER_USERNAME, WEBSERVER_PASSWORD)

  def open_debug_webpage(self, page_name, timeout=10, interval=1):
    """Fetches the debug webpage 'page_name' and returns the requests.Response.
    Retries for up to 'timeout' seconds (sleeping 'interval' seconds between
    attempts) while the webserver is unreachable; asserts on timeout."""
    start_time = time()

    while (time() - start_time < timeout):
      try:
        protocol = "http"
        if self.webserver_certificate_file != "":
          protocol = "https"
        host_port = to_host_port(self.webserver_interface, self.webserver_port)
        url = "%s://%s/%s" % (protocol, host_port, page_name)
        cert = self.webserver_certificate_file
        # Instead of cert use its CA cert if available.
        file_part = cert.split("/")[-1]
        if file_part in CERT_TO_CA_MAP:
          cert = cert.replace(file_part, CERT_TO_CA_MAP[file_part])
        return requests.get(url, verify=cert, auth=self.webserver_username_password)
      except Exception as e:
        LOG.info("Debug webpage not yet available: %s", str(e))
      sleep(interval)
    assert 0, 'Debug webpage did not become available in expected time.'

  def read_debug_webpage(self, page_name, timeout=10, interval=1):
    """Returns the raw text of the given debug webpage."""
    return self.open_debug_webpage(page_name, timeout=timeout, interval=interval).text

  def get_debug_webpage_json(self, page_name, timeout=10, interval=1):
    """Returns the json for the given Impala debug webpage, eg. '/queries'"""
    return json.loads(self.read_debug_webpage(
        page_name + "?json", timeout=timeout, interval=interval))

  def dump_debug_webpage_json(self, page_name, filename):
    """Dumps the json for a given Impalad debug webpage to the specified file.
    Prints the JSON with indenting to be somewhat human readable."""
    debug_json = self.get_debug_webpage_json(page_name)
    with open(filename, "w") as json_out:
      json.dump(debug_json, json_out, indent=2)

  def get_metric_value(self, metric_name, default_value=None):
    """Returns the value of the given metric name from the Impala debug webpage"""
    return self.get_metric_values([metric_name], [default_value])[0]

  def get_metric_values(self, metric_names, default_values=None):
    """Returns the value of the given metrics from the Impala debug webpage. If
    default_values is provided and a metric is not present, the default value
    is returned instead."""
    if default_values is None:
      default_values = [None for m in metric_names]
    assert len(metric_names) == len(default_values)
    metrics = self.get_debug_webpage_json('jsonmetrics')
    return [metrics.get(metric_name, default_value)
            for metric_name, default_value in zip(metric_names, default_values)]

  def get_flag_current_value(self, flag):
    """Returns the value of the given flag name from the Impala /varz debug webpage.
    If the flag does not exist it returns None."""
    varz = self.get_debug_webpage_json("varz")
    for var in varz.get("flags"):
      if var["name"] == flag:
        return var["current"]
    return None

  def get_flag_current_values(self):
    """Returns the value of all flags from the Impala /varz debug webpage."""
    flags = dict()
    varz = self.get_debug_webpage_json("varz")
    for var in varz.get("flags"):
      flags[var['name']] = var['current']
    return flags

  def wait_for_metric_value(self, metric_name, expected_value, timeout=10, interval=1,
                            allow_greater=False):
    """Polls 'metric_name' until it equals 'expected_value' (or, when
    'allow_greater' is True, reaches any value >= 'expected_value'). Returns the
    observed value on success. On timeout, dumps diagnostics and asserts via
    __metric_timeout_assert."""
    start_time = time()
    total_wait = 0
    # Initialize before the loop so the timeout log below cannot hit an unbound
    # name when timeout <= 0.
    value = None
    while (total_wait < timeout):
      LOG.info("Getting metric: %s from %s:%s" %
               (metric_name, self.hostname, self.webserver_port))
      value = None
      try:
        value = self.get_metric_value(metric_name)
      except Exception as e:
        LOG.error(e)

      # if allow_greater is True we wait until the metric value becomes >= the expected
      # value. Guard against value being None (fetch failed): ordered comparison with
      # None raises TypeError in Python 3.
      if (value == expected_value) or (
          allow_greater and value is not None and value >= expected_value):
        LOG.info("Metric '{0}' has reached desired value: {1}. total_wait: {2}s".format(
            metric_name, value, total_wait))
        return value
      else:
        LOG.info("Waiting for metric value '{0}'{1}{2}. Current value: {3}. "
                 "total_wait: {4}s".format(
                   metric_name, ('>=' if allow_greater else '='), expected_value, value,
                   total_wait))
      LOG.info("Sleeping %ds before next retry." % interval)
      sleep(interval)
      total_wait = time() - start_time

    LOG.info("Metric {0} did not reach value {1} in {2}s. Actual value was '{3}'. "
             "total_wait: {4}s. Failing...".format(
               metric_name, expected_value, timeout, value, total_wait))
    self.__metric_timeout_assert(metric_name, expected_value, timeout, value)

  def wait_for_page_ready(self, page_name, timeout=10, interval=1):
    """Waits until the debug webpage 'page_name' is registered by the webserver,
    i.e. requesting it no longer returns 404 (pages are registered some time
    after process startup). Asserts if the page is not ready within 'timeout'."""
    start_time = time()
    total_wait = 0
    while (total_wait < timeout):
      response = self.open_debug_webpage(page_name)
      total_wait = time() - start_time
      if response.status_code == requests.codes.not_found:
        LOG.info(
          "Debug webpage {}:{}/{} not yet available. Sleeping {}s before next retry"
          .format(self.webserver_interface, self.webserver_port, page_name,
                  interval))
        sleep(interval)
        continue
      LOG.info(
        "Debug webpage {}:{}/{} is ready. total_wait: {}s"
        .format(self.webserver_interface, self.webserver_port, page_name, total_wait))
      return
    # Previously a timeout fell through silently, letting callers proceed against a
    # page that never became ready. Fail explicitly instead.
    assert 0, "Debug webpage {}:{}/{} did not become ready in {}s".format(
      self.webserver_interface, self.webserver_port, page_name, timeout)

  def __request_minidump(self, pid):
    """
    Impala processes (impalad, catalogd, statestored) have a signal handler for
    SIGUSR1 that dumps a minidump. This sends a SIGUSR1 to the specified pid
    to trigger a minidump. Note that this will not work for processes that don't
    implement a handler for SIGUSR1. This does not wait for the minidump to be
    generated, so callers will need to consider this.
    """
    cmd = ["kill", "-SIGUSR1", pid]
    subprocess.check_output(cmd)

  def __metric_timeout_assert(self, metric_name, expected_value, timeout, actual_value):
    """
    Helper function to dump diagnostic information for debugging a metric timeout and
    then assert.
    """
    impala_home = os.environ["IMPALA_HOME"]
    log_dir = os.environ["IMPALA_LOGS_DIR"]

    # Create a diagnostic directory to contain all the relevent information
    datetime_string = datetime.now().strftime("%Y%m%d_%H:%M:%S")
    diag_dir = os.path.join(log_dir, "metric_timeout_diags_{0}".format(datetime_string))
    if not os.path.exists(diag_dir):
      os.makedirs(diag_dir)

    # Read debug pages from the Web UI and dump them to files in the diagnostic
    # directory
    json_dir = os.path.join(diag_dir, "json")
    if not os.path.exists(json_dir):
      os.makedirs(json_dir)
    json_diag_string = "Dumping debug webpages in JSON format...\n"
    debug_pages = ["memz", "metrics", "queries", "sessions", "threadz", "rpcz"]
    for debug_page in debug_pages:
      json_filename = os.path.join(json_dir, "{0}.json".format(debug_page))
      self.dump_debug_webpage_json(debug_page, json_filename)
      json_filename_rewritten = json_filename.replace(impala_home, "$IMPALA_HOME")
      json_diag_string += \
          "Dumped {0} JSON to {1}\n".format(debug_page, json_filename_rewritten)

    # Requests a minidump for each running impalad/catalogd. The minidump will be
    # written to the processes's minidump_path. For simplicity, we leave it there,
    # as it will be preserved along with everything else in the log directory.
    impalad_pids = subprocess.check_output(["pgrep", "impalad"],
                                           universal_newlines=True).split("\n")[:-1]
    catalogd_pids = subprocess.check_output(["pgrep", "catalogd"],
                                            universal_newlines=True).split("\n")[:-1]
    minidump_diag_string = "Dumping minidumps for impalads/catalogds...\n"
    for pid in impalad_pids:
      self.__request_minidump(pid)
      minidump_diag_string += "Dumped minidump for Impalad PID {0}\n".format(pid)
    for pid in catalogd_pids:
      self.__request_minidump(pid)
      minidump_diag_string += "Dumped minidump for Catalogd PID {0}\n".format(pid)

    # This is not a critical path, so sleep 30 seconds to be sure that the minidumps
    # have been written.
    sleep(30)

    # Now, fire the assert with the information about the dumps. This provides
    # information about where to find the json files and other diagnostics.
    # This is in the logs directory, so it should be packed up along with everything
    # else for automated jobs.
    assert_string = \
        "Metric {0} did not reach value {1} in {2}s. Actual value was '{3}'.\n" \
        .format(metric_name, expected_value, timeout, actual_value)
    assert_string += json_diag_string
    assert_string += minidump_diag_string
    assert 0, assert_string

  def get_catalog_object_dump(self, object_type, object_name):
    """ Gets the web-page for the given 'object_type' and 'object_name'."""
    return self.read_debug_webpage('catalog_object?object_type=%s&object_name=%s' %
                                   (object_type, object_name))

  def get_catalog_objects(self, excludes=['_impala_builtins']):
    """ Returns a dictionary containing all catalog objects. Each entry's key is the
    fully qualified object name and the value is a tuple of the form (type, version).
    Does not return databases listed in the 'excludes' list."""
    catalog = self.get_debug_webpage_json('catalog')
    objects = {}
    for db_desc in catalog["databases"]:
      db_name = db_desc["name"]
      if db_name in excludes:
        continue
      db = self.get_catalog_object_dump('DATABASE', db_name)
      objects[db_name] = ('DATABASE', self.extract_catalog_object_version(db))
      for table_desc in db_desc["tables"]:
        table_name = table_desc["fqtn"]
        table = self.get_catalog_object_dump('TABLE', table_name)
        objects[table_name] = ('TABLE', self.extract_catalog_object_version(table))
    return objects

  def extract_catalog_object_version(self, thrift_txt):
    """ Extracts and returns the version of the catalog object's 'thrift_txt'
    representation."""
    result = re.search(r'catalog_version \(i64\) = (\d+)', thrift_txt)
    assert result, 'Unable to find catalog version in object: ' + thrift_txt
    return int(result.group(1))
|
|
|
|
# Allows for interacting with an Impalad instance to perform operations such as creating
# new connections or accessing the debug webpage.
class ImpaladService(BaseImpalaService):
  def __init__(self, hostname, webserver_interface="", external_interface="",
               webserver_port=25000, beeswax_port=21000, krpc_port=27000, hs2_port=21050,
               hs2_http_port=28000, webserver_certificate_file="",
               ssl_client_ca_certificate_file=""):
    super(ImpaladService, self).__init__(
        hostname, webserver_interface, webserver_port, webserver_certificate_file,
        ssl_client_ca_certificate_file)
    # Interface client connections (beeswax/HS2) use; falls back to the hostname.
    self.external_interface = external_interface if external_interface else hostname
    self.beeswax_port = beeswax_port
    self.krpc_port = krpc_port
    self.hs2_port = hs2_port
    self.hs2_http_port = hs2_http_port

  def get_num_known_live_executors(self, timeout=30, interval=1,
                                   include_shutting_down=True):
    """Returns the number of live backends that act as executors."""
    return self.get_num_known_live_backends(timeout=timeout,
                                            interval=interval,
                                            include_shutting_down=include_shutting_down,
                                            only_executors=True)

  def get_num_known_live_backends(self, timeout=30, interval=1,
      include_shutting_down=True, only_coordinators=False, only_executors=False):
    """Returns the number of backends this impalad knows about from its /backends
    debug page, optionally filtered by role and quiescing state."""
    LOG.debug("Getting num_known_live_backends from %s:%s" %
              (self.hostname, self.webserver_port))
    result = self.get_debug_webpage_json('backends', timeout, interval)
    count = 0
    for backend in result['backends']:
      if backend['is_quiescing'] and not include_shutting_down:
        continue
      if only_coordinators and not backend['is_coordinator']:
        continue
      if only_executors and not backend['is_executor']:
        continue
      count += 1
    return count

  def get_executor_groups(self, timeout=30, interval=1):
    """Returns a mapping from executor group name to a list of all KRPC endpoints of a
    group's executors."""
    LOG.debug("Getting executor groups from %s:%s" %
              (self.hostname, self.webserver_port))
    result = self.get_debug_webpage_json('backends', timeout, interval)
    groups = defaultdict(list)
    for backend in result['backends']:
      groups[backend['executor_groups']].append(backend['krpc_address'])
    return groups

  def get_queries_json(self, timeout=30, interval=1):
    """Return the full JSON from the /queries page."""
    return json.loads(
      self.read_debug_webpage('queries?json', timeout=timeout, interval=interval))

  def get_query_locations(self):
    """Returns a dictionary of the format <host_address, num_of_queries_running_there>,
    or None if the /queries page has no query locations."""
    result = self.get_queries_json()
    if result['query_locations'] is not None:
      return dict([(loc["location"], loc["count"]) for loc in result['query_locations']])
    return None

  def get_in_flight_queries(self, timeout=30, interval=1):
    """Returns the list of in flight queries from the /queries page."""
    return self.get_queries_json(timeout=timeout, interval=interval)['in_flight_queries']

  def get_completed_queries(self, timeout=30, interval=1):
    """Returns the list of completed queries from the /queries page."""
    result = self.get_debug_webpage_json('queries', timeout, interval)
    return result['completed_queries']

  def _get_pool_counter(self, pool_name, counter_name, timeout=30, interval=1):
    """Returns the value of the field 'counter_name' in pool 'pool_name' or 0 if the pool
    doesn't exist."""
    result = self.get_debug_webpage_json('admission', timeout, interval)
    pools = result['resource_pools']
    for pool in pools:
      if pool['pool_name'] == pool_name:
        return pool[counter_name]
    return 0

  def get_num_queued_queries(self, pool_name, timeout=30, interval=1):
    """Returns the number of queued queries in pool 'pool_name' or 0 if the pool doesn't
    exist."""
    return self._get_pool_counter(pool_name, 'agg_num_queued', timeout, interval)

  def get_total_admitted_queries(self, pool_name, timeout=30, interval=1):
    """Returns the total number of queries that have been admitted to pool 'pool_name' or
    0 if the pool doesn't exist."""
    return self._get_pool_counter(pool_name, 'total_admitted', timeout, interval)

  def get_num_running_queries(self, pool_name, timeout=30, interval=1):
    """Returns the number of queries currently running in pool 'pool_name' or 0 if the
    pool doesn't exist."""
    return self._get_pool_counter(pool_name, 'agg_num_running', timeout, interval)

  def get_num_in_flight_queries(self, timeout=30, interval=1):
    """Returns the number of in-flight queries by counting the non-empty lines of
    the raw /inflight_query_ids page."""
    LOG.info("Getting num_in_flight_queries from %s:%s" %
             (self.hostname, self.webserver_port))
    result = self.read_debug_webpage('inflight_query_ids?raw', timeout, interval)
    return None if result is None else len(
      [line for line in result.split('\n') if line])

  def wait_for_num_in_flight_queries(self, expected_val, timeout=10):
    """Waits for the number of in-flight queries to reach a certain value. Returns
    True on success, False if the timeout expired first."""
    start_time = time()
    # Initialize so the log statement below cannot reference an unbound name when
    # timeout <= 0.
    num_in_flight_queries = None
    while (time() - start_time < timeout):
      num_in_flight_queries = self.get_num_in_flight_queries()
      if num_in_flight_queries == expected_val: return True
      sleep(1)
    LOG.info("The number of in flight queries: %s, expected: %s" %
             (num_in_flight_queries, expected_val))
    return False

  def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1,
      include_shutting_down=True, early_abort_fn=lambda: False):
    """Poll the debug web server until the number of backends known by this service
    reaches 'expected_value'. 'early_abort_fn' is called periodically and can
    throw an exception if polling should be aborted early."""
    start_time = time()
    while (time() - start_time < timeout):
      early_abort_fn()
      value = None
      try:
        value = self.get_num_known_live_backends(timeout=1, interval=interval,
                                                 include_shutting_down=include_shutting_down)
      except Exception as e:
        LOG.error(e)
      if value == expected_value:
        LOG.info("num_known_live_backends has reached value: %s" % value)
        return value
      else:
        LOG.info("Waiting for num_known_live_backends=%s. Current value: %s" %
                 (expected_value, value))
      sleep(interval)
    assert 0, 'num_known_live_backends did not reach expected value in time'

  def read_query_profile_page(self, query_id, timeout=10, interval=1):
    """Fetches the raw contents of the query's runtime profile webpage.
    Fails an assertion if Impala's webserver is unavailable or the query's
    profile page doesn't exist."""
    return self.read_debug_webpage(
      "query_profile?query_id=%s&raw" % (query_id), timeout=timeout, interval=interval)

  def get_query_status(self, query_id, timeout=10, interval=1):
    """Gets the 'Query Status' section of the query's runtime profile."""
    page = self.read_query_profile_page(query_id, timeout=timeout, interval=interval)
    status_line =\
        next((x for x in page.split('\n') if re.search('Query Status:', x)), None)
    # NOTE: raises AttributeError if the profile has no 'Query Status' line; callers
    # such as wait_for_query_status() rely on catching that exception.
    return status_line.split('Query Status:')[1].strip()

  def wait_for_query_state(self, client, query_handle, target_state,
                           timeout=10, interval=1):
    """Keeps polling for the query's state using client in the given interval until
    the query's state reaches the target state or the given timeout has been reached."""
    start_time = time()
    # Initialize so a get_state() failure on the first iteration surfaces as the
    # assertion below rather than a confusing NameError.
    query_state = None
    while (time() - start_time < timeout):
      try:
        query_state = client.get_state(query_handle)
      except Exception as e:
        LOG.error("Exception while getting state of query {0}\n{1}".format(
          client.handle_id(query_handle), str(e)))
      if query_state == target_state:
        return
      sleep(interval)
    assert target_state == query_state, \
        'Query {0} did not reach query state in time target={1} actual={2}'.format(
          client.handle_id(query_handle), target_state, query_state)
    return

  def wait_for_query_status(self, query_id, expected_content, timeout=30, interval=1):
    """Polls for the query's status in the query profile web page to contain the
    specified content. Returns False if the timeout was reached before a successful
    match, True otherwise."""
    start_time = time()
    query_status = ""
    while (time() - start_time < timeout):
      try:
        query_status = self.get_query_status(
          query_id, timeout=timeout, interval=interval)
        if query_status is None:
          raise Exception("Could not find 'Query Status' section in profile of "
                          "query with id {}".format(query_id))
      except Exception:
        pass
      if expected_content in query_status:
        return True
      sleep(interval)
    return False

  def use_ssl_for_clients(self):
    """Returns True if client connections should use SSL."""
    return self.ssl_client_ca_certificate_file != ""

  def is_port_open(self, host, port):
    """Returns True if a TCP connection to host:port succeeds within 1 second."""
    try:
      sock = socket.create_connection((host, port), timeout=1)
      sock.close()
      return True
    except Exception:
      return False

  def webserver_port_is_open(self):
    """Returns True if the debug webserver port accepts TCP connections."""
    return self.is_port_open(self.webserver_interface, self.webserver_port)

  def create_beeswax_client(self, use_kerberos=False):
    """Creates a new beeswax client connection to the impalad.
    DEPRECATED: Use create_hs2_client() instead."""
    client = create_connection(to_host_port(self.external_interface, self.beeswax_port),
                               use_kerberos, BEESWAX, use_ssl=self.use_ssl_for_clients())
    client.connect()
    return client

  def beeswax_port_is_open(self):
    """Test if the beeswax port is open. Does not need to authenticate."""
    # Check if the port is open first to avoid chatty logging of Thrift connection.
    if not self.is_port_open(self.external_interface, self.beeswax_port): return False

    try:
      # The beeswax client will connect successfully even if not authenticated.
      client = self.create_beeswax_client()
      client.close()
      return True
    except Exception:
      return False

  def create_ldap_beeswax_client(self, user, password, use_ssl=False):
    """Creates a beeswax client connection authenticated via LDAP."""
    client = create_ldap_connection(to_host_port(self.hostname, self.beeswax_port),
                                    user=user, password=password, use_ssl=use_ssl)
    client.connect()
    return client

  def create_hs2_client(self, user=None):
    """Creates a new HS2 client connection to the impalad"""
    client = create_connection('%s:%d' % (self.external_interface, self.hs2_port),
                               protocol=HS2, user=user,
                               use_ssl=self.use_ssl_for_clients())
    client.connect()
    return client

  def hs2_port_is_open(self):
    """Test if the HS2 port is open. Does not need to authenticate."""
    # Check if the port is open first to avoid chatty logging of Thrift connection.
    if not self.is_port_open(self.external_interface, self.hs2_port): return False

    # Impyla will try to authenticate as part of connecting, so preserve previous logic
    # that uses the HS2 thrift code directly.
    try:
      sock = TSocket(self.external_interface, self.hs2_port)
      transport = TBufferedTransport(sock)
      transport.open()
      transport.close()
      return True
    except Exception as e:
      LOG.info(e)
      return False

  def hs2_http_port_is_open(self):
    """Test if the HS2-over-HTTP port is open."""
    # Only check if the port is open, do not create Thrift transport.
    return self.is_port_open(self.external_interface, self.hs2_http_port)

  def create_client(self, protocol):
    """Creates a new client connection for given protocol to this impalad"""
    port = self.hs2_port
    if protocol == HS2_HTTP:
      port = self.hs2_http_port
    if protocol == BEESWAX:
      LOG.warning('beeswax protocol is deprecated.')
      port = self.beeswax_port
    client = create_connection(to_host_port(self.external_interface, port),
                               protocol=protocol)
    client.connect()
    return client

  def create_client_from_vector(self, vector):
    """A shorthand for create_client with test vector as input.
    Vector must have 'protocol' and 'exec_option' dimension.
    Return a client of specified 'protocol' and with cofiguration 'exec_option' set."""
    client = self.create_client(protocol=vector.get_value('protocol'))
    client.set_configuration(vector.get_exec_option_dict())
    return client
|
|
|
|
# Allows for interacting with the StateStore service to perform operations such as
# accessing the debug webpage.
class StateStoredService(BaseImpalaService):
  """Wraps a statestored instance: exposes its debug webserver (via the base
  class) plus its Thrift service port."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file,
               service_port):
    super(StateStoredService, self).__init__(
        hostname, webserver_interface, webserver_port, webserver_certificate_file,
        ssl_client_ca_certificate_file)
    # Port of the statestore subscriber Thrift service.
    self.service_port = service_port

  def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1):
    """Blocks until the 'statestore.live-backends' metric equals
    'num_subscribers', polling every 'interval' seconds for up to 'timeout'
    seconds. Asserts (via the base class) on timeout."""
    self.wait_for_metric_value('statestore.live-backends', num_subscribers,
                               timeout=timeout, interval=interval)

  def get_statestore_service_port(self):
    """Returns the statestore Thrift service port."""
    return self.service_port
|
|
|
|
# Allows for interacting with the Catalog service to perform operations such as
# accessing the debug webpage.
class CatalogdService(BaseImpalaService):
  """Wraps a catalogd instance: exposes its debug webserver (via the base
  class) plus its catalog Thrift service port."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file,
               service_port):
    super(CatalogdService, self).__init__(
        hostname, webserver_interface, webserver_port, webserver_certificate_file,
        ssl_client_ca_certificate_file)
    # Port of the catalog Thrift service.
    self.service_port = service_port

  def get_catalog_version(self, timeout=10, interval=1):
    """ Gets catalogd's latest catalog version. Retry for 'timeout'
    seconds, sleeping 'interval' seconds between tries. If the
    version cannot be obtained, this method fails."""
    deadline = time() + timeout
    while time() < deadline:
      try:
        info = self.get_debug_webpage_json('catalog')
        if "version" in info: return info['version']
      except Exception:
        LOG.info('Catalogd version not yet available.')
      sleep(interval)
    assert False, 'Catalog version not ready in expected time.'

  def get_catalog_service_port(self):
    """Returns the catalog Thrift service port."""
    return self.service_port

  def verify_table_metadata_loaded(self, db, table, expect_loaded=True, timeout_s=10):
    """Polls the table_metrics debug page for 'db'.'table' until its loaded state
    matches 'expect_loaded', checking once per second for up to 'timeout_s'
    seconds. Asserts on timeout."""
    page_url = "table_metrics?json&name=%s.%s" % (db, table)
    deadline = time() + timeout_s
    while time() < deadline:
      response = self.open_debug_webpage(page_url)
      assert response.status_code == requests.codes.ok
      response_json = json.loads(response.text)
      assert "table_metrics" in response_json
      metrics_text = response_json["table_metrics"]
      # The page reports "Table not yet loaded" until metadata loading finishes.
      is_loaded = "Table not yet loaded" not in metrics_text
      if is_loaded == expect_loaded:
        return
      if expect_loaded:
        LOG.info("Table {0}.{1} is not loaded".format(db, table))
      else:
        LOG.info("Table {0}.{1} is loaded".format(db, table))
      sleep(1)
    assert False, "Timeout waiting table {0}.{1} to be {2}".format(
      db, table, "loaded" if expect_loaded else "not loaded")
|
|
|
|
class AdmissiondService(BaseImpalaService):
  """Wraps an admissiond (admission control) instance. Only needs the debug
  webserver functionality provided by the base class."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file):
    super(AdmissiondService, self).__init__(
        hostname, webserver_interface, webserver_port, webserver_certificate_file,
        ssl_client_ca_certificate_file)