Files
impala/tests/custom_cluster/test_custom_statestore.py
Michael Ho 41426077fb IMPALA-9026: Use resolved IP address for statestore subscriber
This change adds a flag (--statestore_subscriber_use_resolved_address)
which, if set to true, allows statestore subscribers to use its
resolved IP address instead of its hostname as the heartbeat
address which statestore sends heartbeats / updates to.

This flag is useful in certain situation in which the subscriber's
DNS entry may not be present for a valid reason (e.g. a Kubernetes
pod whose readiness probe returns false). An example is that there
are multiple Impala coordinators but only one of them will be active
at a time (for admission control reason) and the rest will serve
as backup. In which case, we still want the backup coordinators to
receive updates from statestore but not serve any queries.

Change-Id: Ieb8302dec0e52beb9f0b88306a51c38ff42a63a2
Reviewed-on: http://gerrit.cloudera.org:8080/14388
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2019-10-29 23:11:47 +00:00

103 lines
4.2 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests statestore with non-default startup options
import logging
import os
import pytest
import re
import sys
import uuid
import socket
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_test_suite import ImpalaTestSuite
from Types.ttypes import TNetworkAddress
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
import StatestoreService.StatestoreSubscriber as Subscriber
import StatestoreService.StatestoreService as Statestore
from ErrorCodes.ttypes import TErrorCode
LOG = logging.getLogger('custom_statestore_test')
STATESTORE_SERVICE_PORT = 24000
# A simple wrapper class to launch a cluster where we can tune various
# startup parameters of the statestored to test correct boundary-value
# behavior.
class TestCustomStatestore(CustomClusterTestSuite):
# Grab a port the statestore subscribers will use to connect.
# Note that all subscribers we create below use this port to connect,
# with different subscriber IDs.
handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
handle.bind(('localhost', 0))
_, port = handle.getsockname()
@classmethod
def get_workload(self):
return 'functional-query'
def __register_subscriber(self):
subscriber_id = "python-test-client-%s" % uuid.uuid4()
topics = []
request = Subscriber.TRegisterSubscriberRequest(topic_registrations=topics,
subscriber_location=TNetworkAddress("localhost", self.port),
subscriber_id=subscriber_id)
client_transport = \
TTransport.TBufferedTransport(TSocket.TSocket('localhost', STATESTORE_SERVICE_PORT))
protocol = TBinaryProtocol.TBinaryProtocol(client_transport)
client = Statestore.Client(protocol)
client_transport.open()
return client.RegisterSubscriber(request)
@CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3")
def test_statestore_max_subscribers(self):
"""Test that the statestored correctly handles the condition where the number
of subscribers exceeds FLAGS_statestore_max_subscribers
(see be/src/statestore/statestore.cc). The expected behavior is for the
statestored to reject the subscription request once the threshold is
exceeded."""
# With a statestore_max_subscribers of 3, we should hit the registration error
# pretty quick.
for x in xrange(20):
response = self.__register_subscriber()
if response.status.status_code == TErrorCode.OK:
self.registration_id = response.registration_id
LOG.log(logging.INFO, "Registration id %s, x=%d" % (response.registration_id, x))
else:
assert 'Maximum subscriber limit reached:' in ''.join(response.status.error_msgs)
return
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--statestore_subscriber_use_resolved_address=true",
catalogd_args="--statestore_subscriber_use_resolved_address=true")
def test_subscriber_with_resolved_address(self, vector):
# Ensure cluster has started up by running a query.
result = self.execute_query("select count(*) from functional_parquet.alltypes")
assert result.success, str(result)
self.assert_impalad_log_contains("INFO",
"Registering with statestore with resolved address")
self.assert_catalogd_log_contains("INFO",
"Registering with statestore with resolved address")