Files
impala/tests/statestore/test_statestore.py
Riza Suminto 28cff4022d IMPALA-14333: Run impala-py.test using Python3
Running exhaustive tests with env var IMPALA_USE_PYTHON3_TESTS=true
reveals some tests that require adjustment. This patch made such
adjustment, which mostly revolves around encoding differences and string
vs bytes type in Python3. This patch also switch the default to run
pytest with Python3 by setting IMPALA_USE_PYTHON3_TESTS=true. The
following are the details:

Change hash() function in conftest.py to crc32() to produce
deterministic hash. Hash randomization is enabled by default since
Python 3.3 (see
https://docs.python.org/3/reference/datamodel.html#object.__hash__).
This cause test sharding (like --shard_tests=1/2) produce inconsistent
set of tests per shard. Always restart minicluster during custom cluster
tests if --shard_tests argument is set, because test order may change
and affect test correctness, depending on whether running on fresh
minicluster or not.

Moved one test case from delimited-latin-text.test to
test_delimited_text.py for easier binary comparison.

Add bytes_to_str() as a utility function to decode bytes in Python3.
This is often needed when inspecting the return value of
subprocess.check_output() as a string.

Implement DataTypeMetaclass.__lt__ to substitute
DataTypeMetaclass.__cmp__ that is ignored in Python3 (see
https://peps.python.org/pep-0207/).

Fix WEB_CERT_ERR difference in test_ipv6.py.

Fix trivial integer parsing in test_restart_services.py.

Fix various encoding issues in test_saml2_sso.py,
test_shell_commandline.py, and test_shell_interactive.py.

Change timeout in Impala.for_each_impalad() from sys.maxsize to 2^31-1.

Switch to binary comparison in test_iceberg.py where needed.

Specify text mode when calling tempfile.NamedTemporaryFile().

Simplify create_impala_shell_executable_dimension to skip testing dev
and python2 impala-shell when IMPALA_USE_PYTHON3_TESTS=true. The reason
is that several UTF-8 related tests in test_shell_commandline.py break
in Python3 pytest + Python2 impala-shell combo. This skipping already
happen automatically in build OS without system Python2 available like
RHEL9 (IMPALA_SYSTEM_PYTHON2 env var is empty).

Removed unused vector argument and fixed some trivial flake8 issues.

Several test logic require modification due to intermittent issue in
Python3 pytest. These include:

Add _run_query_with_client() in test_ranger.py to allow reusing a single
Impala client for running several queries. Ensure clients are closed
when the test is done. Mark several tests in test_ranger.py with

SkipIfFS.hive because they run queries through beeline + HiveServer2,
but Ozone and S3 build environment does not start HiveServer2 by
default.

Increase the sleep period from 0.1 to 0.5 seconds per iteration in
test_statestore.py and mark TestStatestore to execute serially. This is
because TServer appears to shut down more slowly when run concurrently
with other tests. Handle the deprecation of Thread.setDaemon() as well.

Always force_restart=True each test method in TestLoggingCore,
TestShellInteractiveReconnect, and TestQueryRetries to prevent them from
reusing minicluster from previous test method. Some of these tests
destruct minicluster (kill impalad) and will produce minidump if metrics
verifier for next tests fail to detect healthy minicluster state.

Testing:
Pass exhaustive tests with IMPALA_USE_PYTHON3_TESTS=true.

Change-Id: I401a93b6cc7bcd17f41d24e7a310e0c882a550d4
Reviewed-on: http://gerrit.cloudera.org:8080/23319
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-03 10:01:29 +00:00

904 lines
38 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from collections import defaultdict
import json
import logging
import socket
import sys
import threading
import time
import traceback
import uuid
import pytest
from builtins import range
from thrift.protocol import TBinaryProtocol
from thrift.server.TServer import TServer
from thrift.transport import TSocket, TTransport
from impala_thrift_gen.ErrorCodes.ttypes import TErrorCode
import impala_thrift_gen.StatestoreService.StatestoreService as Statestore
import impala_thrift_gen.StatestoreService.StatestoreSubscriber as Subscriber
from impala_thrift_gen.StatestoreService.StatestoreSubscriber import (
TTopicRegistration,
TUpdateStateResponse,
)
from impala_thrift_gen.Status.ttypes import TStatus
from impala_thrift_gen.Types.ttypes import TNetworkAddress
from tests.common.base_test_suite import BaseTestSuite
from tests.common.environ import build_flavor_timeout
from tests.common.skip import SkipIfDockerizedCluster
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
LOG = logging.getLogger('test_statestore')
# Tests for the statestore. The StatestoreSubscriber class is a skeleton implementation of
# a Python-based statestore subscriber with additional hooks to allow testing. Each
# StatestoreSubscriber runs its own server so that the statestore may contact it.
#
# All tests in this file may be run in parallel. They assume that a statestore instance is
# already running, and is configured with out-of-the-box defaults (as is the case in our
# usual test environment) which govern failure-detector timeouts etc.
#
# These tests do not yet provide sufficient coverage.
# If no topic entries, do the first and second subscribers always get a callback?
# Adding topic entries to non-existant topic
# Test for from_version and to_version behavior
# Test with many concurrent subscribers
# Test that only the subscribed-to topics are sent
# Test that topic deletions take effect correctly.
def get_statestore_subscribers(host='localhost', port=25010):
response = urlopen("http://{0}:{1}/subscribers?json".format(host, port))
page = response.read()
return json.loads(page)
STATUS_OK = TStatus(TErrorCode.OK)
DEFAULT_UPDATE_STATE_RESPONSE = TUpdateStateResponse(status=STATUS_OK, topic_updates=[],
skipped=False)
# IMPALA-3501: the timeout needs to be higher in code coverage builds
WAIT_FOR_FAILURE_TIMEOUT = build_flavor_timeout(40, code_coverage_build_timeout=60)
WAIT_FOR_HEARTBEAT_TIMEOUT = build_flavor_timeout(
40, code_coverage_build_timeout=60)
WAIT_FOR_UPDATE_TIMEOUT = build_flavor_timeout(40, code_coverage_build_timeout=60)
class WildcardServerSocket(TSocket.TSocketBase, TTransport.TServerTransportBase):
"""Specialised server socket that binds to a random port at construction"""
def __init__(self, host=None, port=0):
self.host = host
self.handle = None
self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.handle.bind(('localhost', 0))
_, self.port = self.handle.getsockname()
def listen(self):
self.handle.listen(128)
def accept(self):
client, addr = self.handle.accept()
result = TSocket.TSocket()
result.setHandle(client)
return result
class KillableThreadedServer(TServer):
"""Based on TServer.TThreadedServer, this server may be shutdown (by calling
shutdown()), after which no new connections may be made. Most of the implementation is
directly copied from Thrift."""
def __init__(self, *args, **kwargs):
TServer.__init__(self, *args)
self.daemon = kwargs.get("daemon", False)
self.is_shutdown = False
self.port = self.serverTransport.port
def shutdown(self):
LOG.info('Server localhost:{} is shutting down'.format(self.port))
self.is_shutdown = True
self.serverTransport.close()
self.wait_until_down()
# The processor contains a reference to a StatestoreSubscriber. Clean up that
# reference to avoid a circular reference that would prevent object deletion.
self.processor = None
def wait_until_up(self, num_tries=10):
for i in range(num_tries):
cnxn = TSocket.TSocket('localhost', self.port)
try:
cnxn.open()
LOG.info('Server localhost:{} is up'.format(cnxn.port))
return
except Exception:
if i == num_tries - 1: raise
time.sleep(0.5)
def wait_until_down(self, num_tries=10):
for i in range(num_tries):
cnxn = TSocket.TSocket('localhost', self.port)
try:
cnxn.open()
except Exception:
LOG.info('Server localhost:{} is down'.format(cnxn.port))
return
time.sleep(0.5)
raise Exception("Server localhost:{} did not stop".format(cnxn.port))
def serve(self):
self.serverTransport.listen()
while not self.is_shutdown:
client = self.serverTransport.accept()
# Since accept() can take a while, check again if the server is shutdown to avoid
# starting an unnecessary thread.
if self.is_shutdown: return
t = None
if sys.version_info.major < 3:
t = threading.Thread(target=self.handle, args=(client,))
t.setDaemon(True)
else:
t = threading.Thread(target=self.handle, args=(client,), daemon=self.daemon)
t.start()
def handle(self, client):
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while not self.is_shutdown:
self.processor.process(iprot, oprot)
except TTransport.TTransportException:
pass
except Exception as x:
print(x)
itrans.close()
otrans.close()
class StatestoreSubscriber(object):
"""A bare-bones subscriber skeleton. Tests should create a new StatestoreSubscriber(),
call start() and then register(). The subscriber will run a Thrift server on an unused
port, and after registration the statestore will call Heartbeat() and UpdateState() via
RPC. Tests can provide callbacks to the constructor that will be called during those
RPCs, and this is the easiest way to check that the statestore protocol is being
correctly followed. Tests should use wait_for_* methods to confirm that some event (like
an RPC call) has happened asynchronously.
Since RPC callbacks will execute on a different thread from the main one, any assertions
there will not trigger a test failure without extra plumbing. What we do is simple: any
exceptions during an RPC are caught and stored, and the check_thread_exceptions() method
will re-raise them.
The methods that may be called by a test deliberately return 'self' to allow for
chaining, see test_failure_detected() for an example of how this makes the test flow
more readable."""
def __init__(self, heartbeat_cb=None, update_cb=None):
self.heartbeat_event, self.heartbeat_count = threading.Condition(), 0
# Track the number of updates received per topic.
self.update_counts = defaultdict(lambda: 0)
# Variables to notify for updates on each topic.
self.update_event = threading.Condition()
self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb
self.subscriber_id = "python-test-client-%s" % uuid.uuid1()
self.exception = None
self.server = None
self.server_thread = None
self.client_transport = None
def __enter__(self):
return self
def __exit__(self, *args):
self.kill()
self.wait_for_failure()
def Heartbeat(self, args):
"""Heartbeat RPC handler. Calls heartbeat callback if one exists."""
self.heartbeat_event.acquire()
try:
self.heartbeat_count += 1
response = Subscriber.THeartbeatResponse(status=STATUS_OK)
if self.heartbeat_cb is not None and self.exception is None:
try:
response = self.heartbeat_cb(self, args)
except Exception as e:
self.exception = e
self.heartbeat_event.notify()
finally:
self.heartbeat_event.release()
return response
def UpdateState(self, args):
"""UpdateState RPC handler. Calls update callback if one exists."""
self.update_event.acquire()
try:
for topic_name in args.topic_deltas: self.update_counts[topic_name] += 1
response = DEFAULT_UPDATE_STATE_RESPONSE
if self.update_cb is not None and self.exception is None:
try:
response = self.update_cb(self, args)
except Exception as e:
# Print the original backtrace so it doesn't get lost.
traceback.print_exc()
self.exception = e
self.update_event.notify()
finally:
self.update_event.release()
return response
def __init_server(self):
LOG.info('Initializing server')
processor = Subscriber.Processor(self)
transport = WildcardServerSocket()
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
self.server = KillableThreadedServer(processor, transport, tfactory, pfactory,
daemon=True)
if sys.version_info.major < 3:
self.server_thread = threading.Thread(target=self.server.serve)
self.server_thread.setDaemon(True)
else:
self.server_thread = threading.Thread(target=self.server.serve, daemon=True)
self.server_thread.start()
self.server.wait_until_up()
self.port = self.server.port
def __init_client(self):
LOG.info('Initializing client')
self.client_transport = \
TTransport.TBufferedTransport(TSocket.TSocket('localhost', 24000))
self.protocol = TBinaryProtocol.TBinaryProtocol(self.client_transport)
self.client = Statestore.Client(self.protocol)
self.client_transport.open()
def check_thread_exceptions(self):
"""Checks if an exception was raised and stored in a callback thread"""
if self.exception is not None: raise self.exception
def kill(self):
"""Closes both the server and client sockets, and waits for the server to become
unavailable"""
if self.client_transport:
self.client_transport.close()
if self.server:
self.server.shutdown()
return self
def start(self):
"""Starts a subscriber server, and opens a client to the statestore. Returns only when
the server is running."""
self.__init_server()
self.__init_client()
return self
def register(self, topics=None):
"""Call the Register() RPC"""
if topics is None: topics = []
request = Subscriber.TRegisterSubscriberRequest(
topic_registrations=topics,
subscriber_location=TNetworkAddress("localhost", self.port),
subscriber_type=Subscriber.TStatestoreSubscriberType.COORDINATOR_EXECUTOR,
subscriber_id=self.subscriber_id)
response = self.client.RegisterSubscriber(request)
if response.status.status_code == TErrorCode.OK:
self.registration_id = response.registration_id
else:
raise Exception("Registration failed: %s, %s" %
(response.status.status_code,
'\n'.join(response.status.error_msgs)))
return self
def wait_for_heartbeat(self, count=None):
"""Waits for some number of heartbeats. If 'count' is provided, waits until the number
of heartbeats seen by this subscriber exceeds count, otherwise waits for one further
heartbeat."""
self.heartbeat_event.acquire()
try:
if count is not None and self.heartbeat_count >= count: return self
if count is None: count = self.heartbeat_count + 1
while count > self.heartbeat_count:
self.check_thread_exceptions()
last_count = self.heartbeat_count
self.heartbeat_event.wait(WAIT_FOR_HEARTBEAT_TIMEOUT)
if last_count == self.heartbeat_count:
raise Exception(
"Heartbeat not received within {0}s (heartbeat count: {1})".format(
WAIT_FOR_HEARTBEAT_TIMEOUT, self.heartbeat_count))
self.check_thread_exceptions()
return self
finally:
self.heartbeat_event.release()
def wait_for_update(self, topic_name, count=None):
"""Waits for some number of updates of 'topic_name'. If 'count' is provided, waits
until the number updates seen by this subscriber exceeds count, otherwise waits
for one further update."""
self.update_event.acquire()
start_time = time.time()
try:
if count is not None and self.update_counts[topic_name] >= count: return self
if count is None: count = self.update_counts[topic_name] + 1
while count > self.update_counts[topic_name]:
self.check_thread_exceptions()
last_count = self.update_counts[topic_name]
self.update_event.wait(WAIT_FOR_UPDATE_TIMEOUT)
if (time.time() > start_time + WAIT_FOR_UPDATE_TIMEOUT
and last_count == self.update_counts[topic_name]):
raise Exception(
"Update not received for {0} within {1} (update count: {2})".format(
topic_name, WAIT_FOR_UPDATE_TIMEOUT, last_count))
self.check_thread_exceptions()
return self
finally:
self.update_event.release()
def wait_for_failure(self, timeout=WAIT_FOR_FAILURE_TIMEOUT):
"""Waits until this subscriber no longer appears in the statestore's subscriber
list. If 'timeout' seconds pass, throws an exception."""
start = time.time()
while True:
subs = [s["id"] for s in get_statestore_subscribers()["subscribers"]]
if self.subscriber_id not in subs: return self
if time.time() - start > timeout:
raise Exception("Subscriber {0} did not fail in {1}s".format(
self.subscriber_id, timeout))
time.sleep(0.2)
@pytest.mark.execute_serially
@SkipIfDockerizedCluster.statestore_not_exposed
class TestStatestore(BaseTestSuite):
def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
num_updates=1, clear_topic_entries=False):
topic_entries = [
Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
for x in range(num_updates)]
return Subscriber.TTopicDelta(topic_name=topic_name,
topic_entries=topic_entries,
is_delta=False,
clear_topic_entries=clear_topic_entries)
def test_registration_ids_different(self):
"""Test that if a subscriber with the same id registers twice, the registration ID is
different"""
with StatestoreSubscriber() as sub:
sub.start().register()
old_reg_id = sub.registration_id
sub.register()
assert old_reg_id != sub.registration_id
def test_receive_heartbeats(self):
"""Smoke test to confirm that heartbeats get sent to a correctly registered
subscriber"""
with StatestoreSubscriber() as sub:
sub.start().register().wait_for_heartbeat(5)
def test_receive_updates(self):
"""Test that updates are correctly received when a subscriber alters a topic"""
topic_name = "topic_delta_%s" % uuid.uuid1()
def topic_update_correct(sub, args):
delta = self.make_topic_update(topic_name)
update_count = sub.update_counts[topic_name]
if topic_name not in args.topic_deltas:
# The update doesn't contain our topic.
pass
elif update_count == 1:
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
skipped=False)
elif update_count == 2:
assert len(args.topic_deltas) == 1, args.topic_deltas
assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
assert args.topic_deltas[topic_name].topic_name == delta.topic_name
elif update_count == 3:
# After the content-bearing update was processed, the next delta should be empty
assert len(args.topic_deltas[topic_name].topic_entries) == 0
return DEFAULT_UPDATE_STATE_RESPONSE
with StatestoreSubscriber(update_cb=topic_update_correct) as sub:
reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 3)
)
def test_filter_prefix(self):
topic_name = "topic_delta_%s" % uuid.uuid1()
def topic_update_correct(sub, args):
foo_delta = self.make_topic_update(topic_name, num_updates=1)
bar_delta = self.make_topic_update(topic_name, num_updates=2, key_template='bar')
update_count = sub.update_counts[topic_name]
if topic_name not in args.topic_deltas:
# The update doesn't contain our topic.
pass
elif update_count == 1:
# Send some values with both prefixes.
return TUpdateStateResponse(status=STATUS_OK,
topic_updates=[foo_delta, bar_delta],
skipped=False)
elif update_count == 2:
# We should only get the 'bar' entries back.
assert len(args.topic_deltas) == 1, args.topic_deltas
assert args.topic_deltas[topic_name].topic_entries == bar_delta.topic_entries
assert args.topic_deltas[topic_name].topic_name == bar_delta.topic_name
elif update_count == 3:
# Send some more updates that only have 'foo' prefixes.
return TUpdateStateResponse(status=STATUS_OK,
topic_updates=[foo_delta],
skipped=False)
elif update_count == 4:
# We shouldn't see any entries from the above update, but we should still see
# the version number change due to the new entries in the topic.
assert len(args.topic_deltas[topic_name].topic_entries) == 0
assert args.topic_deltas[topic_name].from_version == 3
assert args.topic_deltas[topic_name].to_version == 4
elif update_count == 5:
# After the content-bearing update was processed, the next delta should be empty
assert len(args.topic_deltas[topic_name].topic_entries) == 0
assert args.topic_deltas[topic_name].from_version == 4
assert args.topic_deltas[topic_name].to_version == 4
return DEFAULT_UPDATE_STATE_RESPONSE
with StatestoreSubscriber(update_cb=topic_update_correct) as sub:
reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
filter_prefix="bar")
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 5)
)
def test_update_is_delta(self):
"""Test that the 'is_delta' flag is correctly set. The first update for a topic should
always not be a delta, and so should all subsequent updates until the subscriber says
it has not skipped the update."""
topic_name = "test_update_is_delta_%s" % uuid.uuid1()
def check_delta(sub, args):
update_count = sub.update_counts[topic_name]
if topic_name not in args.topic_deltas:
# The update doesn't contain our topic.
pass
elif update_count == 1:
assert args.topic_deltas[topic_name].is_delta is False
delta = self.make_topic_update(topic_name)
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
skipped=False)
elif update_count == 2:
assert args.topic_deltas[topic_name].is_delta is False
elif update_count == 3:
assert args.topic_deltas[topic_name].is_delta is True
assert len(args.topic_deltas[topic_name].topic_entries) == 0
assert args.topic_deltas[topic_name].to_version == 1
return DEFAULT_UPDATE_STATE_RESPONSE
with StatestoreSubscriber(update_cb=check_delta) as sub:
reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 3)
)
def test_skipped(self):
"""Test that skipping an update causes it to be resent"""
topic_name = "test_skipped_%s" % uuid.uuid1()
def check_skipped(sub, args):
# Ignore responses that don't contain our topic.
if topic_name not in args.topic_deltas: return DEFAULT_UPDATE_STATE_RESPONSE
update_count = sub.update_counts[topic_name]
if update_count == 1:
update = self.make_topic_update(topic_name)
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[update],
skipped=False)
# All subsequent updates: set skipped=True and expected the full topic to be resent
# every time
assert args.topic_deltas[topic_name].is_delta is False
assert len(args.topic_deltas[topic_name].topic_entries) == 1
return TUpdateStateResponse(status=STATUS_OK, skipped=True)
with StatestoreSubscriber(update_cb=check_skipped) as sub:
reg = TTopicRegistration(topic_name=topic_name, is_transient=False)
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 3)
)
def test_failure_detected(self):
with StatestoreSubscriber() as sub:
topic_name = "test_failure_detected"
reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 1)
.kill()
.wait_for_failure()
)
def test_hung_heartbeat(self):
"""Test for IMPALA-1712: If heartbeats hang (which we simulate by sleeping for five
minutes) the statestore should time them out every 3s and then eventually fail after
40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)"""
with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300)) as sub:
topic_name = "test_hung_heartbeat"
reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 1)
.wait_for_failure(timeout=60)
)
def test_intermittent_hung_heartbeats(self):
"""Heartbeats that occasionally time out should not cause a failure to be detected."""
heartbeat_count = [0] # Use array to allow mutating from inside callback.
def heartbeat_cb(sub, args):
heartbeat_count[0] += 1
# Delay every second heartbeat.
if (heartbeat_count[0] % 2 == 1):
time.sleep(4)
return Subscriber.THeartbeatResponse(status=STATUS_OK)
with StatestoreSubscriber(heartbeat_cb=heartbeat_cb) as sub:
topic_name = "test_intermittent_hung_heartbeats"
reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
(
sub.start()
.register(topics=[reg])
.wait_for_update(topic_name, 30)
.kill()
.wait_for_failure()
)
def test_slow_subscriber(self):
"""Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for multiple
intervals of about 1 second each, this lets the heartbeats to the subscriber fail.
It polls the subscribers page of the statestore to ensure that the
'secs_since_heartbeat' field is updated with an acceptable value. This test only
checks for a strictly increasing value since the actual value of time might depend
on the system load. It stops polling the page once the subscriber is removed from
the set of active subscribers. It also checks that a valid heartbeat record of the
subscriber is found at least once."""
sub = StatestoreSubscriber()
sub.start().register().wait_for_heartbeat(1)
sub.kill()
# secs_since_heartbeat is initially unknown.
secs_since_heartbeat = -1
valid_heartbeat_record = False
while secs_since_heartbeat != 0:
sleep_start_time = time.time()
while time.time() - sleep_start_time < 1:
time.sleep(0.1)
prev_secs_since_heartbeat = secs_since_heartbeat
secs_since_heartbeat = 0
subscribers = get_statestore_subscribers()["subscribers"]
for s in subscribers:
if str(s["id"]) == sub.subscriber_id:
secs_since_heartbeat = float(s["secs_since_heartbeat"])
assert (secs_since_heartbeat > prev_secs_since_heartbeat)
valid_heartbeat_record = True
assert valid_heartbeat_record
def test_topic_persistence(self):
"""Test that persistent topic entries survive subscriber failure, but transent topic
entries are erased when the associated subscriber fails"""
topic_id = str(uuid.uuid1())
persistent_topic_name = "test_topic_persistence_persistent_%s" % topic_id
transient_topic_name = "test_topic_persistence_transient_%s" % topic_id
def add_entries(sub, args):
# None of, one or both of the topics may be in the update.
updates = []
if (persistent_topic_name in args.topic_deltas
and sub.update_counts[persistent_topic_name] == 1):
updates.append(self.make_topic_update(persistent_topic_name))
if (transient_topic_name in args.topic_deltas
and sub.update_counts[transient_topic_name] == 1):
updates.append(self.make_topic_update(transient_topic_name))
if len(updates) > 0:
return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates,
skipped=False)
return DEFAULT_UPDATE_STATE_RESPONSE
def check_entries(sub, args):
# None of, one or both of the topics may be in the update.
if (persistent_topic_name in args.topic_deltas
and sub.update_counts[persistent_topic_name] == 1):
assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
# Statestore should not send deletions when the update is not a delta, see
# IMPALA-1891
assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted is False
if (transient_topic_name in args.topic_deltas
and sub.update_counts[persistent_topic_name] == 1):
assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
return DEFAULT_UPDATE_STATE_RESPONSE
reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
TTopicRegistration(topic_name=transient_topic_name, is_transient=True)]
with StatestoreSubscriber(update_cb=add_entries) as sub:
(
sub.start()
.register(topics=reg)
.wait_for_update(persistent_topic_name, 2)
.wait_for_update(transient_topic_name, 2)
.kill()
.wait_for_failure()
)
with StatestoreSubscriber(update_cb=check_entries) as sub2:
(
sub2.start()
.register(topics=reg)
.wait_for_update(persistent_topic_name, 1)
.wait_for_update(transient_topic_name, 1)
)
def test_update_with_clear_entries_flag(self):
"""Test that the statestore clears all topic entries when a subscriber
sets the clear_topic_entries flag in a topic update message (IMPALA-6948)."""
topic_name = "test_topic_%s" % str(uuid.uuid1())
def add_entries(sub, args):
updates = []
if (topic_name in args.topic_deltas and sub.update_counts[topic_name] == 1):
updates.append(self.make_topic_update(topic_name, num_updates=2,
key_template="old"))
if (topic_name in args.topic_deltas and sub.update_counts[topic_name] == 2):
updates.append(self.make_topic_update(topic_name, num_updates=1,
key_template="new", clear_topic_entries=True))
if len(updates) > 0:
return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates,
skipped=False)
return DEFAULT_UPDATE_STATE_RESPONSE
def check_entries(sub, args):
if (topic_name in args.topic_deltas and sub.update_counts[topic_name] == 1):
assert len(args.topic_deltas[topic_name].topic_entries) == 1
assert args.topic_deltas[topic_name].topic_entries[0].key == "new0"
return DEFAULT_UPDATE_STATE_RESPONSE
reg = [TTopicRegistration(topic_name=topic_name, is_transient=False)]
with StatestoreSubscriber(update_cb=add_entries) as sub1:
(
sub1.start()
.register(topics=reg)
.wait_for_update(topic_name, 1)
.kill()
.wait_for_failure()
.start()
.register(topics=reg)
.wait_for_update(topic_name, 2)
)
with StatestoreSubscriber(update_cb=check_entries) as sub2:
(
sub2.start()
.register(topics=reg)
.wait_for_update(topic_name, 2)
)
def test_heartbeat_failure_reset(self):
"""Regression test for IMPALA-6785: the heartbeat failure count for the subscriber ID
should be reset when it resubscribes, not after the first successful heartbeat. Delay
the heartbeat to force the topic update to finish first."""
with StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5)) as sub:
topic_name = "test_heartbeat_failure_reset"
reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
sub.start()
sub.register(topics=[reg])
LOG.info("Registered with id {0}".format(sub.subscriber_id))
sub.wait_for_heartbeat(1)
sub.kill()
LOG.info("Killed, waiting for statestore to detect failure via heartbeats")
sub.wait_for_failure()
# IMPALA-6785 caused only one topic update to be send. Wait for multiple updates to
# be received to confirm that the subsequent updates are being scheduled repeatedly.
target_updates = sub.update_counts[topic_name] + 5
sub.start()
sub.register(topics=[reg])
LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
sub.wait_for_update(topic_name, target_updates)
def test_min_subscriber_topic_version(self):
self._do_test_min_subscriber_topic_version(False)
def test_min_subscriber_topic_version_with_straggler(self):
self._do_test_min_subscriber_topic_version(True)
def _do_test_min_subscriber_topic_version(self, simulate_straggler):
"""Implementation of test that the 'min_subscriber_topic_version' flag is correctly
set when requested. This tests runs two subscribers concurrently and tracks the
minimum version each has processed. If 'simulate_straggler' is true, one subscriber
rejects updates so that its version is not advanced."""
topic_name = "test_min_subscriber_topic_version_%s" % uuid.uuid1()
# This lock is held while processing the update to protect last_to_versions.
update_lock = threading.Lock()
last_to_versions = {}
TOTAL_SUBSCRIBERS = 2
def callback(sub, args, is_producer, sub_name):
"""Callback for subscriber to verify min_subscriber_topic_version behaviour.
If 'is_producer' is true, this acts as the producer, otherwise it acts as the
consumer. 'sub_name' is a name used to index into last_to_versions."""
if topic_name not in args.topic_deltas:
# The update doesn't contain our topic.
pass
with update_lock:
LOG.info("{0} got update {1}".format(sub_name,
repr(args.topic_deltas[topic_name])))
LOG.info("Versions: {0}".format(last_to_versions))
to_version = args.topic_deltas[topic_name].to_version
from_version = args.topic_deltas[topic_name].from_version
min_subscriber_topic_version = \
args.topic_deltas[topic_name].min_subscriber_topic_version
if is_producer:
assert min_subscriber_topic_version is not None
assert (to_version == 0 and min_subscriber_topic_version == 0) or\
min_subscriber_topic_version < to_version,\
"'to_version' hasn't been created yet by this subscriber."
# Only validate version once all subscribers have processed an update.
if len(last_to_versions) == TOTAL_SUBSCRIBERS:
min_to_version = min(last_to_versions.values())
assert min_subscriber_topic_version <= min_to_version,\
"The minimum subscriber topic version seen by the producer cannot get " +\
"ahead of the minimum version seem by the consumer, by definition."
assert min_subscriber_topic_version >= min_to_version - 2,\
"The min topic version can be two behind the last version seen by " + \
"this subscriber because the updates for both subscribers are " + \
"prepared in parallel and because it's possible that the producer " + \
"processes two updates in-between consumer updates. This is not " + \
"absolute but depends on updates not being delayed a large amount."
else:
# Consumer did not request topic version.
assert min_subscriber_topic_version is None
# Check the 'to_version' and update 'last_to_versions'.
last_to_version = last_to_versions.get(sub_name, 0)
if to_version > 0:
# Non-empty update.
assert from_version == last_to_version
# Stragglers should accept the first update then skip later ones.
skip_update = simulate_straggler and not is_producer and last_to_version > 0
if not skip_update: last_to_versions[sub_name] = to_version
if is_producer:
delta = self.make_topic_update(topic_name)
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
skipped=False)
elif skip_update:
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], skipped=True)
else:
return DEFAULT_UPDATE_STATE_RESPONSE
# Two concurrent subscribers, which pushes out updates and checks the minimum
# version, the other which just consumes the updates.
def producer_callback(sub, args):
return callback(sub, args, True, "producer")
def consumer_callback(sub, args):
return callback(sub, args, False, "consumer")
with StatestoreSubscriber(update_cb=consumer_callback) as consumer_sub:
with StatestoreSubscriber(update_cb=producer_callback) as producer_sub:
consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
populate_min_subscriber_topic_version=True)
NUM_UPDATES = 6
(
consumer_sub.start()
.register(topics=[consumer_reg])
)
(
producer_sub.start()
.register(topics=[producer_reg])
.wait_for_update(topic_name, NUM_UPDATES)
)
consumer_sub.wait_for_update(topic_name, NUM_UPDATES)
def test_transient_entry_removal_race(self):
"""IMPALA-7306: transient entries were not deleted if the subscriber is unregistered
while it is in the middle of a callback. This test exercises that case by blocking
the update callback so that it is still running when the statestore unregisters the
subscriber for failed heartbeats. It also confirms that non-transient entries are not
removed."""
transient_topic_name = "test_transient_entry_removal_race_transient"
non_transient_topic_name = "test_transient_entry_removal_race_non_transient"
topic_regs = [TTopicRegistration(topic_name=transient_topic_name, is_transient=True),
TTopicRegistration(topic_name=non_transient_topic_name, is_transient=False)]
# The heartbeat timeout is 3s, so sleep for long enough for it to expire
HEARTBEAT_DELAY = 10
def delayed_heartbeat(sub, args):
LOG.info("Heartbeat callback called")
time.sleep(HEARTBEAT_DELAY)
LOG.debug("Heartbeat callback about to return")
def add_transient_entries_after_hb_failure(sub, args):
LOG.info("Update callback called")
# Add an additional delay so that this returns after the heartbeat.
time.sleep(WAIT_FOR_FAILURE_TIMEOUT)
updates = [self.make_topic_update(transient_topic_name, "k", "v"),
self.make_topic_update(non_transient_topic_name, "k", "v")]
LOG.debug("Update callback about to return")
return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates, skipped=False)
# Subscriber with delay creates a transient entry, which should not be added since
# the subscriber failed and was unregistered.
with StatestoreSubscriber(heartbeat_cb=delayed_heartbeat,
update_cb=add_transient_entries_after_hb_failure) as sub:
# Wait for the first update (which should happen after failure), then confirm
# that the failure occurred.
(
sub.start()
.register(topics=topic_regs)
.wait_for_update(transient_topic_name, 1)
.wait_for_failure(timeout=WAIT_FOR_FAILURE_TIMEOUT)
)
def verify_transient_entry_removed(sub, args):
transient_delta = args.topic_deltas[transient_topic_name]
assert len(transient_delta.topic_entries) == 0, args
non_transient_delta = args.topic_deltas[non_transient_topic_name]
# Non-transient update should include topic that was not removed
assert len(non_transient_delta.topic_entries) == 1, args
entry = non_transient_delta.topic_entries[0]
assert entry.key == "k0"
assert entry.value == "v0"
assert not entry.deleted
# Skip updates so that statestore will re-send non-transient entries and the above
# assertions remain valid on subsequent callbacks.
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], skipped=True)
# Verify that the transient entry for the failed subscriber is not present.
with StatestoreSubscriber(update_cb=verify_transient_entry_removed) as sub:
(
sub.start()
.register(topics=topic_regs)
.wait_for_update(transient_topic_name, 1)
)