Files
impala/tests/shell/test_shell_interactive.py
Zoltan Borok-Nagy 6539e89c81 IMPALA-2235: Fix current db when shell auto-reconnects
The ImpalaShell didn't issue the 'USE <current-db>' command after
reconnecting to the Impala daemon. Therefore the client session
used the default DB after reconnection, not the previously selected DB.

Setting the current DB is done by the _validate_database method.
Before this commit it appended the "use <db>" command to the
command queue of the Cmd class. But, at this point we might already
have commands in the command queue that will run before the
"use <db>" command. In case of reconnection, we want to invoke
the USE command right away.

Also, the command processed by the precmd() method can entirely skip
the command queue, therefore it is not enough to insert the USE
command to the front of the command queue. We need to issue the
USE command with the onecmd() method to execute it immediately.

I extended the _validate_database method with an "immediately" flag.
If this flag is true, _validate_database will use the onecmd() method.
Otherwise, it will append the USE command to the command queue to
maintain the previous behaviour.

I added a new automated test suite named test_shell_interactive_reconnect.py
to the "custom cluster" tests. It sets the default database, and after
reconnection it checks if the shell set it again automatically.

One test case checks if the shell set the DB after manually reconnecting
to the impala daemon by issuing the CONNECT command.
The other test case checks if the shell set the DB after automatic
reconnection due to cluster restart.

I needed to backup the impala shell history file because I didn't
want to pollute it by the test cases (just like the way it is done in
tests/shell/test_shell_interactive.py). I created utility functions for
this in tests/shell/util.py and now test_shell_interactive.py and
the newly created test suite are using these utility functions.

Change-Id: I40dfa00ba0314d356fe8617446f516505c925e5e
Reviewed-on: http://gerrit.cloudera.org:8080/8368
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
2017-11-15 22:42:22 +00:00

370 lines
16 KiB
Python
Executable File

#!/usr/bin/env impala-python
# encoding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import pexpect
import pytest
import re
import signal
import socket
import sys
import tempfile
from time import sleep
from tests.common.impala_service import ImpaladService
from tests.common.skip import SkipIfLocal
from util import assert_var_substitution, ImpalaShell
from util import move_shell_history, restore_shell_history
SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
SHELL_HISTORY_FILE = os.path.expanduser("~/.impalahistory")
QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
class TestImpalaShellInteractive(object):
"""Test the impala shell interactively"""
@classmethod
def setup_class(cls):
cls.tempfile_name = tempfile.mktemp()
move_shell_history(cls.tempfile_name)
@classmethod
def teardown_class(cls):
restore_shell_history(cls.tempfile_name)
def _expect_with_cmd(self, proc, cmd, expectations=()):
"""Executes a command on the expect process instance and verifies a set of
assertions defined by the expections."""
proc.sendline(cmd + ";")
proc.expect(":21000] >")
if not expectations: return
for e in expectations:
assert e in proc.before
@pytest.mark.execute_serially
def test_local_shell_options(self):
"""Test that setting the local shell options works"""
proc = pexpect.spawn(SHELL_CMD)
proc.expect(":21000] >")
self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: False", "LIVE_SUMMARY: False"))
self._expect_with_cmd(proc, "set live_progress=true")
self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: True", "LIVE_SUMMARY: False"))
self._expect_with_cmd(proc, "set live_summary=1")
self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: True", "LIVE_SUMMARY: True"))
@pytest.mark.execute_serially
def test_compute_stats_with_live_progress_options(self):
"""Test that setting LIVE_PROGRESS options won't cause COMPUTE STATS query fail"""
p = ImpalaShell()
p.send_cmd("set live_progress=True")
p.send_cmd("set live_summary=True")
p.send_cmd('create table test_live_progress_option(col int);')
try:
p.send_cmd('compute stats test_live_progress_option;')
finally:
p.send_cmd('drop table if exists test_live_progress_option;')
result = p.get_result()
assert "Updated 1 partition(s) and 1 column(s)" in result.stdout
@pytest.mark.execute_serially
def test_escaped_quotes(self):
"""Test escaping quotes"""
# test escaped quotes outside of quotes
result = run_impala_shell_interactive("select \\'bc';")
assert "Unexpected character" in result.stderr
result = run_impala_shell_interactive("select \\\"bc\";")
assert "Unexpected character" in result.stderr
# test escaped quotes within quotes
result = run_impala_shell_interactive("select 'ab\\'c';")
assert "Fetched 1 row(s)" in result.stderr
result = run_impala_shell_interactive("select \"ab\\\"c\";")
assert "Fetched 1 row(s)" in result.stderr
@pytest.mark.execute_serially
def test_cancellation(self):
impalad = ImpaladService(socket.getfqdn())
impalad.wait_for_num_in_flight_queries(0)
command = "select sleep(10000);"
p = ImpalaShell()
p.send_cmd(command)
sleep(3)
os.kill(p.pid(), signal.SIGINT)
result = p.get_result()
assert "Cancelled" not in result.stderr
assert impalad.wait_for_num_in_flight_queries(0)
@pytest.mark.execute_serially
def test_unicode_input(self):
"Test queries containing non-ascii input"
# test a unicode query spanning multiple lines
unicode_text = u'\ufffd'
args = "select '%s'\n;" % unicode_text.encode('utf-8')
result = run_impala_shell_interactive(args)
assert "Fetched 1 row(s)" in result.stderr
@pytest.mark.execute_serially
def test_welcome_string(self):
"""Test that the shell's welcome message is only printed once
when the shell is started. Ensure it is not reprinted on errors.
Regression test for IMPALA-1153
"""
result = run_impala_shell_interactive('asdf;')
assert result.stdout.count("Welcome to the Impala shell") == 1
result = run_impala_shell_interactive('select * from non_existent_table;')
assert result.stdout.count("Welcome to the Impala shell") == 1
@pytest.mark.execute_serially
def test_bash_cmd_timing(self):
"""Test existence of time output in bash commands run from shell"""
args = "! ls;"
result = run_impala_shell_interactive(args)
assert "Executed in" in result.stderr
@SkipIfLocal.multiple_impalad
@pytest.mark.execute_serially
def test_reconnect(self):
"""Regression Test for IMPALA-1235
Verifies that a connect command by the user is honoured.
"""
def get_num_open_sessions(impala_service):
"""Helper method to retrieve the number of open sessions"""
return impala_service.get_metric_value('impala-server.num-open-beeswax-sessions')
hostname = socket.getfqdn()
initial_impala_service = ImpaladService(hostname)
target_impala_service = ImpaladService(hostname, webserver_port=25001,
beeswax_port=21001, be_port=22001)
# Get the initial state for the number of sessions.
num_sessions_initial = get_num_open_sessions(initial_impala_service)
num_sessions_target = get_num_open_sessions(target_impala_service)
# Connect to localhost:21000 (default)
p = ImpalaShell()
sleep(2)
# Make sure we're connected <hostname>:21000
assert get_num_open_sessions(initial_impala_service) == num_sessions_initial + 1, \
"Not connected to %s:21000" % hostname
p.send_cmd("connect %s:21001" % hostname)
# Wait for a little while
sleep(2)
# The number of sessions on the target impalad should have been incremented.
assert get_num_open_sessions(target_impala_service) == num_sessions_target + 1, \
"Not connected to %s:21001" % hostname
# The number of sessions on the initial impalad should have been decremented.
assert get_num_open_sessions(initial_impala_service) == num_sessions_initial, \
"Connection to %s:21000 should have been closed" % hostname
@pytest.mark.execute_serially
def test_ddl_queries_are_closed(self):
"""Regression test for IMPALA-1317
The shell does not call close() for alter, use and drop queries, leaving them in
flight. This test issues those queries in interactive mode, and checks the debug
webpage to confirm that they've been closed.
TODO: Add every statement type.
"""
TMP_DB = 'inflight_test_db'
TMP_TBL = 'tmp_tbl'
MSG = '%s query should be closed'
NUM_QUERIES = 'impala-server.num-queries'
impalad = ImpaladService(socket.getfqdn())
p = ImpalaShell()
try:
start_num_queries = impalad.get_metric_value(NUM_QUERIES)
p.send_cmd('create database if not exists %s' % TMP_DB)
p.send_cmd('use %s' % TMP_DB)
impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 2)
assert impalad.wait_for_num_in_flight_queries(0), MSG % 'use'
p.send_cmd('create table %s(i int)' % TMP_TBL)
p.send_cmd('alter table %s add columns (j int)' % TMP_TBL)
impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 4)
assert impalad.wait_for_num_in_flight_queries(0), MSG % 'alter'
p.send_cmd('drop table %s' % TMP_TBL)
impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 5)
assert impalad.wait_for_num_in_flight_queries(0), MSG % 'drop'
finally:
run_impala_shell_interactive("drop table if exists %s.%s;" % (TMP_DB, TMP_TBL))
run_impala_shell_interactive("drop database if exists foo;")
@pytest.mark.execute_serially
def test_multiline_queries_in_history(self):
"""Test to ensure that multiline queries with comments are preserved in history
Ensure that multiline queries are preserved when they're read back from history.
Additionally, also test that comments are preserved.
"""
# regex for pexpect, a shell prompt is expected after each command..
prompt_regex = '.*%s:2100.*' % socket.getfqdn()
# readline gets its input from tty, so using stdin does not work.
child_proc = pexpect.spawn(SHELL_CMD)
queries = ["select\n1--comment;",
"select /*comment*/\n1;",
"select\n/*comm\nent*/\n1;"]
for query in queries:
child_proc.expect(prompt_regex)
child_proc.sendline(query)
child_proc.expect(prompt_regex)
child_proc.sendline('quit;')
p = ImpalaShell()
p.send_cmd('history')
result = p.get_result()
for query in queries:
assert query in result.stderr, "'%s' not in '%s'" % (query, result.stderr)
@pytest.mark.execute_serially
def test_rerun(self):
"""Smoke test for the 'rerun' command"""
# Clear history first.
if os.path.exists(SHELL_HISTORY_FILE):
os.remove(SHELL_HISTORY_FILE)
assert not os.path.exists(SHELL_HISTORY_FILE)
child_proc = pexpect.spawn(SHELL_CMD)
child_proc.expect(":21000] >")
self._expect_with_cmd(child_proc, "@1", ("Command index out of range"))
self._expect_with_cmd(child_proc, "rerun -1", ("Command index out of range"))
self._expect_with_cmd(child_proc, "select 'first_command'", ("first_command"))
self._expect_with_cmd(child_proc, "rerun 1", ("first_command"))
self._expect_with_cmd(child_proc, "@ -1", ("first_command"))
self._expect_with_cmd(child_proc, "select 'second_command'", ("second_command"))
child_proc.sendline('history;')
child_proc.expect(":21000] >")
assert '[1]: select \'first_command\';' in child_proc.before;
assert '[2]: select \'second_command\';' in child_proc.before;
assert '[3]: history;' in child_proc.before;
# Rerunning command should not add an entry into history.
assert '[4]' not in child_proc.before;
self._expect_with_cmd(child_proc, "@0", ("Command index out of range"))
self._expect_with_cmd(child_proc, "rerun 4", ("Command index out of range"))
self._expect_with_cmd(child_proc, "@-4", ("Command index out of range"))
self._expect_with_cmd(child_proc, " @ 3 ", ("second_command"))
self._expect_with_cmd(child_proc, "@-3", ("first_command"))
self._expect_with_cmd(child_proc, "@",
("Command index to be rerun must be an integer."))
self._expect_with_cmd(child_proc, "@1foo",
("Command index to be rerun must be an integer."))
self._expect_with_cmd(child_proc, "@1 2",
("Command index to be rerun must be an integer."))
self._expect_with_cmd(child_proc, "rerun1", ("Syntax error"))
child_proc.sendline('quit;')
@pytest.mark.execute_serially
def test_tip(self):
"""Smoke test for the TIP command"""
# Temporarily add impala_shell module to path to get at TIPS list for verification
sys.path.append("%s/shell/" % os.environ['IMPALA_HOME'])
try:
import impala_shell
finally:
sys.path = sys.path[:-1]
result = run_impala_shell_interactive("tip;")
for t in impala_shell.TIPS:
if t in result.stderr: return
assert False, "No tip found in output %s" % result.stderr
@pytest.mark.execute_serially
def test_var_substitution(self):
cmds = open(os.path.join(QUERY_FILE_PATH, 'test_var_substitution.sql')).read()
args = '''--var=foo=123 --var=BAR=456 --delimited "--output_delimiter= " '''
result = run_impala_shell_interactive(cmds, shell_args=args)
assert_var_substitution(result)
@pytest.mark.execute_serially
def test_query_option_configuration(self):
rcfile_path = os.path.join(QUERY_FILE_PATH, 'impalarc_with_query_options')
args = '-Q MT_dop=1 --query_option=MAX_ERRORS=200 --config_file="%s"' % rcfile_path
cmds = "set;"
result = run_impala_shell_interactive(cmds, shell_args=args)
assert "\tMT_DOP: 1" in result.stdout
assert "\tMAX_ERRORS: 200" in result.stdout
assert "\tEXPLAIN_LEVEL: 2" in result.stdout
assert "INVALID_QUERY_OPTION is not supported for the impalad being "
"connected to, ignoring." in result.stdout
@pytest.mark.execute_serially
def test_source_file(self):
cwd = os.getcwd()
try:
# Change working dir so that SOURCE command in shell.cmds can find shell2.cmds.
os.chdir("%s/tests/shell/" % os.environ['IMPALA_HOME'])
# IMPALA-5416: Test that a command following 'source' won't be run twice.
result = run_impala_shell_interactive("source shell.cmds;select \"second command\";")
assert "Query: use FUNCTIONAL" in result.stderr
assert "Query: show TABLES" in result.stderr
assert "alltypes" in result.stdout
# This is from shell2.cmds, the result of sourcing a file from a sourced file.
assert "select VERSION()" in result.stderr
assert "version()" in result.stdout
assert len(re.findall("'second command'", result.stdout)) == 1
# IMPALA-5416: Test that two source commands on a line won't crash the shell.
result = run_impala_shell_interactive("source shell.cmds;source shell.cmds;")
assert len(re.findall("version\(\)", result.stdout)) == 2
finally:
os.chdir(cwd)
@pytest.mark.execute_serially
def test_source_file_with_errors(self):
full_path = "%s/tests/shell/shell_error.cmds" % os.environ['IMPALA_HOME']
result = run_impala_shell_interactive("source %s;" % full_path)
assert "Could not execute command: use UNKNOWN_DATABASE" in result.stderr
assert "Query: use FUNCTIONAL" not in result.stderr
result = run_impala_shell_interactive("source %s;" % full_path, '-c')
assert "Could not execute command: use UNKNOWN_DATABASE" in result.stderr
assert "Query: use FUNCTIONAL" in result.stderr
assert "Query: show TABLES" in result.stderr
assert "alltypes" in result.stdout
@pytest.mark.execute_serially
def test_source_missing_file(self):
full_path = "%s/tests/shell/doesntexist.cmds" % os.environ['IMPALA_HOME']
result = run_impala_shell_interactive("source %s;" % full_path)
assert "No such file or directory" in result.stderr
@pytest.mark.execute_serially
def test_zero_row_fetch(self):
# IMPALA-4418: DROP and USE are generally exceptional statements where
# the client does not fetch. However, when preceded by a comment, the
# Impala shell treats them like any other statement and will try to
# fetch - receiving 0 rows. For statements returning 0 rows we do not
# want an empty line in stdout.
result = run_impala_shell_interactive("-- foo \n use default;")
assert "Fetched 0 row(s)" in result.stderr
assert re.search('> \[', result.stdout)
result = run_impala_shell_interactive("select * from functional.alltypes limit 0;")
assert "Fetched 0 row(s)" in result.stderr
assert re.search('> \[', result.stdout)
def run_impala_shell_interactive(input_lines, shell_args=None):
"""Runs a command in the Impala shell interactively."""
# if argument "input_lines" is a string, makes it into a list
if type(input_lines) is str:
input_lines = [input_lines]
# workaround to make Popen environment 'utf-8' compatible
# since piping defaults to ascii
my_env = os.environ
my_env['PYTHONIOENCODING'] = 'utf-8'
p = ImpalaShell(shell_args, env=my_env)
for line in input_lines:
p.send_cmd(line)
return p.get_result()