IMPALA-9425 (part 1): Introduce uuids for impalads

This patch introduces the concept of 'backend ids', which are unique
ids that can be used to identify individual impalads. The ids are
generated by each impalad on startup.
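
Each id is a random UUID packed into the existing TUniqueId thrift struct
(aliased as TBackendId in the Types.thrift change below). A minimal sketch of
the generation, written against plain Boost rather than the UUIDToTUniqueId()
helper the actual patch calls:

  #include <cstdint>
  #include <cstring>

  #include <boost/uuid/uuid.hpp>
  #include <boost/uuid/uuid_generators.hpp>

  // Stand-in for the thrift-generated TUniqueId/TBackendId: two i64 halves.
  struct TBackendId {
    int64_t hi;
    int64_t lo;
  };

  // Sketch of what UUIDToTUniqueId() boils down to: copy the 16 random bytes
  // of a boost UUID into the two 64-bit halves of the id. Collisions between
  // independently generated ids are vanishingly unlikely.
  TBackendId GenerateBackendId() {
    boost::uuids::uuid uuid = boost::uuids::random_generator()();
    TBackendId id;
    std::memcpy(&id.hi, uuid.data, 8);
    std::memcpy(&id.lo, uuid.data + 8, 8);
    return id;
  }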

The patch then uses the ids to fix a bug where the statestore may fail
to inform coordinators when an executor has failed and restarted. The
bug arose because the statestore cluster membership topic was keyed on
statestore subscriber ids, which are host:port pairs.

So, if an impalad fails and a new one is started at the same host:port
before the statestore generates a cluster membership update for a
particular coordinator, the statestore cannot differentiate the prior
impalad from the newly started one, and the topic update will not show
the deletion of the original impalad.

With this patch, the cluster membership topic is instead keyed by
backend id. Since a restarted impalad has a different backend id than
its predecessor, the first membership update after the prior impalad
fails is guaranteed to reflect that failure.
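
To make the failure-detection path concrete, here is a simplified sketch of
the coordinator-side logic this keying enables (hypothetical scalar ids stand
in for the thrift TBackendId; the real code is in the impala-server.cc hunk
below): any backend tracked in the query-locations map that is absent from the
latest membership snapshot must have failed, even if another impalad now
occupies the same host:port.

  #include <cstdint>
  #include <iostream>
  #include <unordered_map>
  #include <unordered_set>

  using BackendId = uint64_t;  // stands in for TBackendId
  using QueryId = uint64_t;    // stands in for TUniqueId

  // Cancel every query that has a fragment on a backend whose id is missing
  // from the latest membership snapshot, then drop that backend's entry.
  void CancelQueriesOnFailedBackends(
      std::unordered_map<BackendId, std::unordered_set<QueryId>>& query_locations,
      const std::unordered_set<BackendId>& current_membership) {
    for (auto it = query_locations.begin(); it != query_locations.end();) {
      if (current_membership.count(it->first) == 0) {
        for (QueryId qid : it->second) {
          std::cout << "cancelling query " << qid << " (backend " << it->first
                    << " failed)\n";
        }
        it = query_locations.erase(it);
      } else {
        ++it;
      }
    }
  }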

The patch also logs the backend ids on startup and adds them to the
/backends webui page and to the query locations section of the
/queries page, for use in debugging.

Further patches will apply the backend ids in other places where we
currently key off host:port pairs to identify impalads.

Testing:
- Added an e2e test that uses a new debug action to add a random delay
  to statestore topic updates (a sketch of the delay mechanism follows
  this list). Due to the use of JITTER the test is non-deterministic,
  but it reproduces the original issue locally for me about 50% of the
  time.
- Passed a full run of existing tests.
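
For reference, an approximate sketch of how a "LABEL:JITTER@<max_ms>" debug
action behaves (the real parsing lives in Impala's debug-action utilities;
this only models the semantics): each time the labelled code point is hit,
the thread sleeps for a random duration of up to max_ms milliseconds, so
DO_SUBSCRIBER_UPDATE:JITTER@10000 delays each statestore topic update by up
to 10 seconds.

  #include <chrono>
  #include <random>
  #include <string>
  #include <thread>

  // Approximation of DebugActionNoFail(FLAGS_debug_actions, label) for a
  // JITTER action: sleep for a random 0..max_ms milliseconds whenever the
  // configured actions contain "<label>:JITTER@<max_ms>".
  void MaybeJitter(const std::string& actions, const std::string& label) {
    const std::string prefix = label + ":JITTER@";
    size_t pos = actions.find(prefix);
    if (pos == std::string::npos) return;
    int max_ms = std::stoi(actions.substr(pos + prefix.size()));
    static thread_local std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> dist(0, max_ms);
    std::this_thread::sleep_for(std::chrono::milliseconds(dist(rng)));
  }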

Change-Id: Icf8067349ed6b765f6fed830b7140f60738e9061
Reviewed-on: http://gerrit.cloudera.org:8080/15321
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Thomas Tauber-Marshall
Date: 2020-02-25 13:04:27 -08:00
Committed-by: Impala Public Jenkins
Commit: ae0bd674a8 (parent: bc3cf8aed2)
12 changed files with 122 additions and 45 deletions

be/src/runtime/exec-env.cc:

@@ -69,6 +69,7 @@
 #include "util/system-state-info.h"
 #include "util/test-info.h"
 #include "util/thread-pool.h"
+#include "util/uid-util.h"
 #include "util/webserver.h"

 #include "common/names.h"
@@ -251,6 +252,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, backend_port)) {
+  UUIDToTUniqueId(boost::uuids::random_generator()(), &backend_id_);
   // Resolve hostname to IP address.
   ABORT_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address_));
@@ -279,7 +281,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   }
   cluster_membership_mgr_.reset(new ClusterMembershipMgr(
-      statestore_subscriber_->id(), statestore_subscriber_.get(), metrics_.get()));
+      PrintId(backend_id_), statestore_subscriber_.get(), metrics_.get()));
   admission_controller_.reset(
       new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
@@ -300,6 +302,7 @@ Status ExecEnv::InitForFeTests() {
 }

 Status ExecEnv::Init() {
+  LOG(INFO) << "Initializing impalad with backend uuid: " << PrintId(backend_id_);
   // Initialize thread pools
   if (FLAGS_is_coordinator) {
     RETURN_IF_ERROR(hdfs_op_thread_pool_->Init());
@@ -544,9 +547,9 @@ void ExecEnv::SetImpalaServer(ImpalaServer* server) {
       });
   cluster_membership_mgr_->RegisterUpdateCallbackFn(
       [server](ClusterMembershipMgr::SnapshotPtr snapshot) {
-        std::unordered_set<TNetworkAddress> current_backend_set;
+        std::unordered_set<TBackendId> current_backend_set;
        for (const auto& it : snapshot->current_backends) {
-          current_backend_set.insert(it.second.address);
+          current_backend_set.insert(it.second.backend_id);
        }
        server->CancelQueriesOnFailedBackends(current_backend_set);
      });

be/src/runtime/exec-env.h:

@@ -113,6 +113,8 @@ class ExecEnv {
   /// StartServices() was successful.
   TNetworkAddress GetThriftBackendAddress() const;

+  const TBackendId& backend_id() const { return backend_id_; }
+
   KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }

   ImpalaBackendClientCache* impalad_client_cache() {
@@ -174,6 +176,9 @@ class ExecEnv {
   int64_t admission_slots() const { return admission_slots_; }

  private:
+  // Used to uniquely identify this impalad.
+  TBackendId backend_id_;
+
   boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<KrpcDataStreamMgr> stream_mgr_;

be/src/service/impala-http-handler.cc:

@@ -494,13 +494,16 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::WebRequest& req,
   Value query_locations(kArrayType);
   {
     lock_guard<mutex> l(server_->query_locations_lock_);
-    for (const ImpalaServer::QueryLocations::value_type& location:
-        server_->query_locations_) {
+    for (const ImpalaServer::QueryLocations::value_type& location :
+         server_->query_locations_) {
       Value location_json(kObjectType);
-      Value location_name(TNetworkAddressToString(location.first).c_str(),
+      Value location_name(TNetworkAddressToString(location.second.address).c_str(),
           document->GetAllocator());
       location_json.AddMember("location", location_name, document->GetAllocator());
-      location_json.AddMember("count", static_cast<uint64_t>(location.second.size()),
+      Value backend_id_str(PrintId(location.first).c_str(), document->GetAllocator());
+      location_json.AddMember("backend_id", backend_id_str, document->GetAllocator());
+      location_json.AddMember("count",
+          static_cast<uint64_t>(location.second.query_ids.size()),
           document->GetAllocator());
       query_locations.PushBack(location_json, document->GetAllocator());
     }
@@ -940,6 +943,8 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::WebRequest& req,
         TNetworkAddressToString(backend.krpc_address).c_str(), document->GetAllocator());
     backend_obj.AddMember("address", str, document->GetAllocator());
     backend_obj.AddMember("krpc_address", krpc_address, document->GetAllocator());
+    Value backend_id_str(PrintId(backend.backend_id).c_str(), document->GetAllocator());
+    backend_obj.AddMember("backend_id", backend_id_str, document->GetAllocator());
     string webserver_url =
         Substitute("$0://$1", backend.secure_webserver ? "https" : "http",
             TNetworkAddressToString(backend.debug_http_address));

be/src/service/impala-server.cc:

@@ -1175,14 +1175,13 @@ Status ImpalaServer::CloseClientRequestState(
   if (!per_backend_params.empty()) {
     lock_guard<mutex> l(query_locations_lock_);
     for (const auto& entry : per_backend_params) {
-      const TNetworkAddress& hostport = entry.first;
       // Query may have been removed already by cancellation path. In particular, if
       // node to fail was last sender to an exchange, the coordinator will realise and
       // fail the query at the same time the failure detection path does the same
       // thing. They will harmlessly race to remove the query from this map.
-      QueryLocations::iterator it = query_locations_.find(hostport);
+      auto it = query_locations_.find(entry.second.be_desc.backend_id);
       if (it != query_locations_.end()) {
-        it->second.erase(request_state->query_id());
+        it->second.query_ids.erase(request_state->query_id());
       }
     }
   }
@@ -1718,28 +1717,36 @@ void ImpalaServer::RegisterQueryLocations(
   if (!per_backend_params.empty()) {
     lock_guard<mutex> l(query_locations_lock_);
     for (const auto& entry : per_backend_params) {
-      const TNetworkAddress& host = entry.first;
-      query_locations_[host].insert(query_id);
+      const TBackendId& backend_id = entry.second.be_desc.backend_id;
+      auto it = query_locations_.find(backend_id);
+      if (it == query_locations_.end()) {
+        query_locations_.emplace(
+            backend_id, QueryLocationInfo(entry.second.be_desc.address, query_id));
+      } else {
+        it->second.query_ids.insert(query_id);
+      }
     }
   }
 }

 void ImpalaServer::CancelQueriesOnFailedBackends(
-    const std::unordered_set<TNetworkAddress>& current_membership) {
+    const std::unordered_set<TBackendId>& current_membership) {
   // Maps from query id (to be cancelled) to a list of failed Impalads that are
-  // the cause of the cancellation.
+  // the cause of the cancellation. Note that we don't need to use TBackendIds as a single
+  // query can't be scheduled on two backends with the same TNetworkAddress so there's no
+  // ambiguity, and passing the TNetworkAddresses into the CancellationWork makes them
+  // available for generating a user-friendly error message.
   map<TUniqueId, vector<TNetworkAddress>> queries_to_cancel;
   {
     // Build a list of queries that are running on failed hosts (as evidenced by their
     // absence from the membership list).
     // TODO: crash-restart failures can give false negatives for failed Impala demons.
     lock_guard<mutex> l(query_locations_lock_);
     QueryLocations::const_iterator loc_entry = query_locations_.begin();
     while (loc_entry != query_locations_.end()) {
       if (current_membership.find(loc_entry->first) == current_membership.end()) {
         // Add failed backend locations to all queries that ran on that backend.
-        for (const auto& query_id : loc_entry->second) {
-          queries_to_cancel[query_id].push_back(loc_entry->first);
+        for (const auto& query_id : loc_entry->second.query_ids) {
+          queries_to_cancel[query_id].push_back(loc_entry->second.address);
         }
         loc_entry = query_locations_.erase(loc_entry);
       } else {
@@ -1798,6 +1805,7 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_de
   DCHECK(services_started_.load());
   bool is_quiescing = shutting_down_.Load() != 0;
+  be_desc->__set_backend_id(exec_env_->backend_id());
   be_desc->__set_address(exec_env_->GetThriftBackendAddress());
   be_desc->__set_ip_address(exec_env_->ip_address());
   be_desc->__set_is_coordinator(FLAGS_is_coordinator);

be/src/service/impala-server.h:

@@ -463,10 +463,10 @@ class ImpalaServer : public ImpalaServiceIf,
   void RegisterQueryLocations(
       const PerBackendExecParams& per_backend_params, const TUniqueId& query_id);

-  /// Takes a set of network addresses of active backends and cancels all the queries
-  /// running on failed ones (that is, addresses not in the active set).
+  /// Takes a set of backend ids of active backends and cancels all the queries running on
+  /// failed ones (that is, ids not in the active set).
   void CancelQueriesOnFailedBackends(
-      const std::unordered_set<TNetworkAddress>& current_membership);
+      const std::unordered_set<TBackendId>& current_membership);

   /// Start the shutdown process. Return an error if it could not be started. Otherwise,
   /// if it was successfully started by this or a previous call, return OK along with
@@ -1274,10 +1274,23 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Protects query_locations_. Not held in conjunction with other locks.
   std::mutex query_locations_lock_;

-  /// A map from backend to the list of queries currently running or expected to run
-  /// there.
-  typedef std::unordered_map<TNetworkAddress, std::unordered_set<TUniqueId>>
-      QueryLocations;
+  /// Entries in the 'query_locations' map.
+  struct QueryLocationInfo {
+    QueryLocationInfo(TNetworkAddress address, TUniqueId query_id) : address(address) {
+      query_ids.insert(query_id);
+    }
+
+    /// Used for logging and error messages so that users don't have to translate between
+    /// the TBackendId and a hostname themselves.
+    TNetworkAddress address;
+
+    /// Queries currently running or expected to run at this location.
+    std::unordered_set<TUniqueId> query_ids;
+  };
+
+  /// Contains info about what queries are running on each backend, so that they can be
+  /// cancelled if the backend goes down.
+  typedef std::unordered_map<TBackendId, QueryLocationInfo> QueryLocations;
   QueryLocations query_locations_;

   /// The local backend descriptor. Updated in GetLocalBackendDescriptor() and protected

be/src/statestore/statestore.cc:

@@ -106,6 +106,7 @@ DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time af
     "badly hung machines that are not able to respond to the update RPC in short "
     "order.");

+DECLARE_string(debug_actions);
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
 DECLARE_string(ssl_private_key_password_cmd);
@@ -919,6 +920,8 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
     VLOG(3) << "Initial " << update_kind_str << " message for: " << update.subscriber_id;
   }

+  DebugActionNoFail(FLAGS_debug_actions, "DO_SUBSCRIBER_UPDATE");
+
   // Send the right message type, and compute the next deadline
   int64_t deadline_ms = 0;
   Status status;

common/thrift/StatestoreService.thrift:

@@ -59,42 +59,45 @@ struct TExecutorGroupDesc {
 // all other Impalads in the cluster. Impalads can act as coordinators, executors or
 // both.
 struct TBackendDescriptor {
+  // Unique identifier for this impalad. Generated on startup.
+  1: required Types.TBackendId backend_id;
+
   // Network address of the thrift based ImpalaInternalService on this backend
-  1: required Types.TNetworkAddress address;
+  2: required Types.TNetworkAddress address;

   // IP address corresponding to address.hostname. Explicitly including this saves the
   // cost of resolution at every Impalad (since IP addresses are needed for scheduling)
-  2: required string ip_address;
+  3: required string ip_address;

   // True if this is a coordinator node
-  3: required bool is_coordinator;
+  4: required bool is_coordinator;

   // True if this is an executor node
-  4: required bool is_executor;
+  5: required bool is_executor;

   // The address of the debug HTTP server
-  5: optional Types.TNetworkAddress debug_http_address;
+  6: optional Types.TNetworkAddress debug_http_address;

   // True if the debug webserver is secured (for correctly generating links)
-  6: optional bool secure_webserver;
+  7: optional bool secure_webserver;

   // IP address + port of KRPC based ImpalaInternalService on this backend
-  7: optional Types.TNetworkAddress krpc_address;
+  8: optional Types.TNetworkAddress krpc_address;

   // The amount of memory that can be admitted to this backend (in bytes).
-  8: required i64 admit_mem_limit;
+  9: required i64 admit_mem_limit;

   // True if fragment instances should not be scheduled on this daemon because the
   // daemon has been quiescing, e.g. if it shutting down.
-  9: required bool is_quiescing;
+  10: required bool is_quiescing;

   // The list of executor groups that this backend belongs to. Only valid if is_executor
   // is set, and currently must contain exactly one entry.
-  10: required list<TExecutorGroupDesc> executor_groups;
+  11: required list<TExecutorGroupDesc> executor_groups;

   // The number of admission slots for this backend that can be occupied by running
   // queries.
-  11: required i64 admission_slots;
+  12: required i64 admission_slots;
 }

 // Description of a single entry in a topic

common/thrift/Types.thrift:

@@ -151,6 +151,9 @@ struct TUniqueId {
   2: required i64 lo
 }

+// Used to uniquely identify individual impalads.
+typedef TUniqueId TBackendId;
+
 enum TFunctionCategory {
   SCALAR = 0
   AGGREGATE = 1

tests/common/impala_test_suite.py:

@@ -1077,21 +1077,24 @@ class ImpalaTestSuite(BaseTestSuite):
         return workload_strategy[1]
     return default_strategy

-  def wait_for_state(self, handle, expected_state, timeout):
-    """Waits for the given 'query_handle' to reach the 'expected_state'. If it does not
-    reach the given state within 'timeout' seconds, the method throws an AssertionError.
+  def wait_for_state(self, handle, expected_state, timeout, client=None):
+    """Waits for the given 'query_handle' to reach the 'expected_state' using 'client', or
+    with the default connection if 'client' is None. If it does not reach the given state
+    within 'timeout' seconds, the method throws an AssertionError.
     """
-    self.wait_for_any_state(handle, [expected_state], timeout)
+    self.wait_for_any_state(handle, [expected_state], timeout, client)

-  def wait_for_any_state(self, handle, expected_states, timeout):
-    """Waits for the given 'query_handle' to reach one of 'expected_states'. If it does
-    not reach one of the given states within 'timeout' seconds, the method throws an
-    AssertionError. Returns the final state.
+  def wait_for_any_state(self, handle, expected_states, timeout, client=None):
+    """Waits for the given 'query_handle' to reach one of 'expected_states' using 'client'
+    or with the default connection if 'client' is None. If it does not reach one of the
+    given states within 'timeout' seconds, the method throws an AssertionError. Returns
+    the final state.
     """
+    if client is None: client = self.client
     start_time = time.time()
-    actual_state = self.client.get_state(handle)
+    actual_state = client.get_state(handle)
     while actual_state not in expected_states and time.time() - start_time < timeout:
-      actual_state = self.client.get_state(handle)
+      actual_state = client.get_state(handle)
       time.sleep(0.5)
     if actual_state not in expected_states:
       raise Timeout("query {0} did not reach one of the expected states {1}, "

tests/custom_cluster/test_restart_services.py:

@@ -92,6 +92,29 @@ class TestRestart(CustomClusterTestSuite):
     client.close()

   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    # Debug action to delay statestore updates to give the restarted impalad time to
+    # register itself before a membership topic update is generated.
+    statestored_args="--debug_actions=DO_SUBSCRIBER_UPDATE:JITTER@10000")
+  def test_statestore_update_after_impalad_restart(self):
+    """Test that checks that coordinators are informed that an impalad went down even if
+    the statestore doesn't send a membership update until after a new impalad has been
+    restarted at the same location."""
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip()
+
+    assert len(self.cluster.impalads) == 3
+    client = self.cluster.impalads[0].service.create_beeswax_client()
+    assert client is not None
+
+    handle = client.execute_async(
+        "select count(*) from functional.alltypes where id = sleep(100000)")
+
+    node_to_restart = self.cluster.impalads[2]
+    node_to_restart.restart()
+
+    # Verify that the query is cancelled due to the failed impalad quickly.
+    self.wait_for_state(handle, QueryState.EXCEPTION, 20, client=client)
+
+  @pytest.mark.execute_serially
   def test_catalog_connection_retries(self):
     """Test that connections to the catalogd are retried, both new connections and cached

www/backends.tmpl:

@@ -26,6 +26,7 @@ under the License.
       <th></th>
       <th>Address</th>
       <th>Krpc Address</th>
+      <th>Backend Id</th>
       <th>Coordinator</th>
       <th>Executor</th>
       <th>Memory Limit for Admission</th>
@@ -43,6 +44,7 @@ under the License.
       <td><a href='{{webserver_url}}'>Web UI</a></td>
       <td>{{address}}</td>
       <td>{{krpc_address}}</td>
+      <td>{{backend_id}}</td>
       <td>{{is_coordinator}}</td>
       <td>{{is_executor}}</td>
       <td>{{admit_mem_limit}}</td>
@@ -66,6 +68,7 @@ under the License.
       <th></th>
       <th>Address</th>
       <th>Krpc Address</th>
+      <th>Backend Id</th>
       <th>Coordinator</th>
       <th>Executor</th>
       <th>Memory Limit for Admission</th>
@@ -83,6 +86,7 @@ under the License.
       <td><a href='{{webserver_url}}'>Web UI</a></td>
       <td>{{address}}</td>
       <td>{{krpc_address}}</td>
+      <td>{{backend_id}}</td>
       <td>{{is_coordinator}}</td>
       <td>{{is_executor}}</td>
       <td>{{admit_mem_limit}}</td>
@@ -107,6 +111,7 @@ under the License.
       <th></th>
       <th>Address</th>
       <th>Krpc Address</th>
+      <th>Backend Id</th>
       <th>Blacklisting Cause</th>
       <th>Time remaining on blacklist</th>
       <th>Coordinator</th>
@@ -126,6 +131,7 @@ under the License.
       <td><a href='{{webserver_url}}'>Web UI</a></td>
       <td>{{address}}</td>
       <td>{{krpc_address}}</td>
+      <td>{{backend_id}}</td>
       <td>{{blacklist_cause}}</td>
       <td>{{blacklist_time_remaining}}</td>
       <td>{{is_coordinator}}</td>

www/queries.tmpl:

@@ -147,11 +147,13 @@ archived in memory. The size of that archive is controlled with the
 <table class='table table-hover table-bordered'>
   <tr>
     <th>Location</th>
+    <th>Backend Id</th>
     <th>Number of running queries with fragments on this host</th>
   </tr>
 {{#query_locations}}
   <tr>
     <td>{{location}}</td>
+    <td>{{backend_id}}</td>
     <td>{{count}}</td>
   </tr>
 {{/query_locations}}