mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Client connections can drop without an explicit close. This can happen if the client machine resets or there is a network disruption. Some load balancers have an idle timeout that results in the connection becoming invalid without an explicit teardown. With short idle timeouts (e.g. AWS LB has a timeout of 350 seconds), this can impact many connections. This adds startup options to enable / tune TCP keepalive settings for client connections: client_keepalive_probe_period_s - idle time before doing keepalive probes. If set to > 0, keepalive is enabled. client_keepalive_retry_period_s - time between keepalive probes. client_keepalive_retry_count - number of keepalive probes. These startup options mirror the startup options for Kudu's equivalent functionality. Thrift has preexisting support for turning on keepalive, but that support uses the OS defaults for keepalive settings. To add the ability to tune the keepalive settings, this implements a wrapper around the Thrift socket (both TLS and non-TLS) and manually sets the keepalive options on the socket (mirroring code from Kudu's Socket::SetTcpKeepAlive). This does not enable keepalive by default to make it easy to backport. A separate patch will turn keepalive on by default. Testing: - Added a custom cluster test that connects with impala-shell and verifies that the socket has the keepalive timer. Verified that it works on Ubuntu 20, Centos 7, and Redhat 8. - Used iptables to manually test cases where the client is unreachable and verified that the server detects that and closes the connection. Change-Id: I9e50f263006c456bc0797b8306aa4065e9713450 Reviewed-on: http://gerrit.cloudera.org:8080/22254 Reviewed-by: Yida Wu <wydbaggio000@gmail.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
110 lines
4.9 KiB
Python
110 lines
4.9 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
import logging
|
|
import os
|
|
import pytest
|
|
import re
|
|
import subprocess
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
from tests.common.test_dimensions import create_client_protocol_dimension
|
|
from tests.common.test_vector import ImpalaTestVector
|
|
from tests.shell.util import run_impala_shell_cmd, get_impalad_port
|
|
|
|
# Logger used by the client keepalive tests in this file.
LOG = logging.getLogger('test_client_keepalive')
|
|
|
|
|
|
class TestClientKeepalive(CustomClusterTestSuite):
  """Tests for enabling server-side keepalive for client connections.

  The mechanism is slightly different for SSL and non-SSL, so this tests both."""

  CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
  SSL_ARGS = ("--ssl_client_ca_certificate=%s/server-cert.pem "
              "--ssl_server_certificate=%s/server-cert.pem "
              "--ssl_private_key=%s/server-key.pem "
              "--hostname=localhost "  # Required to match hostname in certificate
              % (CERT_DIR, CERT_DIR, CERT_DIR))

  # Value passed to --client_keepalive_probe_period_s, in seconds. The timer value
  # that check_keepalive() expects is derived from this, so the startup flag and the
  # assertion cannot drift apart.
  KEEPALIVE_PROBE_PERIOD_S = 600
  KEEPALIVE_ARGS = "--client_keepalive_probe_period_s={0}".format(
      KEEPALIVE_PROBE_PERIOD_S)

  def get_ss_command(self):
    """Return the first 'ss' invocation that runs and exits with status 0.

    Raises an Exception if none of the candidate commands works."""
    # HACK: Most systems have ss on the PATH, but sometimes the PATH is misconfigured
    # while ss is still available in /usr/sbin. This tries the PATH and then falls back
    # to trying /usr/sbin/ss.
    possible_ss_commands = ['ss', '/usr/sbin/ss']
    with open(os.devnull, "w") as devnull:
      for ss_command in possible_ss_commands:
        try:
          retcode = subprocess.call([ss_command], stdout=devnull, stderr=devnull)
        except OSError as e:
          # The executable is missing or cannot be run; move on to the next candidate.
          LOG.info(e)
          continue
        LOG.info("{0} returns {1}".format(ss_command, retcode))
        if retcode == 0:
          return ss_command

    raise Exception("No valid ss executable. Tried: {0}".format(possible_ss_commands))

  def check_keepalive(self, vector, ssl):
    """Run 'ss' from inside an impala-shell session and assert that a connection on
    the impalad port reports a keepalive timer.

    vector: test vector used to pick the impalad port and to run impala-shell.
    ssl: if True, impala-shell is started with --ssl.

    Fails with an AssertionError if no keepalive timer is printed, if the timer is not
    within a minute of the configured probe period, or if any retries were recorded."""
    ss = self.get_ss_command()
    impalad_port = get_impalad_port(vector)
    # Sleep 1 second to make sure the connection is idle, then use the ss utility
    # to print information about keepalive.
    # -H disables the header
    # -t limits it to TCP connections
    # -o prints the timer information which includes keepalive
    # -n uses numeric addresses to avoid DNS lookups
    # sport X - limit to connections for the impalad port that we are using
    ss_command = "sleep 1 && {0} -Hton sport = {1}".format(ss, impalad_port)
    LOG.info("Command: {0}".format(ss_command))
    # impala-shell's "shell" command runs ss while the client connection is open.
    shell_options = ["-q", "shell {0}".format(ss_command)]
    if ssl:
      shell_options.append("--ssl")
    result = run_impala_shell_cmd(vector, shell_options)
    LOG.info("STDOUT: {0} STDERR: {1}".format(result.stdout, result.stderr))

    # The message is of the form "timer:(keepalive,$TIME,$NUM_RETRIES)"
    # e.g. "timer:(keepalive,9min58sec,0)" or "timer:(keepalive,10min,0)"
    # The retry count accepts multiple digits so that an unexpected count is reported
    # by the retry assertion below rather than as a missing timer.
    KEEPALIVE_REGEX = r"timer:\(keepalive,([0-9]+)min([0-9]+sec)?,([0-9]+)\)"
    match = re.search(KEEPALIVE_REGEX, result.stdout)
    assert match, "Could not find keepalive information in {0}".format(result.stdout)
    num_minutes = int(match.group(1))
    num_retries = int(match.group(3))
    # The timer counts down from the probe period, so after a brief idle time it shows
    # either the full period (e.g. "10min") or just under it (e.g. "9min58sec").
    expected_minutes = self.KEEPALIVE_PROBE_PERIOD_S // 60
    assert num_minutes in (expected_minutes - 1, expected_minutes), \
        "Unexpected keepalive timer in {0}".format(match.group(0))
    assert num_retries == 0, \
        "Unexpected keepalive retries in {0}".format(match.group(0))

  def _check_keepalive_all_protocols(self, ssl):
    """Run check_keepalive() once for every client protocol."""
    # Keepalive applies to all client ports / protocols, so test all protocols.
    # Iterate over the protocol dimension within a single test function to avoid
    # restarting the cluster.
    for value in create_client_protocol_dimension():
      self.check_keepalive(ImpalaTestVector([value]), ssl=ssl)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(impalad_args=(SSL_ARGS + KEEPALIVE_ARGS),
                                    statestored_args=SSL_ARGS,
                                    catalogd_args=SSL_ARGS)
  def test_ssl_keepalive(self, vector):
    """Verify keepalive is set on SSL client connections for every protocol."""
    # The 'vector' argument is not used: a vector is built for each protocol instead.
    self._check_keepalive_all_protocols(ssl=True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(impalad_args=KEEPALIVE_ARGS)
  def test_nonssl_keepalive(self, vector):
    """Verify keepalive is set on non-SSL client connections for every protocol."""
    # The 'vector' argument is not used: a vector is built for each protocol instead.
    self._check_keepalive_all_protocols(ssl=False)
|