Files
impala/tests/common/impala_service.py
stiga-huang 73de6517a4 IMPALA-14280: Deflake catalogd HA failover tests
Several tests on catalogd HA failover have a loop with the following
pattern:
 - Do some operations
 - Kill the active catalogd
 - Verify some results
 - Start the killed catalogd
After starting the killed catalogd, the test gets the new active and
standby catalogds and checks their /healthz pages immediately. This could
fail if the web pages are not registered yet. The cause is that when
starting catalogd, we only wait for its 'statestore-subscriber.connected'
metric to be True. This doesn't guarantee that the web pages are
initialized. This patch adds a wait for this, i.e. when fetching a web page
hits a 404 (Not Found) error, wait and retry.

Another source of flakiness in these failover tests is that cleaning up
unique_database could fail because impalad keeps using the old active
catalogd address even in RPC failure retries (IMPALA-14228). This patch adds
a retry on the DROP DATABASE statement to work around this.

Sets disable_log_buffering to True so the killed catalogd has complete
logs.

Sets catalog_client_connection_num_retries to 2 to save the time the
coordinator spends retrying RPCs to the killed catalogd. This reduces the
duration of test_warmed_up_metadata_failover_catchup from 100s to 50s.

Tests:
 - Ran all (15) failover tests in test_catalogd_ha.py 10 times (each
   round takes 450s).

Change-Id: Iad42a55ed7c357ed98d85c69e16ff705a8cae89d
Reviewed-on: http://gerrit.cloudera.org:8080/23235
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Quanlong Huang <huangquanlong@gmail.com>
2025-08-04 09:12:30 +00:00

638 lines
28 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic object model of the Impala services (impalad, statestored, catalogd and
# admissiond). Provides a way to programmatically interact with the services and perform
# operations such as querying the debug webpage, getting metric values, or creating
# client connections.
from __future__ import absolute_import, division, print_function
from collections import defaultdict
from datetime import datetime
import json
import logging
import os
import re
import socket
import subprocess
from time import sleep, time
import requests
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from tests.common.impala_connection import create_connection, create_ldap_connection
from tests.common.network import to_host_port, CERT_TO_CA_MAP
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
# Module-level logger used by every service wrapper in this file. Its level is set to
# DEBUG explicitly here.
LOG = logging.getLogger('impala_service')
LOG.setLevel(level=logging.DEBUG)
# Optional debug-webserver credentials, read from the environment. Each is None when its
# variable is unset; BaseImpalaService only passes them to 'requests' when both are set.
WEBSERVER_USERNAME = os.environ.get('IMPALA_WEBSERVER_USERNAME', None)
WEBSERVER_PASSWORD = os.environ.get('IMPALA_WEBSERVER_PASSWORD', None)
# Base class for all Impala services
# TODO: Refactor the retry/timeout logic into a common place.
class BaseImpalaService(object):
  """Common base for wrappers around a single Impala daemon's debug webserver.

  Stores the webserver endpoint, certificate and optional credentials, and offers
  helpers to fetch debug pages, read metrics and flags, and poll until a condition
  holds.
  """

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file):
    """Records the webserver endpoint and the TLS/credential settings.

    An empty 'webserver_interface' falls back to 'hostname'. Credentials come from the
    module-level WEBSERVER_USERNAME/WEBSERVER_PASSWORD and are only used when both are
    set.
    """
    self.hostname = hostname
    self.webserver_interface = webserver_interface
    if webserver_interface == "":
      # If no webserver interface was specified, just use the hostname.
      self.webserver_interface = hostname
    self.webserver_port = webserver_port
    self.webserver_certificate_file = webserver_certificate_file
    self.ssl_client_ca_certificate_file = ssl_client_ca_certificate_file
    self.webserver_username_password = None
    if WEBSERVER_USERNAME is not None and WEBSERVER_PASSWORD is not None:
      self.webserver_username_password = (WEBSERVER_USERNAME, WEBSERVER_PASSWORD)

  def open_debug_webpage(self, page_name, timeout=10, interval=1):
    """Fetches a debug webpage and returns the 'requests' response object.

    Retries every 'interval' seconds for as long as the request raises, up to 'timeout'
    seconds, then fails an assertion. The HTTP status is not checked here, so e.g. a 404
    response is handed back to the caller as-is. Note that 'timeout' only bounds the
    retry loop; no timeout is passed to the individual request.
    """
    start_time = time()
    while (time() - start_time < timeout):
      try:
        # https is used whenever a webserver certificate is configured.
        protocol = "http"
        if self.webserver_certificate_file != "":
          protocol = "https"
        host_port = to_host_port(self.webserver_interface, self.webserver_port)
        url = "%s://%s/%s" % (protocol, host_port, page_name)
        cert = self.webserver_certificate_file
        # Instead of cert use its CA cert if available.
        file_part = cert.split("/")[-1]
        if file_part in CERT_TO_CA_MAP:
          cert = cert.replace(file_part, CERT_TO_CA_MAP[file_part])
        return requests.get(url, verify=cert, auth=self.webserver_username_password)
      except Exception as e:
        LOG.info("Debug webpage not yet available: %s", str(e))
      sleep(interval)
    assert 0, 'Debug webpage did not become available in expected time.'

  def read_debug_webpage(self, page_name, timeout=10, interval=1):
    """Returns the body text of the given debug webpage."""
    return self.open_debug_webpage(page_name, timeout=timeout, interval=interval).text

  def get_debug_webpage_json(self, page_name, timeout=10, interval=1):
    """Returns the json for the given Impala debug webpage, eg. '/queries'"""
    return json.loads(self.read_debug_webpage(
        page_name + "?json", timeout=timeout, interval=interval))

  def dump_debug_webpage_json(self, page_name, filename):
    """Dumps the json for a given Impalad debug webpage to the specified file.
    Prints the JSON with indenting to be somewhat human readable."""
    debug_json = self.get_debug_webpage_json(page_name)
    with open(filename, "w") as json_out:
      json.dump(debug_json, json_out, indent=2)

  def get_metric_value(self, metric_name, default_value=None):
    """Returns the value of the given metric name from the Impala debug webpage, or
    'default_value' if the metric is not present."""
    return self.get_metric_values([metric_name], [default_value])[0]

  def get_metric_values(self, metric_names, default_values=None):
    """Returns the value of the given metrics from the Impala debug webpage. If
    default_values is provided and a metric is not present, the default value
    is returned instead."""
    if default_values is None:
      default_values = [None] * len(metric_names)
    assert len(metric_names) == len(default_values)
    metrics = self.get_debug_webpage_json('jsonmetrics')
    return [metrics.get(metric_name, default_value)
            for metric_name, default_value in zip(metric_names, default_values)]

  def get_flag_current_value(self, flag):
    """Returns the value of the given flag name from the Impala /varz debug webpage.
    If the flag does not exist it returns None."""
    varz = self.get_debug_webpage_json("varz")
    for var in varz.get("flags"):
      if var["name"] == flag:
        return var["current"]
    return None

  def get_flag_current_values(self):
    """Returns the value of all flags from the Impala /varz debug webpage as a dict
    keyed by flag name."""
    varz = self.get_debug_webpage_json("varz")
    return {var['name']: var['current'] for var in varz.get("flags")}

  def wait_for_metric_value(self, metric_name, expected_value, timeout=10, interval=1,
                            allow_greater=False):
    """Polls a metric until it equals 'expected_value' and returns the observed value.

    With 'allow_greater' set, any value >= 'expected_value' is accepted as well. Errors
    while reading the metric are logged and treated as "no value yet". If the value is
    not reached within 'timeout' seconds, diagnostics are collected and an assertion
    fails.
    """
    start_time = time()
    total_wait = 0
    # Initialised up front so the timeout path below works even if the loop never runs.
    value = None
    while (total_wait < timeout):
      LOG.info("Getting metric: %s from %s:%s" %
               (metric_name, self.hostname, self.webserver_port))
      value = None
      try:
        value = self.get_metric_value(metric_name)
      except Exception as e:
        LOG.error(e)
      # if allow_greater is True we wait until the metric value becomes >= the expected
      # value. A missing value (None) must not be ordered against the expected value:
      # that comparison raises TypeError on Python 3.
      reached = (value == expected_value) or (
          allow_greater and value is not None and value >= expected_value)
      if reached:
        LOG.info("Metric '{0}' has reached desired value: {1}. total_wait: {2}s".format(
            metric_name, value, total_wait))
        return value
      LOG.info("Waiting for metric value '{0}'{1}{2}. Current value: {3}. "
               "total_wait: {4}s".format(
                   metric_name, ('>=' if allow_greater else '='), expected_value, value,
                   total_wait))
      LOG.info("Sleeping %ds before next retry." % interval)
      sleep(interval)
      total_wait = time() - start_time
    LOG.info("Metric {0} did not reach value {1} in {2}s. Actual value was '{3}'. "
             "total_wait: {4}s. Failing...".format(
                 metric_name, expected_value, timeout, value, total_wait))
    self.__metric_timeout_assert(metric_name, expected_value, timeout, value)

  def wait_for_page_ready(self, page_name, timeout=10, interval=1):
    """Waits until the given debug webpage no longer answers 404 (Not Found).

    Any other HTTP status counts as "ready". Fails an assertion if the page is still
    not found once 'timeout' seconds have elapsed, so that callers do not proceed
    against an unregistered page.
    """
    start_time = time()
    total_wait = 0
    while (total_wait < timeout):
      response = self.open_debug_webpage(page_name)
      total_wait = time() - start_time
      if response.status_code == requests.codes.not_found:
        LOG.info(
            "Debug webpage {}:{}/{} not yet available. Sleeping {}s before next retry"
            .format(self.webserver_interface, self.webserver_port, page_name,
                    interval))
        sleep(interval)
        continue
      LOG.info(
          "Debug webpage {}:{}/{} is ready. total_wait: {}s"
          .format(self.webserver_interface, self.webserver_port, page_name, total_wait))
      return
    assert 0, "Debug webpage {}:{}/{} was not ready in {}s".format(
        self.webserver_interface, self.webserver_port, page_name, timeout)

  def __request_minidump(self, pid):
    """
    Impala processes (impalad, catalogd, statestored) have a signal handler for
    SIGUSR1 that dumps a minidump. This sends a SIGUSR1 to the specified pid
    to trigger a minidump. Note that this will not work for processes that don't
    implement a handler for SIGUSR1. This does not wait for the minidump to be
    generated, so callers will need to consider this.
    """
    cmd = ["kill", "-SIGUSR1", pid]
    subprocess.check_output(cmd)

  def __list_pids(self, process_name):
    """Returns the PIDs (as strings) that 'pgrep' reports for 'process_name'."""
    try:
      output = subprocess.check_output(["pgrep", process_name],
                                       universal_newlines=True)
    except subprocess.CalledProcessError as e:
      # pgrep exits with a non-zero status when nothing matches. That must not hide
      # the metric timeout that is being reported.
      LOG.info("pgrep found no '%s' processes: %s", process_name, str(e))
      return []
    return [pid for pid in output.splitlines() if pid]

  def __metric_timeout_assert(self, metric_name, expected_value, timeout, actual_value):
    """
    Helper function to dump diagnostic information for debugging a metric timeout and
    then assert. Collecting diagnostics is best-effort: a debug page that cannot be
    dumped or a minidump that cannot be requested is noted in the assertion message
    instead of replacing it.
    """
    impala_home = os.environ["IMPALA_HOME"]
    log_dir = os.environ["IMPALA_LOGS_DIR"]
    # Create a diagnostic directory to contain all the relevant information
    datetime_string = datetime.now().strftime("%Y%m%d_%H:%M:%S")
    diag_dir = os.path.join(log_dir, "metric_timeout_diags_{0}".format(datetime_string))
    if not os.path.exists(diag_dir):
      os.makedirs(diag_dir)

    # Read debug pages from the Web UI and dump them to files in the diagnostic
    # directory
    json_dir = os.path.join(diag_dir, "json")
    if not os.path.exists(json_dir):
      os.makedirs(json_dir)
    json_diag_string = "Dumping debug webpages in JSON format...\n"
    debug_pages = ["memz", "metrics", "queries", "sessions", "threadz", "rpcz"]
    for debug_page in debug_pages:
      json_filename = os.path.join(json_dir, "{0}.json".format(debug_page))
      try:
        self.dump_debug_webpage_json(debug_page, json_filename)
      except Exception as e:
        # Keep going: the page may be unavailable or not valid JSON on this daemon.
        json_diag_string += \
            "Failed to dump {0} JSON: {1}\n".format(debug_page, e)
        continue
      json_filename_rewritten = json_filename.replace(impala_home, "$IMPALA_HOME")
      json_diag_string += \
          "Dumped {0} JSON to {1}\n".format(debug_page, json_filename_rewritten)

    # Requests a minidump for each running impalad/catalogd. The minidump will be
    # written to the processes's minidump_path. For simplicity, we leave it there,
    # as it will be preserved along with everything else in the log directory.
    minidump_diag_string = "Dumping minidumps for impalads/catalogds...\n"
    num_requested = 0
    for label, process_name in (("Impalad", "impalad"), ("Catalogd", "catalogd")):
      for pid in self.__list_pids(process_name):
        try:
          self.__request_minidump(pid)
        except subprocess.CalledProcessError as e:
          # The process may have exited between pgrep and kill.
          minidump_diag_string += \
              "Failed to request minidump for {0} PID {1}: {2}\n".format(label, pid, e)
          continue
        num_requested += 1
        minidump_diag_string += \
            "Dumped minidump for {0} PID {1}\n".format(label, pid)
    if num_requested > 0:
      # This is not a critical path, so sleep 30 seconds to be sure that the minidumps
      # have been written.
      sleep(30)

    # Now, fire the assert with the information about the dumps. This provides
    # information about where to find the json files and other diagnostics.
    # This is in the logs directory, so it should be packed up along with everything
    # else for automated jobs.
    assert_string = \
        "Metric {0} did not reach value {1} in {2}s. Actual value was '{3}'.\n" \
        .format(metric_name, expected_value, timeout, actual_value)
    assert_string += json_diag_string
    assert_string += minidump_diag_string
    assert 0, assert_string

  def get_catalog_object_dump(self, object_type, object_name):
    """ Gets the web-page for the given 'object_type' and 'object_name'."""
    return self.read_debug_webpage('catalog_object?object_type=%s&object_name=%s' %
                                   (object_type, object_name))

  def get_catalog_objects(self, excludes=('_impala_builtins',)):
    """ Returns a dictionary containing all catalog objects. Each entry's key is the fully
    qualified object name and the value is a tuple of the form (type, version).
    Does not return databases listed in 'excludes' (any container supporting 'in')."""
    catalog = self.get_debug_webpage_json('catalog')
    objects = {}
    for db_desc in catalog["databases"]:
      db_name = db_desc["name"]
      if db_name in excludes:
        continue
      db = self.get_catalog_object_dump('DATABASE', db_name)
      objects[db_name] = ('DATABASE', self.extract_catalog_object_version(db))
      for table_desc in db_desc["tables"]:
        table_name = table_desc["fqtn"]
        table = self.get_catalog_object_dump('TABLE', table_name)
        objects[table_name] = ('TABLE', self.extract_catalog_object_version(table))
    return objects

  def extract_catalog_object_version(self, thrift_txt):
    """ Extracts and returns the version of the catalog object's 'thrift_txt'
    representation. Fails an assertion if no version can be found."""
    result = re.search(r'catalog_version \(i64\) = (\d+)', thrift_txt)
    assert result, 'Unable to find catalog version in object: ' + thrift_txt
    return int(result.group(1))
# Allows for interacting with an Impalad instance to perform operations such as creating
# new connections or accessing the debug webpage.
class ImpaladService(BaseImpalaService):
  """Wrapper around one impalad: debug webpage helpers, polling helpers and factories
  for client connections on its beeswax, HS2 and HS2-over-HTTP ports."""

  def __init__(self, hostname, webserver_interface="", external_interface="",
               webserver_port=25000, beeswax_port=21000, krpc_port=27000, hs2_port=21050,
               hs2_http_port=28000, webserver_certificate_file="",
               ssl_client_ca_certificate_file=""):
    """Records the impalad's ports. An empty 'external_interface' falls back to
    'hostname'."""
    super(ImpaladService, self).__init__(
        hostname, webserver_interface, webserver_port, webserver_certificate_file,
        ssl_client_ca_certificate_file)
    self.external_interface = external_interface if external_interface else hostname
    self.beeswax_port = beeswax_port
    self.krpc_port = krpc_port
    self.hs2_port = hs2_port
    self.hs2_http_port = hs2_http_port

  def get_num_known_live_executors(self, timeout=30, interval=1,
                                   include_shutting_down=True):
    """Returns the number of backends on the /backends page that are executors."""
    return self.get_num_known_live_backends(timeout=timeout,
                                            interval=interval,
                                            include_shutting_down=include_shutting_down,
                                            only_executors=True)

  def get_num_known_live_backends(self, timeout=30, interval=1,
                                  include_shutting_down=True, only_coordinators=False,
                                  only_executors=False):
    """Returns the number of backends listed on the /backends page.

    Quiescing backends are skipped unless 'include_shutting_down' is set;
    'only_coordinators' / 'only_executors' restrict the count to backends with the
    corresponding role flag.
    """
    LOG.debug("Getting num_known_live_backends from %s:%s" %
              (self.hostname, self.webserver_port))
    result = self.get_debug_webpage_json('backends', timeout, interval)
    count = 0
    for backend in result['backends']:
      if backend['is_quiescing'] and not include_shutting_down:
        continue
      if only_coordinators and not backend['is_coordinator']:
        continue
      if only_executors and not backend['is_executor']:
        continue
      count += 1
    return count

  def get_executor_groups(self, timeout=30, interval=1):
    """Returns a mapping from executor group name to a list of all KRPC endpoints of a
    group's executors."""
    LOG.debug("Getting executor groups from %s:%s" %
              (self.hostname, self.webserver_port))
    result = self.get_debug_webpage_json('backends', timeout, interval)
    groups = defaultdict(list)
    for backend in result['backends']:
      groups[backend['executor_groups']].append(backend['krpc_address'])
    return groups

  def get_queries_json(self, timeout=30, interval=1):
    """Return the full JSON from the /queries page."""
    return json.loads(
        self.read_debug_webpage('queries?json', timeout=timeout, interval=interval))

  def get_query_locations(self):
    """Returns a dictionary of the format <host_address, num_of_queries_running_there>,
    or None if the /queries page reports no query locations."""
    result = self.get_queries_json()
    if result['query_locations'] is not None:
      return {loc["location"]: loc["count"] for loc in result['query_locations']}
    return None

  def get_in_flight_queries(self, timeout=30, interval=1):
    """Returns the 'in_flight_queries' entry of the /queries page JSON."""
    return self.get_queries_json(timeout=timeout, interval=interval)['in_flight_queries']

  def get_completed_queries(self, timeout=30, interval=1):
    """Returns the 'completed_queries' entry of the /queries page JSON."""
    result = self.get_debug_webpage_json('queries', timeout, interval)
    return result['completed_queries']

  def _get_pool_counter(self, pool_name, counter_name, timeout=30, interval=1):
    """Returns the value of the field 'counter_name' in pool 'pool_name' or 0 if the pool
    doesn't exist."""
    result = self.get_debug_webpage_json('admission', timeout, interval)
    pools = result['resource_pools']
    for pool in pools:
      if pool['pool_name'] == pool_name:
        return pool[counter_name]
    return 0

  def get_num_queued_queries(self, pool_name, timeout=30, interval=1):
    """Returns the number of queued queries in pool 'pool_name' or 0 if the pool doesn't
    exist."""
    return self._get_pool_counter(pool_name, 'agg_num_queued', timeout, interval)

  def get_total_admitted_queries(self, pool_name, timeout=30, interval=1):
    """Returns the total number of queries that have been admitted to pool 'pool_name' or
    0 if the pool doesn't exist."""
    return self._get_pool_counter(pool_name, 'total_admitted', timeout, interval)

  def get_num_running_queries(self, pool_name, timeout=30, interval=1):
    """Returns the number of queries currently running in pool 'pool_name' or 0 if the
    pool doesn't exist."""
    return self._get_pool_counter(pool_name, 'agg_num_running', timeout, interval)

  def get_num_in_flight_queries(self, timeout=30, interval=1):
    """Returns the number of non-empty lines on the 'inflight_query_ids?raw' page."""
    LOG.info("Getting num_in_flight_queries from %s:%s" %
             (self.hostname, self.webserver_port))
    result = self.read_debug_webpage('inflight_query_ids?raw', timeout, interval)
    return None if result is None else len(
        [line for line in result.split('\n') if line])

  def wait_for_num_in_flight_queries(self, expected_val, timeout=10):
    """Waits for the number of in-flight queries to reach a certain value. Returns True
    once it does, or False if 'timeout' seconds pass first."""
    start_time = time()
    # Initialised so the log line below is valid even if the loop never runs.
    num_in_flight_queries = None
    while (time() - start_time < timeout):
      num_in_flight_queries = self.get_num_in_flight_queries()
      if num_in_flight_queries == expected_val:
        return True
      sleep(1)
    LOG.info("The number of in flight queries: %s, expected: %s" %
             (num_in_flight_queries, expected_val))
    return False

  def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1,
                                       include_shutting_down=True,
                                       early_abort_fn=lambda: False):
    """Poll the debug web server until the number of backends known by this service
    reaches 'expected_value'. 'early_abort_fn' is called periodically and can
    throw an exception if polling should be aborted early."""
    start_time = time()
    while (time() - start_time < timeout):
      early_abort_fn()
      value = None
      try:
        # Each individual page fetch only gets 1s; the outer loop does the retrying.
        value = self.get_num_known_live_backends(
            timeout=1, interval=interval, include_shutting_down=include_shutting_down)
      except Exception as e:
        LOG.error(e)
      if value == expected_value:
        LOG.info("num_known_live_backends has reached value: %s" % value)
        return value
      LOG.info("Waiting for num_known_live_backends=%s. Current value: %s" %
               (expected_value, value))
      sleep(interval)
    assert 0, 'num_known_live_backends did not reach expected value in time'

  def read_query_profile_page(self, query_id, timeout=10, interval=1):
    """Fetches the raw contents of the query's runtime profile webpage.
    Fails an assertion if Impala's webserver is unavailable."""
    return self.read_debug_webpage(
        "query_profile?query_id=%s&raw" % (query_id), timeout=timeout, interval=interval)

  def get_query_status(self, query_id, timeout=10, interval=1):
    """Gets the 'Query Status' section of the query's runtime profile, or None if the
    profile page has no 'Query Status:' line."""
    page = self.read_query_profile_page(query_id, timeout=timeout, interval=interval)
    for line in page.split('\n'):
      if 'Query Status:' in line:
        return line.split('Query Status:')[1].strip()
    return None

  def wait_for_query_state(self, client, query_handle, target_state,
                           timeout=10, interval=1):
    """Keeps polling for the query's state using client in the given interval until
    the query's state reaches the target state or the given timeout has been reached.
    Fails an assertion if the target state was not reached."""
    start_time = time()
    # Initialised so a failing first poll (or a non-positive timeout) ends in the
    # assertion below rather than a NameError.
    query_state = None
    while (time() - start_time < timeout):
      try:
        query_state = client.get_state(query_handle)
      except Exception as e:
        LOG.error("Exception while getting state of query {0}\n{1}".format(
            client.handle_id(query_handle), str(e)))
      if query_state == target_state:
        return
      sleep(interval)
    assert target_state == query_state, \
        'Query {0} did not reach query state in time target={1} actual={2}'.format(
            client.handle_id(query_handle), target_state, query_state)
    return

  def wait_for_query_status(self, query_id, expected_content, timeout=30, interval=1):
    """Polls for the query's status in the query profile web page to contain the
    specified content. Returns False if the timeout was reached before a successful
    match, True otherwise."""
    start_time = time()
    while (time() - start_time < timeout):
      query_status = None
      try:
        query_status = self.get_query_status(
            query_id, timeout=timeout, interval=interval)
      except Exception as e:
        # The profile may not be available yet; keep polling until the timeout.
        LOG.info("Could not read status of query {0}: {1}".format(query_id, e))
      if query_status is None:
        LOG.info("Could not find 'Query Status' section in profile of "
                 "query with id {}".format(query_id))
      elif expected_content in query_status:
        return True
      sleep(interval)
    return False

  def use_ssl_for_clients(self):
    """Returns True if a client CA certificate file is configured."""
    return self.ssl_client_ca_certificate_file != ""

  def is_port_open(self, host, port):
    """Returns True if a TCP connection to host:port can be made within 1 second."""
    try:
      sock = socket.create_connection((host, port), timeout=1)
      sock.close()
      return True
    except Exception:
      return False

  def webserver_port_is_open(self):
    """Test if the debug webserver port accepts TCP connections."""
    return self.is_port_open(self.webserver_interface, self.webserver_port)

  def create_beeswax_client(self, use_kerberos=False):
    """Creates a new beeswax client connection to the impalad.
    DEPRECATED: Use create_hs2_client() instead."""
    client = create_connection(to_host_port(self.external_interface, self.beeswax_port),
                               use_kerberos, BEESWAX, use_ssl=self.use_ssl_for_clients())
    client.connect()
    return client

  def beeswax_port_is_open(self):
    """Test if the beeswax port is open. Does not need to authenticate."""
    # Check if the port is open first to avoid chatty logging of Thrift connection.
    if not self.is_port_open(self.external_interface, self.beeswax_port):
      return False
    try:
      # The beeswax client will connect successfully even if not authenticated.
      client = self.create_beeswax_client()
      client.close()
      return True
    except Exception:
      return False

  def create_ldap_beeswax_client(self, user, password, use_ssl=False):
    """Creates a new LDAP-authenticated beeswax client connection to the impalad."""
    client = create_ldap_connection(to_host_port(self.hostname, self.beeswax_port),
                                    user=user, password=password, use_ssl=use_ssl)
    client.connect()
    return client

  def create_hs2_client(self, user=None):
    """Creates a new HS2 client connection to the impalad"""
    # Build the address with to_host_port() like the other client factories do, rather
    # than formatting it by hand.
    client = create_connection(to_host_port(self.external_interface, self.hs2_port),
                               protocol=HS2, user=user,
                               use_ssl=self.use_ssl_for_clients())
    client.connect()
    return client

  def hs2_port_is_open(self):
    """Test if the HS2 port is open. Does not need to authenticate."""
    # Check if the port is open first to avoid chatty logging of Thrift connection.
    if not self.is_port_open(self.external_interface, self.hs2_port):
      return False
    # Impyla will try to authenticate as part of connecting, so preserve previous logic
    # that uses the HS2 thrift code directly.
    try:
      sock = TSocket(self.external_interface, self.hs2_port)
      transport = TBufferedTransport(sock)
      transport.open()
      transport.close()
      return True
    except Exception as e:
      LOG.info(e)
      return False

  def hs2_http_port_is_open(self):
    """Test if the HS2-over-HTTP port accepts TCP connections."""
    # Only check if the port is open, do not create Thrift transport.
    return self.is_port_open(self.external_interface, self.hs2_http_port)

  def create_client(self, protocol):
    """Creates a new client connection for given protocol to this impalad. Any protocol
    other than HS2_HTTP or BEESWAX connects to the HS2 port."""
    port = self.hs2_port
    if protocol == HS2_HTTP:
      port = self.hs2_http_port
    elif protocol == BEESWAX:
      LOG.warning('beeswax protocol is deprecated.')
      port = self.beeswax_port
    client = create_connection(to_host_port(self.external_interface, port),
                               protocol=protocol)
    client.connect()
    return client

  def create_client_from_vector(self, vector):
    """A shorthand for create_client with test vector as input.
    Vector must have 'protocol' and 'exec_option' dimension.
    Return a client of specified 'protocol' and with configuration 'exec_option' set."""
    client = self.create_client(protocol=vector.get_value('protocol'))
    client.set_configuration(vector.get_exec_option_dict())
    return client
# Allows for interacting with the StateStore service to perform operations such as
# accessing the debug webpage.
class StateStoredService(BaseImpalaService):
  """Wrapper around a statestored: its debug webserver plus its service port."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file, service_port):
    """Sets up the webserver access via the base class and remembers 'service_port'."""
    super(StateStoredService, self).__init__(
        hostname=hostname,
        webserver_interface=webserver_interface,
        webserver_port=webserver_port,
        webserver_certificate_file=webserver_certificate_file,
        ssl_client_ca_certificate_file=ssl_client_ca_certificate_file)
    self.service_port = service_port

  def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1):
    """Blocks until the 'statestore.live-backends' metric equals 'num_subscribers'.
    Nothing is returned; timeout handling is that of wait_for_metric_value()."""
    live_backends_metric = 'statestore.live-backends'
    self.wait_for_metric_value(
        live_backends_metric, num_subscribers, timeout=timeout, interval=interval)

  def get_statestore_service_port(self):
    """Returns the service port this wrapper was constructed with."""
    return self.service_port
# Allows for interacting with the Catalog service to perform operations such as
# accessing the debug webpage.
class CatalogdService(BaseImpalaService):
  """Wrapper around a catalogd: its debug webserver plus its service port."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file, service_port):
    """Sets up the webserver access via the base class and remembers 'service_port'."""
    super(CatalogdService, self).__init__(
        hostname=hostname,
        webserver_interface=webserver_interface,
        webserver_port=webserver_port,
        webserver_certificate_file=webserver_certificate_file,
        ssl_client_ca_certificate_file=ssl_client_ca_certificate_file)
    self.service_port = service_port

  def get_catalog_version(self, timeout=10, interval=1):
    """Returns the 'version' field of catalogd's /catalog page.

    Polls for up to 'timeout' seconds, pausing 'interval' seconds between attempts.
    Errors while fetching or inspecting the page are logged and retried. Fails an
    assertion if no version was obtained in time."""
    began = time()
    while time() - began < timeout:
      try:
        catalog_json = self.get_debug_webpage_json('catalog')
        if "version" in catalog_json:
          return catalog_json['version']
      except Exception:
        LOG.info('Catalogd version not yet available.')
      sleep(interval)
    assert False, 'Catalog version not ready in expected time.'

  def get_catalog_service_port(self):
    """Returns the service port this wrapper was constructed with."""
    return self.service_port

  def verify_table_metadata_loaded(self, db, table, expect_loaded=True, timeout_s=10):
    """Polls the table_metrics page once per second until the table's loaded state
    matches 'expect_loaded'.

    A table counts as not loaded while its metrics contain "Table not yet loaded".
    Fails an assertion on a non-OK response, on a response without "table_metrics",
    or when 'timeout_s' seconds pass without the expected state."""
    page_url = "table_metrics?json&name=%s.%s" % (db, table)
    began = time()
    while time() - began < timeout_s:
      response = self.open_debug_webpage(page_url)
      assert response.status_code == requests.codes.ok
      payload = json.loads(response.text)
      assert "table_metrics" in payload
      is_loaded = "Table not yet loaded" not in payload["table_metrics"]
      if is_loaded == bool(expect_loaded):
        return
      # State does not match yet: report what was seen and poll again.
      if is_loaded:
        LOG.info("Table {0}.{1} is loaded".format(db, table))
      else:
        LOG.info("Table {0}.{1} is not loaded".format(db, table))
      sleep(1)
    assert False, "Timeout waiting table {0}.{1} to be {2}".format(
        db, table, "loaded" if expect_loaded else "not loaded")
class AdmissiondService(BaseImpalaService):
  """Wrapper around an admissiond; adds nothing beyond the base webserver helpers."""

  def __init__(self, hostname, webserver_interface, webserver_port,
               webserver_certificate_file, ssl_client_ca_certificate_file):
    """Forwards all arguments unchanged to the base class constructor."""
    super(AdmissiondService, self).__init__(
        hostname=hostname,
        webserver_interface=webserver_interface,
        webserver_port=webserver_port,
        webserver_certificate_file=webserver_certificate_file,
        ssl_client_ca_certificate_file=ssl_client_ca_certificate_file)