IMPALA-12156: Support High Availability for Statestore

To support statestore HA, we allow two statestored instances in an
Active-Passive HA pair to be added to an Impala cluster. We add the
preemptive behavior for statestored. When HA is enabled, the preemptive
behavior allows the statestored with the higher priority to become
active and the paired statestored becomes standby. The active
statestored acts as the owner of Impala cluster and provides statestore
service for the cluster members.

To enable catalog HA for a cluster, two statestoreds in the HA pair and
all subscribers must be started with starting flag
"enable_statestored_ha" as true.

This patch makes following changes:
- Defined new service for Statestore HA.
- Statestored negotiates the role for HA with its peer statestore
  instance on startup.
- Create HA monitor thread:
  Active statestored sends heartbeat to standby statestored.
  Standby statestored monitors peer's connection states with their
  subscribers.
- Standby statestored sends heartbeat to subscribers with request
  for connection state between active statestore and subscribers.
  Standby statestored saves the connection state as failure detecter.
- When standby statestored lost connection with active statestore,
  it checks the connection states for active statestore, and takes over
  active role if majority of subscribers lost connections with active
  statestore.
- New active statestored sends RPC notification to all subscribers
  for new active statestored and active catalogd elected by the new
  active statestored.
- New active statestored starts sending heartbeat to its peer when it
  receives handshake from its peer.
- Active statestored enters recovery mode if it lost connections with
  its peer statestored and all subscribers. It keeps sending HA
  handshake to its peer until receiving response.
- All subscribers (impalad/catalogd/admissiond) register to two
  statestoreds.
- Subscribers report connection state for the requests from standby
  statestore.
- Subscribers switch to new active statestore when receiving RPC
  notifications from new active statestored.
- Only active statestored sends topic update messages to subscribers.
- Add a new option "enable_statestored_ha" in script
  bin/start-impala-cluster.py for starting Impala mini-cluster with
  statestored HA enabled.
- Add a new Thrift API in statestore service to disable network
  for statestored. It's only used for unit-test to simulate network
  failure. For safety, it's only working when the debug action is set
  in starting flags.

Testings:
 - Added end-to-end unit tests for statestored HA.
 - Passed core tests

Change-Id: Ibd2c814bbad5c04c1d50c2edaa5b910c82a6fd87
Reviewed-on: http://gerrit.cloudera.org:8080/20372
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
wzhou-code
2023-06-22 18:06:08 -07:00
committed by Wenzhe Zhou
parent 141e3f1748
commit c9c5fb89b5
23 changed files with 2541 additions and 114 deletions

View File

@@ -43,6 +43,8 @@ from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT,
DEFAULT_ADMISSIOND_WEBSERVER_PORT, DEFAULT_CATALOGD_JVM_DEBUG_PORT,
DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT,
DEFAULT_EXTERNAL_FE_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
DEFAULT_STATESTORE_SERVICE_PORT, DEFAULT_STATESTORE_HA_SERVICE_PORT,
DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT,
find_user_processes, run_daemon)
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
@@ -158,6 +160,10 @@ parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha",
parser.add_option("--jni_frontend_class", dest="jni_frontend_class",
action="store", default="org/apache/impala/service/JniFrontend",
help="Use a custom java frontend interface.")
parser.add_option("--enable_statestored_ha", dest="enable_statestored_ha",
action="store_true", default=False,
help="If true, enables StatestoreD HA - the cluster will be launched "
"with two statestored instances as Active-Passive HA pair.")
# For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
# replica initialization. The ith delay is applied to the ith impalad.
@@ -318,6 +324,48 @@ def catalogd_service_name(i):
return "catalogd_node{node_num}".format(node_num=i)
def choose_statestored_ports(enable_statestored_ha, instance_num):
"""Compute the ports for statestored instance num 'instance_num', returning as a map
from the argument name to the port number."""
if not enable_statestored_ha:
return {'state_store_port': DEFAULT_STATESTORE_SERVICE_PORT + instance_num,
'webserver_port': DEFAULT_STATESTORED_WEBSERVER_PORT + instance_num}
else:
# Assume two statestore instances will be launched when statestore HA is enabled
state_store_peer_ha_port =\
DEFAULT_STATESTORE_HA_SERVICE_PORT + ((instance_num + 1) % 2)
return {'state_store_port': DEFAULT_STATESTORE_SERVICE_PORT + instance_num,
'state_store_ha_port': DEFAULT_STATESTORE_HA_SERVICE_PORT + instance_num,
'state_store_peer_ha_port': state_store_peer_ha_port,
'webserver_port': DEFAULT_STATESTORED_WEBSERVER_PORT + instance_num}
def build_statestored_port_args(enable_statestored_ha, instance_num):
if not enable_statestored_ha:
STATESTORED_PORTS = (
"-state_store_port={state_store_port} "
"-webserver_port={webserver_port}")
return STATESTORED_PORTS.format(
**choose_statestored_ports(enable_statestored_ha, instance_num))
else:
STATESTORED_PORTS = (
"-state_store_port={state_store_port} "
"-state_store_ha_port={state_store_ha_port} "
"-state_store_peer_ha_port={state_store_peer_ha_port} "
"-webserver_port={webserver_port}")
return STATESTORED_PORTS.format(
**choose_statestored_ports(enable_statestored_ha, instance_num))
def statestored_service_name(i):
"""Return the name to use for the ith statestore daemon in the cluster."""
if i == 0:
# The first statestored always logs to statestored.INFO
return "statestored"
else:
return "statestored_node{node_num}".format(node_num=i)
def combine_arg_list_opts(opt_args):
"""Helper for processing arguments like impalad_args. The input is a list of strings,
each of which is the string passed into one instance of the argument, e.g. for
@@ -328,17 +376,29 @@ def combine_arg_list_opts(opt_args):
return list(itertools.chain(*[shlex.split(arg) for arg in opt_args]))
def build_statestored_arg_list(enable_catalogd_ha):
"""Build a list of command line arguments to pass to the statestored."""
args = (build_logging_args("statestored") + build_kerberos_args("statestored")
+ combine_arg_list_opts(options.state_store_args))
if (enable_catalogd_ha):
args.extend(["-enable_catalogd_ha=true"])
return args
def build_statestored_arg_list(num_statestored, remap_ports):
"""Build a list of lists of command line arguments to pass to each statestored
instance. Build args for two statestored instances if statestored HA is enabled."""
statestored_arg_list = []
for i in range(num_statestored):
service_name = statestored_service_name(i)
args = (build_logging_args(service_name)
+ build_kerberos_args("statestored")
+ combine_arg_list_opts(options.state_store_args))
if remap_ports:
statestored_port_args =\
build_statestored_port_args(options.enable_statestored_ha, i)
args.extend(shlex.split(statestored_port_args))
if options.enable_catalogd_ha:
args.extend(["-enable_catalogd_ha=true"])
if options.enable_statestored_ha:
args.extend(["-enable_statestored_ha=true"])
statestored_arg_list.append(args)
return statestored_arg_list
def build_catalogd_arg_list(num_catalogd, enable_catalogd_ha, remap_ports):
"""Build a list of command line arguments to pass to the catalogd.
def build_catalogd_arg_list(num_catalogd, remap_ports):
"""Build a list of lists of command line arguments to pass to each catalogd instance.
Build args for two catalogd instances if catalogd HA is enabled."""
catalogd_arg_list = []
for i in range(num_catalogd):
@@ -350,16 +410,32 @@ def build_catalogd_arg_list(num_catalogd, enable_catalogd_ha, remap_ports):
if remap_ports:
catalogd_port_args = build_catalogd_port_args(i)
args.extend(shlex.split(catalogd_port_args))
if enable_catalogd_ha:
if options.enable_catalogd_ha:
args.extend(["-enable_catalogd_ha=true"])
if options.enable_statestored_ha:
args.extend(["-enable_statestored_ha=true"])
state_store_port = DEFAULT_STATESTORE_SERVICE_PORT
args.extend(
["-state_store_port={0}".format(state_store_port)])
args.extend(
["-state_store_2_port={0}".format(state_store_port + 1)])
catalogd_arg_list.append(args)
return catalogd_arg_list
def build_admissiond_arg_list():
"""Build a list of command line arguments to pass to the admissiond."""
return (build_logging_args("admissiond") + build_kerberos_args("admissiond") +
combine_arg_list_opts(options.admissiond_args))
args = (build_logging_args("admissiond")
+ build_kerberos_args("admissiond")
+ combine_arg_list_opts(options.admissiond_args))
if options.enable_statestored_ha:
args.extend(["-enable_statestored_ha=true"])
state_store_port = DEFAULT_STATESTORE_SERVICE_PORT
args.extend(
["-state_store_port={0}".format(state_store_port)])
args.extend(
["-state_store_2_port={0}".format(state_store_port + 1)])
return args
def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordinators,
@@ -484,6 +560,14 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
args = "{args} -admission_service_host={host}".format(
args=args, host=admissiond_host)
if options.enable_statestored_ha:
state_store_port = DEFAULT_STATESTORE_SERVICE_PORT
state_store_2_port = DEFAULT_STATESTORE_SERVICE_PORT + 1
args = "{args} -enable_statestored_ha=true -state_store_port={state_store_port} "\
"-state_store_2_port={state_store_2_port}".format(
args=args, state_store_port=state_store_port,
state_store_2_port=state_store_2_port)
if "geospatial_library" not in args:
args = "{args} -geospatial_library={geospatial_library}".format(
args=args, geospatial_library=options.geospatial_library)
@@ -557,29 +641,36 @@ class MiniClusterOperations(object):
def kill_all_catalogds(self, force=False):
kill_matching_processes(["catalogd"], force=force)
def kill_statestored(self, force=False):
def kill_all_statestoreds(self, force=False):
kill_matching_processes(["statestored"], force=force)
def kill_admissiond(self, force=False):
kill_matching_processes(["admissiond"], force=force)
def start_statestore(self):
LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
log_dir=options.log_dir))
output_file = os.path.join(options.log_dir, "statestore-out.log")
run_daemon_with_options("statestored",
build_statestored_arg_list(options.enable_catalogd_ha), output_file)
if not check_process_exists("statestored", 10):
raise RuntimeError("Unable to start statestored. Check log or file permissions"
" for more details.")
if options.enable_statestored_ha:
num_statestored = 2
else:
num_statestored = 1
statestored_arg_lists = build_statestored_arg_list(num_statestored, remap_ports=True)
for i in range(num_statestored):
service_name = statestored_service_name(i)
LOG.info(
"Starting State Store logging to {log_dir}/{service_name}.INFO".format(
log_dir=options.log_dir, service_name=service_name))
output_file = os.path.join(
options.log_dir, "{service_name}-out.log".format(service_name=service_name))
run_daemon_with_options("statestored", statestored_arg_lists[i], output_file)
if not check_process_exists("statestored", 10):
raise RuntimeError("Unable to start statestored. Check log or file permissions"
" for more details.")
def start_catalogd(self):
if options.enable_catalogd_ha:
num_catalogd = 2
else:
num_catalogd = 1
catalogd_arg_lists = build_catalogd_arg_list(
num_catalogd, options.enable_catalogd_ha, remap_ports=True)
catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=True)
for i in range(num_catalogd):
service_name = catalogd_service_name(i)
LOG.info(
@@ -654,7 +745,7 @@ class DockerMiniClusterOperations(object):
use_admission_service=options.enable_admission_service)
def kill_all_daemons(self, force=False):
self.kill_statestored(force=force)
self.kill_all_statestoreds(force=force)
self.kill_all_catalogds(force=force)
self.kill_admissiond(force=force)
self.kill_all_impalads(force=force)
@@ -679,24 +770,46 @@ class DockerMiniClusterOperations(object):
LOG.info("Stopping container {0}".format(container_name))
check_call(["docker", "stop", container_name])
def kill_statestored(self, force=False):
self.__stop_container__("statestored")
def kill_all_statestoreds(self, force=False):
# List all running containers on the network and kill those with the statestored name
# prefix to make sure that no running container are left over from previous clusters.
container_name_prefix = self.__gen_container_name__("statestored")
for container_id, info in self.__get_network_info__()["Containers"].items():
container_name = info["Name"]
if container_name.startswith(container_name_prefix):
LOG.info("Stopping container {0}".format(container_name))
check_call(["docker", "stop", container_name])
def kill_admissiond(self, force=False):
self.__stop_container__("admissiond")
def start_statestore(self):
self.__run_container__("statestored",
build_statestored_arg_list(options.enable_catalogd_ha),
{DEFAULT_STATESTORED_WEBSERVER_PORT: DEFAULT_STATESTORED_WEBSERVER_PORT})
if not options.enable_statestored_ha:
statestored_arg_lists =\
build_statestored_arg_list(num_statestored=1, remap_ports=False)
self.__run_container__("statestored", statestored_arg_lists[0],
{DEFAULT_STATESTORED_WEBSERVER_PORT: DEFAULT_STATESTORED_WEBSERVER_PORT})
else:
num_statestored = 2
statestored_arg_lists =\
build_statestored_arg_list(num_statestored, remap_ports=False)
for i in range(num_statestored):
chosen_ports = choose_statestored_ports(
enable_statestored_ha=True, instance_num=i)
port_map = {
DEFAULT_STATESTORE_SERVICE_PORT: chosen_ports['state_store_port'],
DEFAULT_STATESTORE_HA_SERVICE_PORT: chosen_ports['state_store_ha_port'],
DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT:
chosen_ports['state_store_peer_ha_port'],
DEFAULT_STATESTORED_WEBSERVER_PORT: chosen_ports['webserver_port']}
self.__run_container__("statestored", statestored_arg_lists[i], port_map, i)
def start_catalogd(self):
if options.enable_catalogd_ha:
num_catalogd = 2
else:
num_catalogd = 1
catalogd_arg_lists = build_catalogd_arg_list(
num_catalogd, options.enable_catalogd_ha, remap_ports=False)
catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=False)
for i in range(num_catalogd):
chosen_ports = choose_catalogd_ports(i)
port_map = {DEFAULT_CATALOG_SERVICE_PORT: chosen_ports['catalog_service_port'],
@@ -705,7 +818,7 @@ class DockerMiniClusterOperations(object):
def start_admissiond(self):
self.__run_container__("admissiond", build_admissiond_arg_list(),
{DEFAULT_ADMISSIOND_WEBSERVER_PORT: DEFAULT_ADMISSIOND_WEBSERVER_PORT})
{DEFAULT_ADMISSIOND_WEBSERVER_PORT: DEFAULT_ADMISSIOND_WEBSERVER_PORT})
def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators):
impalad_arg_lists = build_impalad_arg_lists(cluster_size, num_coordinators,
@@ -884,7 +997,7 @@ if __name__ == "__main__":
elif options.restart_catalogd_only:
cluster_ops.kill_all_catalogds(force=options.force_kill)
elif options.restart_statestored_only:
cluster_ops.kill_statestored(force=options.force_kill)
cluster_ops.kill_all_statestoreds(force=options.force_kill)
elif options.add_executors:
pass
else: