mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
KUDU-2218. tls_socket: properly handle temporary socket errors in Writev
This fixes a bug which caused RaftConsensusITest.TestLargeBatches to fail when run under stress, as in the following command line: taskset -c 0-4 \ build/latest/bin/raft_consensus-itest \ --gtest_filter=\*LargeBat\* \ --stress-cpu-threads=8 This would produce an error like: Network error: failed to write to TLS socket: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry:s3_pkt.c:878 This means that we were retrying a write after getting EAGAIN, but with a different buffer than the first time. I tracked this down to mishandling of temporary socket errors in TlsSocket::Writev(). In the case that we successfully write part of the io vector but hit such an error trying to write a later element in the vector, we were still propagating the error back up to the caller. The caller didn't realize that part of the write was successful, and thus it would retry the write from the beginning. The fix is to fix the above, but also to enable partial writes in TlsContext. The new test fails if either of the above two changes are backed out. Change-Id: If797f220f42bfb2e6f452b66f15e7a758e883472 Reviewed-on: http://gerrit.cloudera.org:8080/8570 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <aserbin@cloudera.com> Reviewed-on: http://gerrit.cloudera.org:8080/9361 Reviewed-by: Michael Ho <kwho@cloudera.com> Tested-by: Impala Public Jenkins
This commit is contained in:
committed by
Impala Public Jenkins
parent
baec8cae34
commit
678bf28e23
@@ -96,7 +96,7 @@ Status TlsContext::Init() {
|
||||
if (!ctx_) {
|
||||
return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
|
||||
}
|
||||
SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY);
|
||||
SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE);
|
||||
|
||||
// Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
|
||||
// We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "kudu/security/cert.h"
|
||||
#include "kudu/security/openssl_util.h"
|
||||
#include "kudu/util/errno.h"
|
||||
#include "kudu/util/net/socket.h"
|
||||
|
||||
namespace kudu {
|
||||
namespace security {
|
||||
@@ -42,11 +43,11 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
|
||||
CHECK(ssl_);
|
||||
SCOPED_OPENSSL_NO_PENDING_ERRORS;
|
||||
|
||||
*nwritten = 0;
|
||||
if (PREDICT_FALSE(amt == 0)) {
|
||||
// Writing an empty buffer is a no-op. This happens occasionally, eg in the
|
||||
// case where the response has an empty sidecar. We have to special case
|
||||
// it, because SSL_write can return '0' to indicate certain types of errors.
|
||||
*nwritten = 0;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -61,7 +62,6 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
|
||||
ErrnoToString(save_errno), save_errno);
|
||||
}
|
||||
// Socket not ready to write yet.
|
||||
*nwritten = 0;
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::NetworkError("failed to write to TLS socket",
|
||||
@@ -90,6 +90,12 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritt
|
||||
}
|
||||
RETURN_NOT_OK(SetTcpCork(0));
|
||||
*nwritten = total_written;
|
||||
// If we did manage to write something, but not everything, due to a temporary socket
|
||||
// error, then we should still return an OK status indicating a successful _partial_
|
||||
// write.
|
||||
if (total_written > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
|
||||
return Status::OK();
|
||||
}
|
||||
return write_status;
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,11 @@
|
||||
|
||||
#include "kudu/security/tls_handshake.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/uio.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <csignal>
|
||||
@@ -28,6 +32,7 @@
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <glog/logging.h>
|
||||
#include <gtest/gtest.h>
|
||||
@@ -38,6 +43,8 @@
|
||||
#include "kudu/util/monotime.h"
|
||||
#include "kudu/util/net/sockaddr.h"
|
||||
#include "kudu/util/net/socket.h"
|
||||
#include "kudu/util/random.h"
|
||||
#include "kudu/util/random_util.h"
|
||||
#include "kudu/util/scoped_cleanup.h"
|
||||
#include "kudu/util/status.h"
|
||||
#include "kudu/util/test_macros.h"
|
||||
@@ -46,25 +53,27 @@
|
||||
using std::string;
|
||||
using std::thread;
|
||||
using std::unique_ptr;
|
||||
|
||||
using std::vector;
|
||||
|
||||
namespace kudu {
|
||||
namespace security {
|
||||
|
||||
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
|
||||
|
||||
// Size is big enough to not fit into output socket buffer of default size
|
||||
// (controlled by setsockopt() with SO_SNDBUF).
|
||||
constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
|
||||
|
||||
class TlsSocketTest : public KuduTest {
|
||||
public:
|
||||
void SetUp() override {
|
||||
KuduTest::SetUp();
|
||||
|
||||
ASSERT_OK(client_tls_.Init());
|
||||
ASSERT_OK(server_tls_.Init());
|
||||
ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
|
||||
}
|
||||
|
||||
protected:
|
||||
void ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock);
|
||||
TlsContext client_tls_;
|
||||
TlsContext server_tls_;
|
||||
};
|
||||
|
||||
Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
|
||||
@@ -101,19 +110,112 @@ Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock) {
|
||||
unique_ptr<Socket> client_sock(new Socket());
|
||||
ASSERT_OK(client_sock->Init(0));
|
||||
ASSERT_OK(client_sock->Connect(addr));
|
||||
|
||||
TlsHandshake client;
|
||||
ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
|
||||
ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
|
||||
ASSERT_OK(client.Finish(&client_sock));
|
||||
*sock = std::move(client_sock);
|
||||
}
|
||||
|
||||
class EchoServer {
|
||||
public:
|
||||
EchoServer()
|
||||
: pthread_sync_(1) {
|
||||
}
|
||||
~EchoServer() {
|
||||
Stop();
|
||||
Join();
|
||||
}
|
||||
|
||||
void Start() {
|
||||
ASSERT_OK(server_tls_.Init());
|
||||
ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
|
||||
ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
|
||||
ASSERT_OK(listener_.Init(0));
|
||||
ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
|
||||
ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
|
||||
|
||||
thread_ = thread([&] {
|
||||
pthread_ = pthread_self();
|
||||
pthread_sync_.CountDown();
|
||||
unique_ptr<Socket> sock(new Socket());
|
||||
Sockaddr remote;
|
||||
CHECK_OK(listener_.Accept(sock.get(), &remote, /*flags=*/0));
|
||||
|
||||
TlsHandshake server;
|
||||
CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
|
||||
CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
|
||||
CHECK_OK(server.Finish(&sock));
|
||||
|
||||
CHECK_OK(sock->SetRecvTimeout(kTimeout));
|
||||
unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
|
||||
// An "echo" loop for kEchoChunkSize byte buffers.
|
||||
while (!stop_) {
|
||||
size_t n;
|
||||
Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
|
||||
if (!s.ok()) {
|
||||
CHECK(stop_) << "unexpected error reading: " << s.ToString();
|
||||
}
|
||||
|
||||
LOG(INFO) << "server echoing " << n << " bytes";
|
||||
size_t written;
|
||||
s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
|
||||
if (!s.ok()) {
|
||||
CHECK(stop_) << "unexpected error writing: " << s.ToString();
|
||||
}
|
||||
if (slow_read_) {
|
||||
SleepFor(MonoDelta::FromMilliseconds(10));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void EnableSlowRead() {
|
||||
slow_read_ = true;
|
||||
}
|
||||
|
||||
const Sockaddr& listen_addr() const {
|
||||
return listen_addr_;
|
||||
}
|
||||
|
||||
bool stopped() const {
|
||||
return stop_;
|
||||
}
|
||||
|
||||
void Stop() {
|
||||
stop_ = true;
|
||||
}
|
||||
void Join() {
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
const pthread_t& pthread() {
|
||||
pthread_sync_.Wait();
|
||||
return pthread_;
|
||||
}
|
||||
|
||||
private:
|
||||
TlsContext server_tls_;
|
||||
Socket listener_;
|
||||
Sockaddr listen_addr_;
|
||||
thread thread_;
|
||||
pthread_t pthread_;
|
||||
CountDownLatch pthread_sync_;
|
||||
std::atomic<bool> stop_ { false };
|
||||
|
||||
bool slow_read_ = false;
|
||||
};
|
||||
|
||||
void handler(int /* signal */) {}
|
||||
|
||||
// Test for failures to handle EINTR during TLS connection
|
||||
// negotiation and data send/receive.
|
||||
TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
|
||||
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
|
||||
Sockaddr listen_addr;
|
||||
ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
|
||||
Socket listener;
|
||||
ASSERT_OK(listener.Init(0));
|
||||
ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
|
||||
ASSERT_OK(listener.GetSocketAddress(&listen_addr));
|
||||
|
||||
// Set up a no-op signal handler for SIGUSR2.
|
||||
struct sigaction sa, sa_old;
|
||||
memset(&sa, 0, sizeof(sa));
|
||||
@@ -121,76 +223,117 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
|
||||
sigaction(SIGUSR2, &sa, &sa_old);
|
||||
SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
|
||||
|
||||
// Size is big enough to not fit into output socket buffer of default size
|
||||
// (controlled by setsockopt() with SO_SNDBUF).
|
||||
constexpr size_t kSize = 32 * 1024 * 1024;
|
||||
|
||||
pthread_t server_tid;
|
||||
CountDownLatch server_tid_sync(1);
|
||||
std::atomic<bool> stop { false };
|
||||
thread server([&] {
|
||||
server_tid = pthread_self();
|
||||
server_tid_sync.CountDown();
|
||||
unique_ptr<Socket> sock(new Socket());
|
||||
Sockaddr remote;
|
||||
CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
|
||||
|
||||
TlsHandshake server;
|
||||
CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
|
||||
CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
|
||||
CHECK_OK(server.Finish(&sock));
|
||||
|
||||
CHECK_OK(sock->SetRecvTimeout(kTimeout));
|
||||
unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
|
||||
// An "echo" loop for kSize byte buffers.
|
||||
while (!stop) {
|
||||
size_t n;
|
||||
Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
|
||||
if (s.ok()) {
|
||||
size_t written;
|
||||
s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
CHECK(stop) << "unexpected error: " << s.ToString();
|
||||
}
|
||||
}
|
||||
});
|
||||
SCOPED_CLEANUP({ server.join(); });
|
||||
EchoServer server;
|
||||
NO_FATALS(server.Start());
|
||||
|
||||
// Start a thread to send signals to the server thread.
|
||||
thread killer([&]() {
|
||||
server_tid_sync.Wait();
|
||||
while (!stop) {
|
||||
PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
|
||||
SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
|
||||
}
|
||||
});
|
||||
while (!server.stopped()) {
|
||||
PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
|
||||
SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
|
||||
}
|
||||
});
|
||||
SCOPED_CLEANUP({ killer.join(); });
|
||||
|
||||
unique_ptr<Socket> client_sock(new Socket());
|
||||
ASSERT_OK(client_sock->Init(0));
|
||||
ASSERT_OK(client_sock->Connect(listen_addr));
|
||||
unique_ptr<Socket> client_sock;
|
||||
NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
|
||||
|
||||
TlsHandshake client;
|
||||
ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
|
||||
ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
|
||||
ASSERT_OK(client.Finish(&client_sock));
|
||||
|
||||
unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
|
||||
unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SleepFor(MonoDelta::FromMilliseconds(1));
|
||||
size_t nwritten;
|
||||
ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
|
||||
ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
|
||||
MonoTime::Now() + kTimeout));
|
||||
size_t n;
|
||||
ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
|
||||
ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &n,
|
||||
MonoTime::Now() + kTimeout));
|
||||
}
|
||||
stop = true;
|
||||
server.Stop();
|
||||
ASSERT_OK(client_sock->Close());
|
||||
|
||||
LOG(INFO) << "client done";
|
||||
}
|
||||
|
||||
// Return an iovec containing the same data as the buffer 'buf' with the length 'len',
|
||||
// but split into random-sized chunks. The chunks are sized randomly between 1 and
|
||||
// 'max_chunk_size' bytes.
|
||||
vector<struct iovec> ChunkIOVec(Random* rng, uint8_t* buf, int len, int max_chunk_size) {
|
||||
vector<struct iovec> ret;
|
||||
uint8_t* p = buf;
|
||||
int rem = len;
|
||||
while (rem > 0) {
|
||||
int len = rng->Uniform(max_chunk_size) + 1;
|
||||
len = std::min(len, rem);
|
||||
ret.push_back({p, static_cast<size_t>(len)});
|
||||
p += len;
|
||||
rem -= len;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Regression test for KUDU-2218, a bug in which Writev would improperly handle
|
||||
// partial writes in non-blocking mode.
|
||||
TEST_F(TlsSocketTest, TestNonBlockingWritev) {
|
||||
Random rng(GetRandomSeed32());
|
||||
|
||||
EchoServer server;
|
||||
server.EnableSlowRead();
|
||||
NO_FATALS(server.Start());
|
||||
|
||||
unique_ptr<Socket> client_sock;
|
||||
NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
|
||||
|
||||
int sndbuf = 16 * 1024;
|
||||
CHECK_ERR(setsockopt(client_sock->GetFd(), SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)));
|
||||
|
||||
unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
|
||||
unique_ptr<uint8_t[]> rbuf(new uint8_t[kEchoChunkSize]);
|
||||
RandomString(buf.get(), kEchoChunkSize, &rng);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ASSERT_OK(client_sock->SetNonBlocking(true));
|
||||
|
||||
// Prepare an IOV with the input data split into a bunch of randomly-sized
|
||||
// chunks.
|
||||
vector<struct iovec> iov = ChunkIOVec(&rng, buf.get(), kEchoChunkSize, 1024 * 1024);
|
||||
|
||||
// Loop calling writev until the iov is exhausted
|
||||
int rem = kEchoChunkSize;
|
||||
while (rem > 0) {
|
||||
CHECK(!iov.empty()) << rem;
|
||||
int32_t n;
|
||||
Status s = client_sock->Writev(&iov[0], iov.size(), &n);
|
||||
if (Socket::IsTemporarySocketError(s.posix_code())) {
|
||||
sched_yield();
|
||||
continue;
|
||||
}
|
||||
ASSERT_OK(s);
|
||||
rem -= n;
|
||||
ASSERT_GE(n, 0);
|
||||
while (n > 0) {
|
||||
if (n < iov[0].iov_len) {
|
||||
iov[0].iov_len -= n;
|
||||
iov[0].iov_base = reinterpret_cast<uint8_t*>(iov[0].iov_base) + n;
|
||||
n = 0;
|
||||
} else {
|
||||
n -= iov[0].iov_len;
|
||||
iov.erase(iov.begin());
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "client waiting";
|
||||
|
||||
size_t n;
|
||||
ASSERT_OK(client_sock->SetNonBlocking(false));
|
||||
ASSERT_OK(client_sock->BlockingRecv(rbuf.get(), kEchoChunkSize, &n,
|
||||
MonoTime::Now() + kTimeout));
|
||||
LOG(INFO) << "client got response";
|
||||
|
||||
ASSERT_EQ(0, memcmp(buf.get(), rbuf.get(), kEchoChunkSize));
|
||||
}
|
||||
|
||||
server.Stop();
|
||||
ASSERT_OK(client_sock->Close());
|
||||
}
|
||||
|
||||
} // namespace security
|
||||
} // namespace kudu
|
||||
|
||||
Reference in New Issue
Block a user