IMPALA-13253: Add option to enable keepalive for client connections

Client connections can drop without an explicit close. This can
happen if client machine resets or there is a network disruption.
Some load balancers have an idle timeout that result in the
connection becoming invalid without an explicit teardown. With
short idle timeouts (e.g. AWS LB has a timeout of 350 seconds),
this can impact many connections.

This adds startup options to enable / tune TCP keepalive settings for
client connections:
client_keepalive_probe_period_s - idle time before doing keepalive probes
  If set to > 0, keepalive is enabled.
client_keepalive_retry_period_s - time between keepalive probes
client_keepalive_retry_count - number of keepalive probes
These startup options mirror the startup options for Kudu's
equivalent functionality.

Thrift has preexisting support for turning on keepalive, but that
support uses the OS defaults for keepalive settings. To add the
ability to tune the keepalive settings, this implements a wrapper
around the Thrift socket (both TLS and non-TLS) and manually sets
the keepalive options on the socket (mirroring code from Kudu's
Socket::SetTcpKeepAlive).

This does not enable keepalive by default to make it easy to backport.
A separate patch will turn keepalive on by default.

Testing:
 - Added a custom cluster test that connects with impala-shell
   and verifies that the socket has the keepalive timer.
   Verified that it works on Ubuntu 20, Centos 7, and Redhat 8.
 - Used iptables to manually test cases where the client is unreachable
   and verified that the server detects that and closes the connection.

Change-Id: I9e50f263006c456bc0797b8306aa4065e9713450
Reviewed-on: http://gerrit.cloudera.org:8080/22254
Reviewed-by: Yida Wu <wydbaggio000@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Joe McDonnell
2024-07-25 12:26:15 -07:00
parent 99545fbf45
commit 71818c673b
6 changed files with 271 additions and 4 deletions

View File

@@ -341,15 +341,22 @@ Status ThriftServer::CreateSocket(std::shared_ptr<TServerSocket>* socket) {
disable_tls12_);
socket_factory->loadCertificate(certificate_path_.c_str());
socket_factory->loadPrivateKey(private_key_path_.c_str());
socket->reset(new TSSLServerSocket(port_, socket_factory));
ImpalaKeepAliveServerSocket<TSSLServerSocket>* server_socket =
new ImpalaKeepAliveServerSocket<TSSLServerSocket>(port_, socket_factory);
server_socket->setKeepAliveOptions(keepalive_probe_period_s_,
keepalive_retry_period_s_, keepalive_retry_count_);
socket->reset(server_socket);
} catch (const TException& e) {
return Status(TErrorCode::SSL_SOCKET_CREATION_FAILED, e.what());
}
return Status::OK();
} else {
socket->reset(new TServerSocket(port_));
return Status::OK();
ImpalaKeepAliveServerSocket<TServerSocket>* server_socket =
new ImpalaKeepAliveServerSocket<TServerSocket>(port_);
server_socket->setKeepAliveOptions(keepalive_probe_period_s_,
keepalive_retry_period_s_, keepalive_retry_count_);
socket->reset(server_socket);
}
return Status::OK();
}
Status ThriftServer::EnableSsl(SSLProtocol version, const string& certificate,
@@ -389,6 +396,13 @@ Status ThriftServer::EnableSsl(SSLProtocol version, const string& certificate,
return Status::OK();
}
void ThriftServer::SetKeepAliveOptions(int32_t probe_period_s, int32_t retry_period_s,
int32_t retry_count) {
keepalive_probe_period_s_ = probe_period_s;
keepalive_retry_period_s_ = retry_period_s;
keepalive_retry_count_ = retry_count;
}
Status ThriftServer::Start() {
DCHECK(!started_);
std::shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());

View File

@@ -307,6 +307,14 @@ class ThriftServer {
kudu::security::SecurityDefaults::kDefaultTlsCipherSuites,
bool disable_tls12 = false);
/// Sets keepalive options for the client TCP connections. Keepalive is only enabled
/// if probe_period_s > 0. These are the three standard keepalive settings for Linux:
/// If a client connection is idle for probe_period_s seconds, it starts sending
/// keepalives. If it doesn't hear back, it retries every retry_period_s seconds until
/// it exceeds retry_count.
void SetKeepAliveOptions(int32_t probe_period_s, int32_t retry_period_s,
int32_t retry_count);
/// Creates the server socket on which this server listens. May be SSL enabled. Returns
/// OK unless there was a Thrift error.
Status CreateSocket(std::shared_ptr<apache::thrift::transport::TServerSocket>* socket);
@@ -406,6 +414,11 @@ class ThriftServer {
TransportType transport_type_;
bool is_external_facing_;
/// Keepalive options for client connections.
int32_t keepalive_probe_period_s_ = 0;
int32_t keepalive_retry_period_s_ = 0;
int32_t keepalive_retry_count_ = 0;
};
/// Helper class to build new ThriftServer instances.
@@ -502,6 +515,19 @@ class ThriftServerBuilder {
return *this;
}
/// Sets keepalive options for the client TCP connections. Keepalive is only enabled
/// if probe_period_s > 0. These are the three standard keepalive settings for Linux:
/// If a client connection is idle for probe_period_s seconds, it starts sending
/// keepalives. If it doesn't hear back, it retries every retry_period_s seconds until
/// it exceeds retry_count.
ThriftServerBuilder& keepalive(int32_t probe_period_s,
int32_t retry_period_s, int32_t retry_count) {
keepalive_probe_period_s_ = probe_period_s;
keepalive_retry_period_s_ = retry_period_s;
keepalive_retry_count_ = retry_count;
return *this;
}
/// Constructs a new ThriftServer and puts it in 'server', if construction was
/// successful, returns an error otherwise. In the error case, 'server' will not have
/// been set and will not need to be freed, otherwise the caller assumes ownership of
@@ -516,6 +542,8 @@ class ThriftServerBuilder {
version_, certificate_, private_key_, pem_password_cmd_, cipher_list_,
tls_ciphersuites_, disable_tls12_));
}
ptr->SetKeepAliveOptions(keepalive_probe_period_s_, keepalive_retry_period_s_,
keepalive_retry_count_);
(*server) = ptr.release();
return Status::OK();
}
@@ -544,6 +572,9 @@ class ThriftServerBuilder {
kudu::security::SecurityDefaults::kDefaultTlsCipherSuites;
bool disable_tls12_ = false;
bool is_external_facing_ = true;
int32_t keepalive_probe_period_s_ = 0;
int32_t keepalive_retry_period_s_ = 0;
int32_t keepalive_retry_count_ = 0;
};
/// Contains a map from string for --ssl_minimum_version to Thrift's SSLProtocol.

View File

@@ -18,12 +18,15 @@
#include "rpc/thrift-util.h"
#include <limits>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <gtest/gtest.h>
#include <thrift/config.h>
#include "kudu/security/security_flags.h"
#include "kudu/util/openssl_util.h"
#include "util/error-util.h"
#include "util/hash-util.h"
#include "util/openssl-util.h"
#include "util/time.h"
@@ -203,6 +206,34 @@ void ImpalaTlsSocketFactory::configureCiphers(const string& cipher_list,
#endif
}
template<typename T>
Status SetSockOpt(THRIFT_SOCKET socket, int level, int option,
string option_string, const T& value) {
if (::setsockopt(socket, level, option, &value, sizeof(T)) == -1) {
int err = errno;
return Status(Substitute("Failed to set $0 to $1: $2", option_string,
value, GetStrErrMsg(err)));
}
return Status::OK();
}
Status SetKeepAliveOptionsForSocket(THRIFT_SOCKET socket, int32_t probe_period_s,
int32_t retry_period_s, int32_t retry_count) {
if (probe_period_s > 0) {
RETURN_IF_ERROR(SetSockOpt(socket, IPPROTO_TCP, TCP_KEEPIDLE,
"TCP_KEEPIDLE (keepalive probe period)", probe_period_s));
}
if (retry_period_s > 0) {
RETURN_IF_ERROR(SetSockOpt(socket, IPPROTO_TCP, TCP_KEEPINTVL,
"TCP_KEEPINTVL (keepalive retry period)", retry_period_s));
}
if (retry_count > 0) {
RETURN_IF_ERROR(SetSockOpt(socket, IPPROTO_TCP, TCP_KEEPCNT,
"TCP_KEEPCNT (keepalive retry count)", retry_count));
}
return Status::OK();
}
static void ThriftOutputFunction(const char* output) {
VLOG_QUERY << output;
}

View File

@@ -185,6 +185,64 @@ class ImpalaTlsSocketFactory : public apache::thrift::transport::TSSLSocketFacto
bool disable_tls12);
};
// Helper function to set keepalive options on the provided THRIFT_SOCKET.
// These options are only effective if keepalive is enabled separately (by Thrift).
Status SetKeepAliveOptionsForSocket(THRIFT_SOCKET, int32_t probe_period_s,
int32_t retry_period_s, int32_t retry_count);
// Impala uses TServerSocket and TSSLServerSocket for external client connections.
// Thrift has a built-in ability to turn on keepalive for the TCP socket. However, it
// does not have an ability to tune the keepalive options, so the socket would use the
// OS default settings. ImpalaKeepAliveServerSocket is a templated class that takes
// in an underlying TServerSocket / TSSLServerSocket type. It behaves like the
// underlying type (and uses the same constructor signatures) except that it sets these
// keepalive options on the socket returned by createSocket():
// - probe period / TCP_KEEPIDLE: Time before first keepalive probe
// - retry period / TCP_KEEPINTVL: Time between retries after keepalive starts
// - number of retries / TCP_KEEPCNT: Maximum number of retries
template <typename ThriftServerSocketType>
class ImpalaKeepAliveServerSocket : public ThriftServerSocketType {
public:
using ThriftServerSocketType::ThriftServerSocketType;
// This is called immediately after calling the constructor to store the
// keepalive settings to apply to the future sockets. This will set
// Thrift's keepalive setting as well. It must be called before anything
// will call createSocket().
void setKeepAliveOptions(int32_t probe_period_s, int32_t retry_period_s,
int32_t retry_count) {
keepalive_enabled_ = probe_period_s > 0;
ThriftServerSocketType::setKeepAlive(keepalive_enabled_);
keepalive_probe_period_s_ = probe_period_s;
keepalive_retry_period_s_ = retry_period_s;
keepalive_retry_count_ = retry_count;
}
protected:
// Get a socket from the underlying TServerSocket / TSSLServerSocket type, then set the
// keepalive options on the socket before returning it. Note: The THRIFT_SOCKET
// type on Linux is a standard socket file descriptor.
std::shared_ptr<apache::thrift::transport::TSocket> createSocket(THRIFT_SOCKET socket) {
std::shared_ptr<apache::thrift::transport::TSocket> tsocket =
ThriftServerSocketType::createSocket(socket);
if (keepalive_enabled_) {
Status status = SetKeepAliveOptionsForSocket(socket, keepalive_probe_period_s_,
keepalive_retry_period_s_, keepalive_retry_count_);
if (!status.ok()) {
throw apache::thrift::transport::TTransportException(
apache::thrift::transport::TTransportException::INTERNAL_ERROR,
status.msg().msg());
}
}
return tsocket;
}
private:
bool keepalive_enabled_ = false;
int32_t keepalive_probe_period_s_ = 0;
int32_t keepalive_retry_period_s_ = 0;
int32_t keepalive_retry_count_ = 0;
};
/// Redirects all Thrift logging to VLOG(1)
void InitThriftLogging();

View File

@@ -337,6 +337,18 @@ DEFINE_int64(accepted_client_cnxn_timeout, 300000,
"the post-accept, pre-setup connection queue before it is timed out and the "
"connection request is rejected. A value of 0 means there is no timeout.");
DEFINE_int32(client_keepalive_probe_period_s, 0,
"The duration in seconds after a client connection has gone idle before a TCP "
"keepalive probe is sent to the client. Set to 0 to disable TCP keepalive probes "
"from being sent.");
DEFINE_int32(client_keepalive_retry_period_s, 0,
"The duration in second between successive keepalive probes from a client connection "
"if the previous probes are not acknowledged. Effective only if "
"--client_keepalive_probe_period_s is not 0.");
DEFINE_int32(client_keepalive_retry_count, 0,
"The maximum number of keepalive probes sent before declaring the client as dead. "
"Effective only if --client_keepalive_probe_period_s is not 0.");
DEFINE_string(query_event_hook_classes, "", "Comma-separated list of java QueryEventHook "
"implementation classes to load and register at Impala startup. Class names should "
"be fully-qualified and on the classpath. Whitespace acceptable around delimiters.");
@@ -3166,6 +3178,9 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&server));
beeswax_server_.reset(server);
beeswax_server_->SetConnectionHandler(this);
@@ -3195,6 +3210,9 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&server));
hs2_server_.reset(server);
hs2_server_->SetConnectionHandler(this);
@@ -3226,6 +3244,9 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&server));
external_fe_server_.reset(server);
external_fe_server_->SetConnectionHandler(this);
@@ -3257,6 +3278,9 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&http_server));
hs2_http_server_.reset(http_server);
hs2_http_server_->SetConnectionHandler(this);

View File

@@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import logging
import os
import pytest
import re
import subprocess
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import create_client_protocol_dimension
from tests.common.test_vector import ImpalaTestVector
from tests.shell.util import run_impala_shell_cmd, get_impalad_port
LOG = logging.getLogger('test_client_keepalive')
class TestClientKeepalive(CustomClusterTestSuite):
"""Tests for enabling server-side keepalive for client connections.
The mechanism is slightly different SSL and non-SSL, so this tests both."""
CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
SSL_ARGS = ("--ssl_client_ca_certificate=%s/server-cert.pem "
"--ssl_server_certificate=%s/server-cert.pem "
"--ssl_private_key=%s/server-key.pem "
"--hostname=localhost " # Required to match hostname in certificate
% (CERT_DIR, CERT_DIR, CERT_DIR))
KEEPALIVE_ARGS = ("--client_keepalive_probe_period_s=600")
def get_ss_command(self):
# HACK: Most systems have ss on the PATH, but sometimes the PATH is misconfigured
# while ss is still available in /usr/sbin. This tries the PATH and then falls back
# to trying /usr/sbin/ss.
possible_ss_commands = ['ss', '/usr/sbin/ss']
with open(os.devnull, "w") as devnull:
for ss_command in possible_ss_commands:
try:
retcode = subprocess.call([ss_command], stdout=devnull, stderr=devnull)
LOG.info("{0} returns {1}".format(ss_command, retcode))
if retcode == 0:
return ss_command
except Exception as e:
LOG.info(e)
pass
raise Exception("No valid ss executable. Tried: {0}".format(possible_ss_commands))
def check_keepalive(self, vector, ssl):
ss = self.get_ss_command()
impalad_port = get_impalad_port(vector)
# Sleep 1 second to make sure the connection is idle, then use the ss utility
# to print information about keepalive.
# -H disables the header
# -t limits it to TCP connections
# -o prints the timer information which includes keepalive
# -n uses numeric addresses to avoid DNS lookups
# sport X - limit to connections for the impalad port that we are using
ss_command = "sleep 1 && {0} -Hton sport = {1}".format(ss, impalad_port)
LOG.info("Command: {0}".format(ss_command))
shell_options = ["-q", "shell {0}".format(ss_command)]
if ssl:
shell_options.append("--ssl")
result = run_impala_shell_cmd(vector, shell_options)
LOG.info("STDOUT: {0} STDERR: {1}".format(result.stdout, result.stderr))
# The message is of the form "timer:(keepalive,$TIME,$NUM_RETRIES)"
# e.g. "timer:(keepalive,9min58sec,0)" or "timer:(keepalive,10min,0)"
KEEPALIVE_REGEX = r"timer:\(keepalive,([0-9]+)min([0-9]+sec)?,([0-9])\)"
match = re.search(KEEPALIVE_REGEX, result.stdout)
assert match, "Could not find keepalive information in {0}".format(result.stdout)
num_minutes = int(match.group(1))
num_retries = int(match.group(3))
assert num_minutes == 9 or num_minutes == 10
assert num_retries == 0
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args=(SSL_ARGS + KEEPALIVE_ARGS),
statestored_args=SSL_ARGS,
catalogd_args=SSL_ARGS)
def test_ssl_keepalive(self, vector):
# Keepalive applies to all client ports / protocols, so test all protocols
# Iterate over test vector within test function to avoid restart cluster
for vector in\
[ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
self.check_keepalive(vector, ssl=True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args=KEEPALIVE_ARGS)
def test_nonssl_keepalive(self, vector):
# Keepalive applies to all client ports / protocols, so test all protocols
# Iterate over test vector within test function to avoid restart cluster
for vector in\
[ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
self.check_keepalive(vector, ssl=False)