IMPALA-5558/IMPALA-5576: Reopen stale client connection

Previously, the retry logic in DoRpc() only allows retry to
happen if send() didn't complete successfully and the exception
indicates a closed connection. However, send() returning
successfully doesn't guarantee that the bytes have actually
reached the remote peer. According to the man page of send(),
when the message does not fit into the send buffer of the socket,
send() normally blocks. So the payload of RPC may be buffered in
the kernel if there is room for it. TCP allows a connection to
be half-open. If an Impalad node is restarted, a stale client
connection to that node may still allow send() to appear to succeed
even though the payload wasn't sent. However, upon calling recv()
in the RPC call to fetch the response, the client will get a return
value of 0. In which case, thrift will throw an exception as the
connection to the remote peer is closed already. Apparently, the
existing retry logic doesn't quite handle this case. One can
consistently reproduce the problem by warming the client cache
followed by restarting one of the Impalad nodes. It will result
a series of query failures due to stale connections.

This change augments the retry logic to also retry the entire RPC
if the exception string contains the messages "No more data to read."
or "SSL_read: Connection reset by peer" to capture the case of stale
connections. Our usage of thrift doesn't involve half-open TCP connection
so having a broken connection in recv() indicates the remote end has
closed the socket already. The generated thrift code doesn't knowingly
close the socket before an RPC completes unless the process crashes,
the connection is stale (e.g. the remote node was restarted) or the
remote end fails to read from the client. In either cases, the entire
RPC should just be retried with a new connection.

This change also fixes QueryState::ReportExecStatusAux() to
unconditionally for up to 3 times when reporting exec status of a
fragment instance. Previously, it may break out of the loop early
if RPC fails with 'retry_is_safe' == true (e.g. due to recv() timeout)
or if the connection to coordinator fails (IMPALA-5576). Declaring the
RPC to have failed may cause all fragment instances of a query to be
cancelled locally, triggering query hang due to IMPALA-2990. Similarly,
the cancellation RPC is also idempotent so it should be unconditionally
retried up to 3 times with 100ms sleep time in between.

The status reporting is idempotent as the handler simply ignores
RPC if it determines that all fragment instances on a given backend
is done so it should be safe to retry the RPC. This change updates
ApplyExecStatusReport() to handle duplicated status reports with
done bit set. Previously we would drop some other fragment instances'
statuses if we received duplicate 'done' statuses from the same
fragment instance(s).

Testing done: Warmed up client cache by running stress test followed by
restarting some Impalad nodes. Running queries used to fail or hang
consistently in the past. It now works with patch. Also ran CSL enduranace
tests and exhaustive builds.

Change-Id: I4d722c8ad3bf0e78e89887b6cb286c69ca61b8f5
Reviewed-on: http://gerrit.cloudera.org:8080/7284
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Impala Public Jenkins
This commit is contained in:
Michael Ho
2017-06-22 00:07:00 -07:00
committed by Impala Public Jenkins
parent 604759afbc
commit e0ba5ef6a2
10 changed files with 158 additions and 142 deletions

View File

@@ -194,24 +194,15 @@ bool IsRecvTimeoutTException(const TTransportException& e) {
strstr(e.what(), "SSL_read: Resource temporarily unavailable") != nullptr);
}
// This function implements some heuristics to match against exception details
// thrown by functions in TSocket.cpp and TSSLSocket.cpp in thrift library. It's
// expected the caller (e.g. DoRpc()) has already verified the send part of the RPC
// didn't complete. It's only safe to retry an RPC if the send part didn't complete.
// It's also expected that the RPC client will close the existing connection and reopen
// a new connection before retrying the RPC. If the exception occurs after the send part
// is done, only the recv part of the RPC can be retried.
bool IsSendFailTException(const TTransportException& e) {
// String taken from TSocket::write_partial() in Thrift's TSocket.cpp
return (e.getType() == TTransportException::TIMED_OUT &&
strstr(e.what(), "send timeout expired") != nullptr) ||
(e.getType() == TTransportException::NOT_OPEN &&
// "TTransportException: Transport not open" can be from TSSLSocket.cpp
// when the underlying socket was closed.
(strstr(e.what(), "TTransportException: Transport not open") ||
strstr(e.what(), "write() send()") != nullptr ||
strstr(e.what(), "Called write on non-open socket") != nullptr ||
strstr(e.what(), "Socket send returned 0.") != nullptr));
bool IsConnResetTException(const TTransportException& e) {
// Strings taken from TTransport::readAll(). This happens iff TSocket::read() returns 0.
// As readAll() is reading non-zero length payload, this can only mean recv() called
// by read() returns 0. According to man page of recv(), this implies a stream socket
// peer has performed an orderly shutdown.
return (e.getType() == TTransportException::END_OF_FILE &&
strstr(e.what(), "No more data to read.") != nullptr) ||
(e.getType() == TTransportException::INTERNAL_ERROR &&
strstr(e.what(), "SSL_read: Connection reset by peer") != nullptr);
}
}

View File

@@ -158,9 +158,8 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress&
/// Returns true if the TTransportException corresponds to a TCP socket recv timeout.
bool IsRecvTimeoutTException(const apache::thrift::transport::TTransportException& e);
/// Returns true if the TTransportException corresponds to a send failure due to
/// lost network connection or timeout.
bool IsSendFailTException(const apache::thrift::transport::TTransportException& e);
/// Returns true if the exception indicates the other end of the TCP socket was closed.
bool IsConnResetTException(const apache::thrift::transport::TTransportException& e);
}

View File

@@ -48,31 +48,37 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
void ExecQueryFInstances(TExecQueryFInstancesResult& _return,
const TExecQueryFInstancesParams& params, bool* send_done) {
DCHECK(!*send_done);
FAULT_INJECTION_SEND_RPC_EXCEPTION(5);
ImpalaInternalServiceClient::send_ExecQueryFInstances(params);
*send_done = true;
// Cannot inject fault on recv() side as the callers cannot handle it.
ImpalaInternalServiceClient::recv_ExecQueryFInstances(_return);
}
void ReportExecStatus(TReportExecStatusResult& _return,
const TReportExecStatusParams& params, bool* send_done) {
DCHECK(!*send_done);
FAULT_INJECTION_SEND_RPC_EXCEPTION(3);
ImpalaInternalServiceClient::send_ReportExecStatus(params);
*send_done = true;
FAULT_INJECTION_RECV_RPC_EXCEPTION(3);
ImpalaInternalServiceClient::recv_ReportExecStatus(_return);
}
void CancelQueryFInstances(TCancelQueryFInstancesResult& _return,
const TCancelQueryFInstancesParams& params, bool* send_done) {
DCHECK(!*send_done);
FAULT_INJECTION_SEND_RPC_EXCEPTION(3);
ImpalaInternalServiceClient::send_CancelQueryFInstances(params);
*send_done = true;
FAULT_INJECTION_RECV_RPC_EXCEPTION(3);
ImpalaInternalServiceClient::recv_CancelQueryFInstances(_return);
}
void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& params,
bool* send_done) {
DCHECK(!*send_done);
FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, true /* is_send */);
FAULT_INJECTION_SEND_RPC_EXCEPTION(1024);
if (transmit_csw_ != NULL) {
SCOPED_CONCURRENT_COUNTER(transmit_csw_);
ImpalaInternalServiceClient::send_TransmitData(params);
@@ -80,7 +86,7 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
ImpalaInternalServiceClient::send_TransmitData(params);
}
*send_done = true;
FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, false /* is_send */);
FAULT_INJECTION_RECV_RPC_EXCEPTION(1024);
ImpalaInternalServiceClient::recv_TransmitData(_return);
}

View File

@@ -201,6 +201,7 @@ class ClientConnection {
ClientConnection(ClientCache<T>* client_cache, TNetworkAddress address, Status* status)
: client_cache_(client_cache), client_(NULL), address_(address),
client_is_unrecoverable_(false) {
// TODO: Inject fault here to exercise IMPALA-5576.
*status = client_cache_->GetClient(address, &client_);
if (status->ok()) DCHECK(client_ != NULL);
}
@@ -226,26 +227,15 @@ class ClientConnection {
/// this can lead to f() being called twice, as this method may retry f() once,
/// depending on the error received from the first attempt.
///
/// retry_is_safe is an output parameter. In case of connection failure,
/// '*retry_is_safe' is set to true because the send never occurred and it's
/// safe to retry the RPC. Otherwise, it's set to false to indicate that the RPC was
/// in progress when it failed or the RPC was completed, therefore retrying the RPC
/// is not safe.
///
/// Returns RPC_RECV_TIMEOUT if a timeout occurred while waiting for a response,
/// RPC_CLIENT_CONNECT_FAILURE if the client failed to connect, and RPC_GENERAL_ERROR
/// if the RPC could not be completed for any other reason (except for an unexpectedly
/// closed cnxn).
/// Application-level failures should be signalled through the response type.
///
/// TODO: Consider replacing 'retry_is_safe' with a bool which callers pass to
/// indicate intention to retry recv part of the RPC if it times out.
template <class F, class Request, class Response>
Status DoRpc(const F& f, const Request& request, Response* response,
bool* retry_is_safe = NULL) {
DCHECK(response != NULL);
Status DoRpc(const F& f, const Request& request, Response* response) {
DCHECK(response != nullptr);
client_is_unrecoverable_ = true;
if (retry_is_safe != nullptr) *retry_is_safe = false;
bool send_done = false;
try {
(client_->*f)(*response, request, &send_done);
@@ -254,12 +244,20 @@ class ClientConnection {
return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
"Client $0 timed-out during recv call.", TNetworkAddressToString(address_)));
}
if (!send_done && IsSendFailTException(e)) {
return RetryRpc(f, request, response, retry_is_safe);
// Client may have unexpectedly been closed, so re-open and retry once if we didn't
// successfully send the payload yet or if the exception indicates the connection
// was closed on the other end. Note that TCP can have a half-open connection so
// send() may still succeed even after the other end already closed the socket.
// The payload can just be buffered in the kernel.
if (!send_done || IsConnResetTException(e)) {
return RetryRpc(f, request, response);
}
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
// Payload was sent and failure wasn't a timeout waiting for response. Fail the RPC.
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e, send_done));
} catch (const apache::thrift::TException& e) {
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e, send_done));
}
client_is_unrecoverable_ = false;
return Status::OK();
@@ -298,34 +296,33 @@ class ClientConnection {
/// recovered.
bool client_is_unrecoverable_;
std::string ExceptionMsg(const apache::thrift::TException& e) {
return strings::Substitute("Client for $0 hits an unexpected exception: $1, type: $2",
TNetworkAddressToString(address_), e.what(), typeid(e).name());
std::string ExceptionMsg(const apache::thrift::TException& e, bool send_done) {
return strings::Substitute("Client for $0 hits an unexpected exception: $1, type: $2"
" rpc send completed: $3", TNetworkAddressToString(address_), e.what(),
typeid(e).name(), send_done ? "true" : "false");
}
/// Retry the RPC if TCP connection underpinning this client has been closed
/// unexpectedly. Called only when IsSendFailTException() is true for the failure
/// returned in the first invocation of RPC call. Returns RPC_CLIENT_CONNECT_FAILURE
/// unexpectedly. Called only when we didn't succeed in sending all the payload
/// in the first invocation of RPC call. Returns RPC_CLIENT_CONNECT_FAILURE
/// on connection failure or RPC_GENERAL_ERROR for all other RPC failures.
template <class F, class Request, class Response>
Status RetryRpc(const F& f, const Request& request, Response* response,
bool* retry_is_safe) {
Status RetryRpc(const F& f, const Request& request, Response* response) {
DCHECK(client_is_unrecoverable_);
// Client may have unexpectedly been closed, so re-open and retry.
// TODO: ThriftClient should return proper error codes.
Status status = Reopen();
if (!status.ok()) {
if (retry_is_safe != nullptr) *retry_is_safe = true;
return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, status.GetDetail());
}
bool send_done = false;
try {
bool send_done = false;
(client_->*f)(*response, request, &send_done);
} catch (const apache::thrift::TException& e) {
// By this point the RPC really has failed.
// TODO: Revisit this logic later. It's possible that the new connection
// works but we hit timeout here.
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e));
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e, send_done));
}
client_is_unrecoverable_ = false;
return Status::OK();

View File

@@ -239,12 +239,14 @@ void Coordinator::BackendState::ApplyExecStatusReport(
for (const TFragmentInstanceExecStatus& instance_exec_status:
backend_exec_status.instance_exec_status) {
Status instance_status(instance_exec_status.status);
int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id);
DCHECK_EQ(instance_stats_map_.count(instance_idx), 1);
InstanceStats* instance_stats = instance_stats_map_[instance_idx];
DCHECK_EQ(instance_stats->exec_params_.instance_id,
instance_exec_status.fragment_instance_id);
// Ignore duplicate or out-of-order messages.
if (instance_stats->done_) continue;
if (instance_status.ok()) {
int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id);
DCHECK_EQ(instance_stats_map_.count(instance_idx), 1);
InstanceStats* instance_stats = instance_stats_map_[instance_idx];
DCHECK_EQ(instance_stats->exec_params_.instance_id,
instance_exec_status.fragment_instance_id);
instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
if (instance_stats->peak_mem_counter_ != nullptr) {
// protect against out-of-order status updates
@@ -261,7 +263,11 @@ void Coordinator::BackendState::ApplyExecStatusReport(
}
}
DCHECK_GT(num_remaining_instances_, 0);
if (instance_exec_status.done) --num_remaining_instances_;
if (instance_exec_status.done) {
DCHECK(!instance_stats->done_);
instance_stats->done_ = true;
--num_remaining_instances_;
}
// TODO: clean up the ReportQuerySummary() mess
if (status_.ok()) {
@@ -324,30 +330,41 @@ bool Coordinator::BackendState::Cancel() {
// set an error status to make sure we only cancel this once
if (status_.ok()) status_ = Status::CANCELLED;
Status status;
ImpalaBackendConnection backend_client(
ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status);
if (!status.ok()) return false;
TCancelQueryFInstancesParams params;
params.protocol_version = ImpalaInternalServiceVersion::V1;
params.__set_query_id(query_id_);
TCancelQueryFInstancesResult dummy;
VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id="
<< query_id_ << " backend=" << impalad_address();
<< query_id_ << " backend=" << TNetworkAddressToString(impalad_address());
Status rpc_status;
Status client_status;
// Try to send the RPC 3 times before failing.
bool retry_is_safe;
for (int i = 0; i < 3; ++i) {
rpc_status = backend_client.DoRpc(
&ImpalaBackendClient::CancelQueryFInstances, params, &dummy, &retry_is_safe);
if (rpc_status.ok() || !retry_is_safe) break;
ImpalaBackendConnection backend_client(ExecEnv::GetInstance()->impalad_client_cache(),
impalad_address(), &client_status);
if (client_status.ok()) {
// The return value 'dummy' is ignored as it's only set if the fragment instance
// cannot be found in the backend. The fragment instances of a query can all be
// cancelled locally in a backend due to RPC failure to coordinator. In which case,
// the query state can be gone already.
rpc_status = backend_client.DoRpc(
&ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
if (rpc_status.ok()) break;
}
}
if (!client_status.ok()) {
status_.MergeStatus(client_status);
VLOG_QUERY << "CancelQueryFInstances query_id= " << query_id_
<< " failed to connect to " << TNetworkAddressToString(impalad_address())
<< " :" << client_status.msg().msg();
return true;
}
if (!rpc_status.ok()) {
status_.MergeStatus(rpc_status);
stringstream msg;
msg << "CancelQueryFInstances rpc query_id=" << query_id_
<< " failed: " << rpc_status.msg().msg();
status_.AddDetail(msg.str());
VLOG_QUERY << "CancelQueryFInstances query_id= " << query_id_
<< " rpc to " << TNetworkAddressToString(impalad_address())
<< " failed: " << rpc_status.msg().msg();
return true;
}
return true;
@@ -377,6 +394,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
ObjectPool* obj_pool)
: exec_params_(exec_params),
profile_(nullptr),
done_(false),
profile_created_(false),
total_split_size_(0),
total_ranges_complete_(0) {

View File

@@ -135,6 +135,11 @@ class Coordinator::BackendState {
/// owned by coordinator object pool provided in the c'tor, created in Update()
RuntimeProfile* profile_;
/// true if the final report has been received for the fragment instance.
/// Used to handle duplicate done ReportExecStatus RPC messages. Used only
/// in ApplyExecStatusReport()
bool done_;
/// true after the first call to profile->Update()
bool profile_created_;

View File

@@ -37,8 +37,6 @@
using namespace impala;
#define RETRY_SLEEP_MS 100
QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -183,18 +181,6 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
Status coord_status;
ImpalaBackendConnection coord(ExecEnv::GetInstance()->impalad_client_cache(),
query_ctx().coord_address, &coord_status);
if (!coord_status.ok()) {
// TODO: this might flood the log
LOG(WARNING) << "Couldn't get a client for " << query_ctx().coord_address
<<"\tReason: " << coord_status.GetDetail();
if (instances_started) Cancel();
return;
}
TReportExecStatusParams params;
params.protocol_version = ImpalaInternalServiceVersion::V1;
params.__set_query_id(query_ctx().query_id);
@@ -234,16 +220,20 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
params.__isset.error_log = (params.error_log.size() > 0);
}
TReportExecStatusResult res;
Status rpc_status;
bool retry_is_safe;
// Try to send the RPC 3 times before failing.
TReportExecStatusResult res;
DCHECK_EQ(res.status.status_code, TErrorCode::OK);
// Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
// It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
for (int i = 0; i < 3; ++i) {
rpc_status = coord.DoRpc(
&ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe);
if (rpc_status.ok()) break;
if (!retry_is_safe) break;
if (i < 2) SleepForMs(RETRY_SLEEP_MS);
Status client_status;
ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
query_ctx().coord_address, &client_status);
if (client_status.ok()) {
rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
if (rpc_status.ok()) break;
}
if (i < 2) SleepForMs(100);
}
Status result_status(res.status);
if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
@@ -251,6 +241,8 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
// report, following this Cancel(), may not succeed anyway.)
// TODO: not keeping an error status here means that all instances might
// abort with CANCELLED status, despite there being an error
// TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
// hang as the coordinator may not be aware of the cancellation.
Cancel();
}
}

View File

@@ -51,42 +51,45 @@ void FaultInjectionUtil::InjectRpcDelay(RpcCallType my_type) {
if (target_rpc_type == my_type) SleepForMs(delay_ms);
}
void FaultInjectionUtil::InjectRpcException(RpcCallType my_type, bool is_send) {
void FaultInjectionUtil::InjectRpcException(bool is_send, int freq) {
static AtomicInt32 send_count(-1);
static AtomicInt32 recv_count(-1);
int32_t xcp_type = FLAGS_fault_injection_rpc_exception_type;
if (xcp_type == RPC_EXCEPTION_NONE) return;
// We currently support injecting exception at TransmitData() RPC only.
int32_t target_rpc_type = GetTargetRPCType();
DCHECK_EQ(target_rpc_type, RPC_TRANSMITDATA);
// We currently support injecting exception at some RPCs only.
if (is_send) {
if (send_count.Add(1) % 1024 == 0) {
if (send_count.Add(1) % freq == 0) {
switch (xcp_type) {
case RPC_EXCEPTION_SEND_LOST_CONNECTION:
case RPC_EXCEPTION_SEND_CLOSED_CONNECTION:
throw TTransportException(TTransportException::NOT_OPEN,
"Called write on non-open socket");
case RPC_EXCEPTION_SEND_TIMEDOUT:
throw TTransportException(TTransportException::TIMED_OUT,
"send timeout expired");
case RPC_EXCEPTION_SSL_SEND_LOST_CONNECTION:
case RPC_EXCEPTION_SSL_SEND_CLOSED_CONNECTION:
throw TTransportException(TTransportException::NOT_OPEN);
case RPC_EXCEPTION_SSL_SEND_TIMEDOUT:
throw TSSLException("SSL_write: Resource temporarily unavailable");
// Simulate half-opened connections.
case RPC_EXCEPTION_SEND_STALE_CONNECTION:
throw TTransportException(TTransportException::END_OF_FILE,
"No more data to read.");
case RPC_EXCEPTION_SSL_SEND_STALE_CONNECTION:
throw TSSLException("SSL_read: Connection reset by peer");
// fall through for the default case.
}
}
} else {
if (recv_count.Add(1) % 1024 == 0) {
if (recv_count.Add(1) % freq == 0) {
switch (xcp_type) {
case RPC_EXCEPTION_RECV_LOST_CONNECTION:
case RPC_EXCEPTION_RECV_CLOSED_CONNECTION:
throw TTransportException(TTransportException::NOT_OPEN,
"Called read on non-open socket");
case RPC_EXCEPTION_RECV_TIMEDOUT:
throw TTransportException(TTransportException::TIMED_OUT,
"EAGAIN (timed out)");
case RPC_EXCEPTION_SSL_RECV_LOST_CONNECTION:
case RPC_EXCEPTION_SSL_RECV_CLOSED_CONNECTION:
throw TTransportException(TTransportException::NOT_OPEN);
case RPC_EXCEPTION_SSL_RECV_TIMEDOUT:
throw TSSLException("SSL_read: Resource temporarily unavailable");

View File

@@ -39,13 +39,15 @@ class FaultInjectionUtil {
enum RpcExceptionType {
RPC_EXCEPTION_NONE = 0,
RPC_EXCEPTION_SEND_LOST_CONNECTION,
RPC_EXCEPTION_SEND_CLOSED_CONNECTION,
RPC_EXCEPTION_SEND_STALE_CONNECTION,
RPC_EXCEPTION_SEND_TIMEDOUT,
RPC_EXCEPTION_RECV_LOST_CONNECTION,
RPC_EXCEPTION_RECV_CLOSED_CONNECTION,
RPC_EXCEPTION_RECV_TIMEDOUT,
RPC_EXCEPTION_SSL_SEND_LOST_CONNECTION,
RPC_EXCEPTION_SSL_SEND_CLOSED_CONNECTION,
RPC_EXCEPTION_SSL_SEND_STALE_CONNECTION,
RPC_EXCEPTION_SSL_SEND_TIMEDOUT,
RPC_EXCEPTION_SSL_RECV_LOST_CONNECTION,
RPC_EXCEPTION_SSL_RECV_CLOSED_CONNECTION,
RPC_EXCEPTION_SSL_RECV_TIMEDOUT,
};
@@ -57,27 +59,28 @@ class FaultInjectionUtil {
static void InjectRpcDelay(RpcCallType my_type);
/// Test util function that injects exceptions to RPC client functions.
/// 'my_type' specifies which RPC type of the current function. Currently, only
/// TransmitData() is supported.
/// 'is_send' indicates whether injected fault is at the send RPC call or recv RPC.
/// It's true if for send RPC call and false for recv RPC call.
/// FLAGS_fault_injection_rpc_exception_type specifies the exception to be injected.
static void InjectRpcException(RpcCallType my_type, bool is_send);
/// 'is_send' indicates whether injected fault is at the send() or recv() of an RPC.
/// The exception specified in 'FLAGS_fault_injection_rpc_exception_type' is injected
/// on every 'freq' invocations of this function.
static void InjectRpcException(bool is_send, int freq);
private:
static int32_t GetTargetRPCType();
};
#define FAULT_INJECTION_RPC_DELAY(type) \
#define FAULT_INJECTION_RPC_DELAY(type) \
FaultInjectionUtil::InjectRpcDelay(FaultInjectionUtil::type)
#define FAULT_INJECTION_RPC_EXCEPTION(type, is_send) \
FaultInjectionUtil::InjectRpcException(FaultInjectionUtil::type, is_send)
#define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq) \
FaultInjectionUtil::InjectRpcException(true, freq)
#define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq) \
FaultInjectionUtil::InjectRpcException(false, freq)
#else // NDEBUG
#define FAULT_INJECTION_RPC_DELAY(type)
#define FAULT_INJECTION_RPC_EXCEPTION(type, is_send)
#define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq)
#define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq)
#endif

View File

@@ -52,49 +52,51 @@ class TestRPCException(CustomClusterTestSuite):
assert exception_string in str(e)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1"
" --fault_injection_rpc_type=5")
def test_transmitdata_send_lost_connection(self, vector):
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1")
def test_rpc_send_closed_connection(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2"
" --fault_injection_rpc_type=5")
def test_transmitdata_send_timed_out(self, vector):
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2")
def test_rpc_send_stale_connection(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3"
" --fault_injection_rpc_type=5")
def test_transmitdata_recv_lost_connection(self, vector):
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3")
def test_rpc_send_timed_out(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
def test_rpc_recv_closed_connection(self, vector):
self.execute_test_query("Called read on non-open socket")
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4"
" --fault_injection_rpc_type=5")
def test_transmitdata_recv_timed_out(self, vector):
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
def test_rpc_recv_timed_out(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5"
" --fault_injection_rpc_type=5")
def test_transmitdata_secure_send_lost_connection(self, vector):
self.execute_test_query(None);
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6")
def test_rpc_secure_send_closed_connection(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6"
" --fault_injection_rpc_type=5")
def test_transmitdata_secure_send_timed_out(self, vector):
self.execute_test_query("SSL_write: Resource temporarily unavailable")
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7")
def test_rpc_secure_send_stale_connection(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7"
" --fault_injection_rpc_type=5")
def test_transmitdata_secure_recv_lost_connection(self, vector):
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8")
def test_rpc_secure_send_timed_out(self, vector):
self.execute_test_query(None)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
def test_rpc_secure_recv_closed_connection(self, vector):
self.execute_test_query("TTransportException: Transport not open")
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8"
" --fault_injection_rpc_type=5")
def test_transmitdata_secure_recv_timed_out(self, vector):
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
def test_rpc_secure_recv_timed_out(self, vector):
self.execute_test_query(None)