# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import logging
import os
import pytest
import re
import subprocess

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import create_client_protocol_dimension
from tests.common.test_vector import ImpalaTestVector
from tests.shell.util import run_impala_shell_cmd, get_impalad_port

LOG = logging.getLogger('test_client_keepalive')


class TestClientKeepalive(CustomClusterTestSuite):
  """Tests for enabling server-side keepalive for client connections.

  The mechanism differs slightly between SSL and non-SSL connections, so both
  configurations are tested.
  """

  CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']

  SSL_ARGS = ("--ssl_client_ca_certificate=%s/server-cert.pem "
              "--ssl_server_certificate=%s/server-cert.pem "
              "--ssl_private_key=%s/server-key.pem "
              "--hostname=localhost "  # Required to match hostname in certificate
              % (CERT_DIR, CERT_DIR, CERT_DIR))

  # check_keepalive() expects the reported timer to be 9 or 10 minutes, which
  # corresponds to this 600 second period. Keep the two in sync.
  KEEPALIVE_ARGS = "--client_keepalive_probe_period_s=600"

  def get_ss_command(self):
    """Return an 'ss' command that can be executed on this machine.

    HACK: Most systems have ss on the PATH, but sometimes the PATH is misconfigured
    while ss is still available in /usr/sbin. This tries the PATH and then falls back
    to trying /usr/sbin/ss.

    Raises an Exception if none of the candidates runs and exits with status 0.
    """
    possible_ss_commands = ['ss', '/usr/sbin/ss']
    with open(os.devnull, "w") as devnull:
      for ss_command in possible_ss_commands:
        try:
          retcode = subprocess.call([ss_command], stdout=devnull, stderr=devnull)
        except OSError as e:
          # The executable is missing or cannot be run; try the next candidate.
          LOG.info(e)
          continue
        LOG.info("{0} returns {1}".format(ss_command, retcode))
        if retcode == 0:
          return ss_command
    raise Exception("No valid ss executable. Tried: {0}".format(possible_ss_commands))

  def check_keepalive(self, vector, ssl):
    """Assert that the impala-shell connection described by 'vector' has keepalive set.

    Runs 'ss' from inside impala-shell (via its 'shell' command) so that the client
    connection is open while the sockets are inspected, then parses the keepalive
    timer from the output. If 'ssl' is true, the shell connects with --ssl.
    """
    ss = self.get_ss_command()
    impalad_port = get_impalad_port(vector)
    # Sleep 1 second to make sure the connection is idle, then use the ss utility
    # to print information about keepalive.
    # -H disables the header
    # -t limits it to TCP connections
    # -o prints the timer information which includes keepalive
    # -n uses numeric addresses to avoid DNS lookups
    # sport X - limit to connections for the impalad port that we are using
    ss_command = "sleep 1 && {0} -Hton sport = {1}".format(ss, impalad_port)
    LOG.info("Command: {0}".format(ss_command))
    shell_options = ["-q", "shell {0}".format(ss_command)]
    if ssl:
      shell_options.append("--ssl")
    result = run_impala_shell_cmd(vector, shell_options)
    LOG.info("STDOUT: {0} STDERR: {1}".format(result.stdout, result.stderr))

    # The message is of the form "timer:(keepalive,$TIME,$NUM_RETRIES)"
    # e.g. "timer:(keepalive,9min58sec,0)" or "timer:(keepalive,10min,0)"
    # The retry count accepts multiple digits so that an unexpected value such as 10
    # fails the retry assertion below rather than looking like missing output.
    KEEPALIVE_REGEX = r"timer:\(keepalive,([0-9]+)min([0-9]+sec)?,([0-9]+)\)"
    match = re.search(KEEPALIVE_REGEX, result.stdout)
    assert match, "Could not find keepalive information in {0}".format(result.stdout)

    num_minutes = int(match.group(1))
    num_retries = int(match.group(3))
    # After ~1 second of idle time a 600 second timer reads just under 10 minutes,
    # so either "9minXXsec" or "10min" is acceptable.
    assert num_minutes in (9, 10), \
        "Unexpected keepalive timer in {0}".format(match.group(0))
    assert num_retries == 0, \
        "Unexpected keepalive retry count in {0}".format(match.group(0))

  def _check_all_protocols(self, ssl):
    """Run check_keepalive() once for every client protocol.

    Keepalive applies to all client ports / protocols, so all of them are tested.
    The protocols are iterated here rather than through pytest parametrization to
    avoid restarting the cluster for each one.
    """
    for protocol_value in create_client_protocol_dimension():
      self.check_keepalive(ImpalaTestVector([protocol_value]), ssl=ssl)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(impalad_args=(SSL_ARGS + KEEPALIVE_ARGS),
                                    statestored_args=SSL_ARGS,
                                    catalogd_args=SSL_ARGS)
  def test_ssl_keepalive(self, vector):
    """Verify keepalive on client connections when the cluster runs with SSL."""
    # The 'vector' argument is not used directly: a dedicated vector is built for
    # each client protocol.
    self._check_all_protocols(ssl=True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(impalad_args=KEEPALIVE_ARGS)
  def test_nonssl_keepalive(self, vector):
    """Verify keepalive on client connections when the cluster runs without SSL."""
    # The 'vector' argument is not used directly: a dedicated vector is built for
    # each client protocol.
    self._check_all_protocols(ssl=False)