mirror of
https://github.com/apache/impala.git
synced 2025-12-19 09:58:28 -05:00
This patch stop setting up and building impala-shell for Python 2. A more thorough clean up will be done in the future. Testing: Pass build and test/shell/ in RHEL8. Change-Id: Ic7d59b283f4e2f011880ff6221d550b52714a538 Reviewed-on: http://gerrit.cloudera.org:8080/23750 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
415 lines
18 KiB
Python
Executable File
415 lines
18 KiB
Python
Executable File
# -*- coding: utf-8 -*-
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from contextlib import closing
|
|
import logging
|
|
import os
|
|
import re
|
|
import socket
|
|
from subprocess import PIPE, Popen
|
|
import time
|
|
|
|
import pexpect
|
|
import pytest
|
|
|
|
# This import is the actual ImpalaShell class from impala_shell.py.
|
|
# We rename it to ImpalaShellClass here because we later import another
|
|
# class called ImpalaShell from tests/shell/util.py, and we don't want
|
|
# to mask it.
|
|
from impala_shell.impala_shell import ImpalaShell as ImpalaShellClass
|
|
from tests.common.environ import IMPALA_LOCAL_BUILD_VERSION, ImpalaTestClusterProperties
|
|
from tests.common.impala_service import ImpaladService
|
|
from tests.common.impala_test_suite import (
|
|
IMPALAD_BEESWAX_HOST_PORT,
|
|
IMPALAD_HS2_HOST_PORT,
|
|
IMPALAD_HS2_HTTP_HOST_PORT,
|
|
STRICT_HS2_HOST_PORT,
|
|
STRICT_HS2_HTTP_HOST_PORT,
|
|
)
|
|
from tests.common.test_vector import ImpalaTestDimension
|
|
|
|
LOG = logging.getLogger('tests/shell/util.py')
|
|
LOG.addHandler(logging.StreamHandler())
|
|
|
|
SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory")
|
|
IMPALA_HOME = os.environ['IMPALA_HOME']
|
|
|
|
|
|
def build_shell_env(env=None):
|
|
""" Construct the environment for the shell to run in based on 'env', or the current
|
|
process's environment if env is None."""
|
|
if not env: env = os.environ
|
|
# Don't inherit PYTHONPATH or LD_LIBRARY_PATH - the shell launch script must set
|
|
# these to include dependencies. Copy 'env' to avoid mutating argument or os.environ.
|
|
env = dict(env)
|
|
if "PYTHONPATH" in env:
|
|
del env["PYTHONPATH"]
|
|
if "LD_LIBRARY_PATH" in env:
|
|
del env["LD_LIBRARY_PATH"]
|
|
return env
|
|
|
|
|
|
def assert_var_substitution(result):
|
|
assert_pattern(r'\bfoo_number=.*$', 'foo_number= 123123', result.stdout,
|
|
'Numeric values not replaced correctly')
|
|
assert_pattern(r'\bfoo_string=.*$', 'foo_string=123', result.stdout,
|
|
'String values not replaced correctly')
|
|
assert_pattern(r'\bVariables:[\s\n]*BAR:\s*[0-9]*\n\s*FOO:\s*[0-9]*',
|
|
'Variables:\n\tBAR: 456\n\tFOO: 123', result.stdout,
|
|
"Set variable not listed correctly by the first SET command")
|
|
assert_pattern(r'\bError: Unknown variable FOO1$',
|
|
'Error: Unknown variable FOO1', result.stderr,
|
|
'Missing variable FOO1 not reported correctly')
|
|
assert_pattern(r'\bmulti_test=.*$', 'multi_test=456_123_456_123',
|
|
result.stdout, 'Multiple replaces not working correctly')
|
|
assert_pattern(r'\bError:\s*Unknown\s*substitution\s*syntax\s*'
|
|
r'\(RANDOM_NAME\). Use \${VAR:var_name}',
|
|
'Error: Unknown substitution syntax (RANDOM_NAME). Use ${VAR:var_name}',
|
|
result.stderr, "Invalid variable reference")
|
|
assert_pattern(r'"This should be not replaced: \${VAR:foo} \${HIVEVAR:bar}"',
|
|
'"This should be not replaced: ${VAR:foo} ${HIVEVAR:bar}"',
|
|
result.stdout, "Variable escaping not working")
|
|
assert_pattern(r'\bVariable MYVAR set to.*$', 'Variable MYVAR set to foo123',
|
|
result.stderr, 'No evidence of MYVAR variable being set.')
|
|
assert_pattern(r'\bVariables:[\s\n]*BAR:.*[\s\n]*FOO:.*[\s\n]*MYVAR:.*$',
|
|
'Variables:\n\tBAR: 456\n\tFOO: 123\n\tMYVAR: foo123', result.stdout,
|
|
'Set variables not listed correctly by the second SET command')
|
|
assert_pattern(r'\bUnsetting variable FOO$', 'Unsetting variable FOO',
|
|
result.stdout, 'No evidence of variable FOO being unset')
|
|
assert_pattern(r'\bUnsetting variable BAR$', 'Unsetting variable BAR',
|
|
result.stdout, 'No evidence of variable BAR being unset')
|
|
assert_pattern(r'\bVariables:[\s\n]*No variables defined\.$',
|
|
'Variables:\n\tNo variables defined.', result.stdout,
|
|
'Unset variables incorrectly listed by third SET command.')
|
|
assert_pattern(r'\bNo variable called NONEXISTENT is set',
|
|
'No variable called NONEXISTENT is set', result.stdout,
|
|
'Problem unsetting non-existent variable.')
|
|
assert_pattern(r'\bVariable COMMENT_TYPE1 set to.*$',
|
|
'Variable COMMENT_TYPE1 set to ok', result.stderr,
|
|
'No evidence of COMMENT_TYPE1 variable being set.')
|
|
assert_pattern(r'\bVariable COMMENT_TYPE2 set to.*$',
|
|
'Variable COMMENT_TYPE2 set to ok', result.stderr,
|
|
'No evidence of COMMENT_TYPE2 variable being set.')
|
|
assert_pattern(r'\bVariable COMMENT_TYPE3 set to.*$',
|
|
'Variable COMMENT_TYPE3 set to ok', result.stderr,
|
|
'No evidence of COMMENT_TYPE3 variable being set.')
|
|
assert_pattern(r'\bVariables:[\s\n]*COMMENT_TYPE1:.*[\s\n]*'
|
|
r'COMMENT_TYPE2:.*[\s\n]*COMMENT_TYPE3:.*$',
|
|
'Variables:\n\tCOMMENT_TYPE1: ok\n\tCOMMENT_TYPE2: ok\n\tCOMMENT_TYPE3: ok',
|
|
result.stdout, 'Set variables not listed correctly by the SET command')
|
|
|
|
|
|
def assert_pattern(pattern, result, text, message):
|
|
"""Asserts that the pattern, when applied to text, returns the expected result"""
|
|
m = re.search(pattern, text, re.MULTILINE)
|
|
assert m and m.group(0) == result, message
|
|
|
|
|
|
def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
|
|
stdin_input=None, stdout_file=None, stderr_file=None):
|
|
"""Runs the Impala shell on the commandline.
|
|
|
|
'shell_args' is a string which represents the commandline options.
|
|
Returns a ImpalaShellResult.
|
|
"""
|
|
result = run_impala_shell_cmd_no_expect(vector, shell_args, env, stdin_input,
|
|
wait_until_connected=expect_success,
|
|
stdout_file=stdout_file,
|
|
stderr_file=stderr_file)
|
|
if expect_success:
|
|
assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (shell_args,
|
|
result.stderr)
|
|
else:
|
|
assert result.rc != 0, "Cmd %s was expected to fail" % shell_args
|
|
return result
|
|
|
|
|
|
def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=None,
|
|
wait_until_connected=True, stdout_file=None,
|
|
stderr_file=None):
|
|
"""Runs the Impala shell on the commandline.
|
|
|
|
'shell_args' is a string which represents the commandline options.
|
|
Returns a ImpalaShellResult.
|
|
|
|
Does not assert based on success or failure of command.
|
|
"""
|
|
# Only wait_until_connected if we also have input to send. Otherwise expect that
|
|
# the command will run without input and exit. Avoid waiting as ImpalaShell may
|
|
# exit before it can detect there is a successful connection and assert False.
|
|
p = ImpalaShell(vector, shell_args, env=env,
|
|
wait_until_connected=wait_until_connected and stdin_input,
|
|
stdout_file=stdout_file, stderr_file=stderr_file)
|
|
result = p.get_result(stdin_input)
|
|
return result
|
|
|
|
|
|
def get_impalad_host_port(vector):
|
|
"""Get host and port to connect to based on test vector provided."""
|
|
protocol = vector.get_value("protocol")
|
|
strict = vector.get_value_with_default("strict_hs2_protocol", False)
|
|
if protocol == 'hs2':
|
|
if strict:
|
|
return STRICT_HS2_HOST_PORT
|
|
else:
|
|
return IMPALAD_HS2_HOST_PORT
|
|
elif protocol == 'hs2-http':
|
|
if strict:
|
|
return STRICT_HS2_HTTP_HOST_PORT
|
|
else:
|
|
return IMPALAD_HS2_HTTP_HOST_PORT
|
|
else:
|
|
assert protocol == 'beeswax', protocol
|
|
return IMPALAD_BEESWAX_HOST_PORT
|
|
|
|
|
|
def get_impalad_port(vector):
|
|
"""Get integer port to connect to based on test vector provided."""
|
|
return int(get_impalad_host_port(vector).split(":")[1])
|
|
|
|
|
|
def get_shell_cmd(vector):
|
|
"""Get the basic shell command to start the shell, given the provided test vector.
|
|
Returns the command as a list of string arguments."""
|
|
impala_shell_executable = get_impala_shell_executable(vector)
|
|
if vector.get_value_with_default("strict_hs2_protocol", False):
|
|
protocol = vector.get_value("protocol")
|
|
return impala_shell_executable + [
|
|
"--protocol={0}".format(protocol),
|
|
"--strict_hs2_protocol",
|
|
"--use_ldap_test_password",
|
|
"-i{0}".format(get_impalad_host_port(vector))]
|
|
else:
|
|
return impala_shell_executable + [
|
|
"--protocol={0}".format(vector.get_value("protocol")),
|
|
"-i{0}".format(get_impalad_host_port(vector))]
|
|
|
|
|
|
def spawn_shell(shell_cmd):
|
|
"""Spawn a shell process with the provided command line. Returns the Pexpect object."""
|
|
return pexpect.spawn(shell_cmd[0], shell_cmd[1:], env=build_shell_env())
|
|
|
|
|
|
def get_open_sessions_metric(vector):
|
|
"""Get the name of the vector that tracks open sessions for the protocol in vector."""
|
|
protocol = vector.get_value("protocol")
|
|
if protocol in ('hs2', 'hs2-http'):
|
|
return 'impala-server.num-open-hiveserver2-sessions'
|
|
else:
|
|
assert protocol == 'beeswax', protocol
|
|
return 'impala-server.num-open-beeswax-sessions'
|
|
|
|
|
|
class ImpalaShellResult(object):
|
|
def __init__(self):
|
|
self.rc = 0
|
|
self.stdout = str()
|
|
self.stderr = str()
|
|
|
|
|
|
class ImpalaShell(object):
|
|
"""A single instance of the Impala shell. The process is started when this object is
|
|
constructed, and then users should repeatedly call send_cmd(), followed eventually by
|
|
get_result() to retrieve the process output. This constructor will wait until
|
|
Impala shell is connected for the specified timeout unless wait_until_connected is
|
|
set to False or --quiet is passed into the args."""
|
|
def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60,
|
|
stdout_file=None, stderr_file=None):
|
|
self.shell_process = self._start_new_shell_process(vector, args, env=env,
|
|
stdout_file=stdout_file,
|
|
stderr_file=stderr_file)
|
|
# When --quiet option is passed to Impala shell, we should not wait until we see
|
|
# "Connected to" because it will never be printed to stderr. The same is true
|
|
# if stderr is redirected.
|
|
if wait_until_connected and (args is None or "--quiet" not in args) and \
|
|
stderr_file is None:
|
|
# We don't want to hang waiting for input. So, here are the scenarios
|
|
# we need to handle:
|
|
# 1. Shell process exits
|
|
# 2. Shell fails to connect. This can lead to an interactive prompt
|
|
# that blocks for input forever. The two messages to look for are
|
|
# "Error connecting" and "Socket error"
|
|
# 3. Process successfully connecting: "Connected to"
|
|
# Cases 1 and 2 should lead to an assert.
|
|
start_time = time.time()
|
|
process_status = None
|
|
connection_err = None
|
|
connected = False
|
|
while time.time() - start_time < timeout:
|
|
# Condition 1: check if the shell process has exited
|
|
# poll() returns None until the process exits
|
|
process_status = self.shell_process.poll()
|
|
if process_status is not None:
|
|
break
|
|
# readline() can block forever, so the timeout logic may not be effective
|
|
# if something gets stuck here.
|
|
line = self.shell_process.stderr.readline()
|
|
# Condition 2: check for errors connecting
|
|
if ImpalaShellClass.ERROR_CONNECTING_MESSAGE in line or \
|
|
ImpalaShellClass.SOCKET_ERROR_MESSAGE in line:
|
|
connection_err = line
|
|
break
|
|
|
|
# Condition 3: check if the connection is successful
|
|
connected = ImpalaShellClass.CONNECTED_TO_MESSAGE in line
|
|
if connected:
|
|
break
|
|
|
|
assert process_status is None, \
|
|
"Impala shell exited with return code {0}".format(process_status)
|
|
assert connection_err is None, connection_err
|
|
assert connected, "Impala shell is not connected"
|
|
|
|
def pid(self):
|
|
return self.shell_process.pid
|
|
|
|
def send_cmd(self, cmd):
|
|
"""Send a single command to the shell. This method adds the end-of-query
|
|
terminator (';'). """
|
|
self.shell_process.stdin.write("%s;\n" % cmd)
|
|
self.shell_process.stdin.flush()
|
|
# Allow fluent-style chaining of commands
|
|
return self
|
|
|
|
def wait_for_query_start(self):
|
|
"""If this shell was started with the '-q' option, this mathod will block until the
|
|
query has started running"""
|
|
# readline() will block until a line is actually printed, so this loop should always
|
|
# read something like:
|
|
# Starting Impala Shell without Kerberos authentication
|
|
# Connected to localhost:21000
|
|
# Server version: impalad version...
|
|
# Query: select sleep(10)
|
|
# Query submitted at:...
|
|
# Query state can be monitored at:...
|
|
# We stop at 10 iterations to prevent an infinite loop if somehting goes wrong.
|
|
iters = 0
|
|
while "Query state" not in self.shell_process.stderr.readline() and iters < 10:
|
|
iters += 1
|
|
|
|
def get_result(self, stdin_input=None):
|
|
"""Returns an ImpalaShellResult produced by the shell process on exit. After this
|
|
method returns, send_cmd() no longer has any effect."""
|
|
result = ImpalaShellResult()
|
|
result.stdout, result.stderr = self.shell_process.communicate(input=stdin_input)
|
|
# We need to close STDIN if we gave it an input, in order to send an EOF that will
|
|
# allow the subprocess to exit.
|
|
if stdin_input is not None: self.shell_process.stdin.close()
|
|
result.rc = self.shell_process.returncode
|
|
return result
|
|
|
|
def _start_new_shell_process(self, vector, args=None, env=None, stdout_file=None,
|
|
stderr_file=None):
|
|
"""Starts a shell process and returns the process handle"""
|
|
cmd = get_shell_cmd(vector)
|
|
if args is not None: cmd += args
|
|
stdout_arg = stdout_file if stdout_file is not None else PIPE
|
|
stderr_arg = stderr_file if stderr_file is not None else PIPE
|
|
return Popen(cmd, shell=False, stdout=stdout_arg, stdin=PIPE, stderr=stderr_arg,
|
|
universal_newlines=True, env=build_shell_env(env))
|
|
|
|
|
|
def get_unused_port():
|
|
""" Find an unused port http://stackoverflow.com/questions/1365265 """
|
|
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
|
|
s.bind(('', 0))
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
return s.getsockname()[1]
|
|
|
|
|
|
def wait_for_query_state(vector, stmt, state, max_retry=15):
|
|
"""Waits for the given query 'stmt' to reach the given query 'state'. The state of the
|
|
query is taken from the debug web ui. Polls the state of 'stmt' every second until the
|
|
query gets to a state given via 'state' or a maximum retry count is reached.
|
|
Restriction: Only works if there is only one in flight query."""
|
|
impalad_service = ImpaladService(get_impalad_host_port(vector).split(':')[0])
|
|
if not impalad_service.wait_for_num_in_flight_queries(1):
|
|
raise Exception("No in flight query found")
|
|
|
|
retry_count = 0
|
|
while retry_count <= max_retry:
|
|
query_info = impalad_service.get_in_flight_queries()[0]
|
|
print(str(query_info))
|
|
if query_info['stmt'] != stmt:
|
|
exc_text = "The found in flight query is not the one under test: " + \
|
|
query_info['stmt']
|
|
raise Exception(exc_text)
|
|
if query_info['state'] == state:
|
|
return
|
|
retry_count += 1
|
|
time.sleep(1.0)
|
|
raise Exception("Query didn't reach desired state: " + state)
|
|
|
|
|
|
# Returns shell executable, and whether to include pypi variants
|
|
def get_dev_impala_shell_executable():
|
|
# Note that pytest.config.getoption is deprecated usage. We use this
|
|
# in a couple of other places. Ultimately, it needs to be addressed if
|
|
# we ever want to get off of pytest 2.9.2.
|
|
impala_shell_executable = pytest.config.getoption('shell_executable')
|
|
|
|
if impala_shell_executable is not None:
|
|
return impala_shell_executable, False
|
|
|
|
if ImpalaTestClusterProperties.get_instance().is_remote_cluster():
|
|
# With remote cluster testing, we cannot assume that the shell was built locally.
|
|
return os.path.join(IMPALA_HOME, "bin/impala-shell.sh"), False
|
|
else:
|
|
# Test the locally built shell distribution.
|
|
return os.path.join(IMPALA_HOME, "shell/build",
|
|
"impala-shell-" + IMPALA_LOCAL_BUILD_VERSION, "impala-shell"), True
|
|
|
|
|
|
def create_impala_shell_executable_dimension(dev_only=False):
|
|
_, include_pypi = get_dev_impala_shell_executable()
|
|
dimensions = []
|
|
python3_pytest = (os.getenv("IMPALA_USE_PYTHON3_TESTS", "false") == "true")
|
|
assert os.getenv("IMPALA_SYSTEM_PYTHON3"), "Must set env var IMPALA_SYSTEM_PYTHON3!"
|
|
dimensions.append('dev3')
|
|
if os.getenv("IMPALA_SYSTEM_PYTHON2") and not python3_pytest:
|
|
dimensions.append('dev')
|
|
if include_pypi and not dev_only:
|
|
if os.getenv("IMPALA_SYSTEM_PYTHON2") and not python3_pytest:
|
|
dimensions.append('python2')
|
|
if os.getenv("IMPALA_SYSTEM_PYTHON3"):
|
|
dimensions.append('python3')
|
|
return ImpalaTestDimension('impala_shell', *dimensions)
|
|
|
|
|
|
def get_impala_shell_executable(vector):
|
|
# impala-shell is invoked some places where adding a test vector may not make sense;
|
|
# use 'dev3' as the default.
|
|
impala_shell_executable, _ = get_dev_impala_shell_executable()
|
|
executable_map = {
|
|
'dev': ['env', 'IMPALA_PYTHON_EXECUTABLE=python',
|
|
'IMPALA_SHELL_PYTHON_FALLBACK=false', impala_shell_executable],
|
|
'dev3': ['env', 'IMPALA_PYTHON_EXECUTABLE=python3',
|
|
'IMPALA_SHELL_PYTHON_FALLBACK=false', impala_shell_executable],
|
|
'python2': [os.path.join(IMPALA_HOME, 'shell/build/python2_venv/bin/impala-shell')],
|
|
'python3': [os.path.join(IMPALA_HOME, 'shell/build/python3_venv/bin/impala-shell')]
|
|
}
|
|
return executable_map[vector.get_value_with_default('impala_shell', 'dev3')]
|
|
|
|
|
|
def stderr_get_first_error_msg(stderr):
|
|
"""Seek to the begining of the first error message in stderr of impala-shell."""
|
|
PROMPT = "ERROR: "
|
|
return stderr[(stderr.index(PROMPT) + len(PROMPT)):]
|