# -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Impala's shell from __future__ import absolute_import, print_function, unicode_literals import cmd import errno import getpass import logging import os import random import re import shlex import signal import socket import subprocess from subprocess import call import sys import textwrap import time import traceback import json from six.moves import http_client import prettytable import sqlparse from impala_shell.compatibility import _xrange as xrange from impala_shell.impala_client import ( ImpalaBeeswaxClient, ImpalaHS2Client, log_exception_with_timestamp, log_timestamp, QueryOptionLevels, StrictHS2Client, ) from impala_shell.impala_shell_config_defaults import impala_shell_defaults from impala_shell.option_parser import get_config_from_file, get_option_parser from impala_shell.shell_exceptions import ( DisconnectedException, MissingThriftMethodException, QueryCancelledByShellException, QueryStateException, RPCException, ) from impala_shell.shell_output import ( DelimitedOutputFormatter, match_string_type, OutputStream, OverwritingStdErrOutputStream, PrettyOutputFormatter, VerticalOutputFormatter, ) from impala_shell.value_converter import HS2ValueConverter 
VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
VERSION_STRING = "impala shell build version not available"
READLINE_UNAVAILABLE_ERROR = ("The readline module was either not found or disabled. "
                              "Command history will not be collected.")

# impala_build_version is generated by tarball / packaging builds. If it cannot be
# imported, or any of its getters fails, VERSION_STRING keeps the placeholder above.
# TODO: There's no reason for this to fail when everything is built around pip installs,
# so this could be simplified.
try:
  from impala_shell.impala_build_version import get_build_date, get_git_hash, get_version
  VERSION_STRING = VERSION_FORMAT % dict(
      version=get_version(),
      git_hash=get_git_hash()[:7],  # abbreviated commit hash
      build_date=get_build_date())
except Exception:
  pass

# Default port constants.
DEFAULT_BEESWAX_PORT = 21000
DEFAULT_HS2_PORT = 21050
DEFAULT_HS2_HTTP_PORT = 28000
DEFAULT_STRICT_HS2_PORT = 11050
DEFAULT_STRICT_HS2_HTTP_PORT = 10001


def strip_comments(sql):
  """Return 'sql' with SQL comments removed and surrounding whitespace stripped.

  sqlparse's stock comment stripping also runs its grouping pass, which prepares
  the SQL for pretty-printing and is very slow on large statements. The shell
  strips comments mostly to inspect a statement rather than to alter the SQL it
  runs, so formatting is irrelevant here. This builds a FilterStack with only
  the comment-stripping statement filter and a unicode serializer, and with
  grouping left off.
  """
  filter_stack = sqlparse.engine.FilterStack()
  filter_stack.stmtprocess.append(sqlparse.filters.StripCommentsFilter())
  filter_stack.postprocess.append(sqlparse.filters.SerializerUnicode())
  serialized_statements = filter_stack.run(sql, 'utf-8')
  return ''.join(serialized_statements).strip()


class CmdStatus:
  """Status values that command handlers hand back to the cmd-module driver loop.

  SUCCESS and ERROR keep the shell running; ABORT makes it exit. Because SUCCESS
  is None, a command that completes normally does not need an explicit return.
  """
  SUCCESS = None
  ABORT = True
  ERROR = False


class FatalShellException(Exception):
  """Raised on an unrecoverable error that must terminate the shell.

  Code raising this is expected to have already written the cause to stderr.
  """


class QueryOptionDisplayModes:
  """Selects which groups of query options are printed when listing options."""
  REGULAR_OPTIONS_ONLY = 1
  ALL_OPTIONS = 2


class QueryAttemptDisplayModes:
  """Selects which query attempt(s) a runtime profile or summary is printed for.

  The mode only matters when the query was retried; otherwise there is a single
  attempt and the output is the same in every mode. For a retried query:
    ALL      - print both the original and the retried attempt.
    LATEST   - print only the retried attempt.
    ORIGINAL - print only the original attempt.
  """
  ALL = "all"
  LATEST = "latest"
  ORIGINAL = "original"


# Py3 method resolution order requires 'object' to be last w/ multiple inheritance
class ImpalaShell(cmd.Cmd, object):
  """Simple Impala Shell, built on the standard library 'cmd' module.

  Implements the context manager protocol so that client connections and sessions
  are torn down cleanly; use instances inside a "with" statement.

  Typical use: type 'connect' to connect to an impalad, then issue queries or
  other commands. Tab-completion lists the available commands.

  Shell commands are implemented by do_<name>() methods, which return a CmdStatus
  value: SUCCESS (None) or ERROR (False) keep the command loop going, while ABORT
  (True) ends it.
  """
  # NOTE: These variables are centrally defined for reuse, but they are also
  # used directly in several tests to verify shell behavior (e.g. to
  # verify that the shell remained connected or the shell connected
  # successfully).

  # If not connected to an impalad, the server version is unknown.
  UNKNOWN_SERVER_VERSION = "Not Connected"
  PROMPT_FORMAT = "[{host}:{port}] {db}> "
  DISCONNECTED_PROMPT = "[Not connected] > "
  # Message to display when the connection failed and it is reconnecting.
  CONNECTION_LOST_MESSAGE = 'Connection lost, reconnecting...'
# Message to display when there is an exception when connecting. ERROR_CONNECTING_MESSAGE = "Error connecting " # Message to display when there is a socket error. SOCKET_ERROR_MESSAGE = "Socket error" # Message to display upon successful connection to an Impalad CONNECTED_TO_MESSAGE = "Connected to" # Message to display in shell when cancelling a query CANCELLATION_MESSAGE = ' Cancelling Query' # Number of times to attempt cancellation before giving up. CANCELLATION_TRIES = 3 # Commands are terminated with the following delimiter. CMD_DELIM = ';' # Valid variable name pattern VALID_VAR_NAME_PATTERN = r'[A-Za-z][A-Za-z0-9_]*' # Pattern for removal of comments preceding SET commands COMMENTS_BEFORE_SET_PATTERN = r'^(\s*/\*(.|\n)*?\*/|\s*--.*\n)*\s*((un)?set)' COMMENTS_BEFORE_SET_REPLACEMENT = r'\3' # Variable names are prefixed with the following string VAR_PREFIXES = ['VAR', 'HIVEVAR'] DEFAULT_DB = 'default' # Regex applied to all tokens of a query to detect DML statements. DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I) # Seperator for queries in the history file. HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_' # Strings that are interpreted as True for some shell options. TRUE_STRINGS = ("true", "TRUE", "True", "1") # List of quit commands QUIT_COMMANDS = ("quit", "exit") VALID_SHELL_OPTIONS = { 'LIVE_PROGRESS': (lambda x: x in ImpalaShell.TRUE_STRINGS, "live_progress"), 'LIVE_SUMMARY': (lambda x: x in ImpalaShell.TRUE_STRINGS, "live_summary"), 'WRITE_DELIMITED': (lambda x: x in ImpalaShell.TRUE_STRINGS, "write_delimited"), 'VERBOSE': (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"), 'DELIMITER': (lambda x: " " if x == '\\s' else x, "output_delimiter"), 'OUTPUT_FILE': (lambda x: None if x == '' else x, "output_file"), 'VERTICAL': (lambda x: x in ImpalaShell.TRUE_STRINGS, "vertical"), } # Minimum time in seconds between two calls to get the exec summary. 
PROGRESS_UPDATE_INTERVAL = 1.0 # Environment variable used to source a global config file GLOBAL_CONFIG_FILE = "IMPALA_SHELL_GLOBAL_CONFIG_FILE" def __init__(self, options, query_options): cmd.Cmd.__init__(self) self.is_alive = True self.impalad = None self.kerberos_host_fqdn = options.kerberos_host_fqdn self.use_kerberos = options.use_kerberos self.kerberos_service_name = options.kerberos_service_name self.use_ssl = options.ssl self.ca_cert = options.ca_cert self.user = options.user self.ldap_password_cmd = options.ldap_password_cmd self.jwt_cmd = options.jwt_cmd self.oauth_cmd = options.oauth_cmd self.strict_hs2_protocol = options.strict_hs2_protocol self.ldap_password = options.ldap_password self.use_jwt = options.use_jwt self.jwt = options.jwt self.use_oauth = options.use_oauth self.oauth = options.oauth self.oauth_server = options.oauth_server self.oauth_client_id = options.oauth_client_id self.oauth_client_secret_cmd = options.oauth_client_secret_cmd self.oauth_client_secret = options.oauth_client_secret self.oauth_endpoint = options.oauth_endpoint self.oauth_mock_response_cmd = options.oauth_mock_response_cmd self.oauth_mock_response = options.oauth_mock_response # When running tests in strict mode, the server uses the ldap # protocol but can allow any password. 
if options.use_ldap_test_password: self.ldap_password = 'password' self.use_ldap = options.use_ldap or \ (self.strict_hs2_protocol and not self.use_kerberos and not self.use_jwt and not self.use_oauth) self.client_connect_timeout_ms = options.client_connect_timeout_ms self.http_socket_timeout_s = None if (options.http_socket_timeout_s != 'None' and options.http_socket_timeout_s is not None): self.http_socket_timeout_s = float(options.http_socket_timeout_s) self.connect_max_tries = options.connect_max_tries self.verbose = options.verbose self.prompt = ImpalaShell.DISCONNECTED_PROMPT self.server_version = ImpalaShell.UNKNOWN_SERVER_VERSION self.webserver_address = None self.current_db = options.default_db self.history_file = os.path.expanduser(options.history_file) # Stores the state of user input until a delimiter is seen. self.partial_cmd = str() # Stores the old prompt while the user input is incomplete. self.cached_prompt = str() self.show_profiles = options.show_profiles self.rpc_stdout = options.rpc_stdout self.rpc_file = options.rpc_file # Output formatting flags/options self.output_file = options.output_file self.output_delimiter = " " if options.output_delimiter == "\\s" \ else options.output_delimiter self.write_delimited = options.write_delimited self.print_header = options.print_header self.vertical = options.vertical self.progress_stream = OverwritingStdErrOutputStream() self.set_query_options = query_options self.set_variables = options.variables self._populate_command_list() self.imp_client = None # Used to pass the original unmodified command into do_*() methods. self.orig_cmd = None # Tracks query handle of the last query executed. Used by the 'profile' command. self.last_query_handle = None # Whether to print num rows report at the end of each query like beeswax, or not. 
self.beeswax_compat_num_rows = options.beeswax_compat_num_rows # live_summary and live_progress are turned off in strict_hs2_protocol mode if options.strict_hs2_protocol: if options.live_summary: warning = "WARNING: Unable to track live summary with strict_hs2_protocol" print(warning, file=sys.stderr) if options.live_progress: warning = "WARNING: Unable to track live progress with strict_hs2_protocol" print(warning, file=sys.stderr) # do not allow live_progress or live_summary to be changed. self.VALID_SHELL_OPTIONS['LIVE_PROGRESS'] = (lambda x: x in (), "live_progress") self.VALID_SHELL_OPTIONS['LIVE_SUMMARY'] = (lambda x: x in (), "live_summary") self.live_summary = options.live_summary and not options.strict_hs2_protocol self.live_progress = options.live_progress and not options.strict_hs2_protocol self.ignore_query_failure = options.ignore_query_failure self.http_path = options.http_path self.fetch_size = options.fetch_size self.http_cookie_names = options.http_cookie_names self.http_tracing = not options.no_http_tracing self.hs2_x_forward = options.hs2_x_forward # Due to a readline bug in centos/rhel7, importing it causes control characters to be # printed. This breaks any scripting against the shell in non-interactive mode. Since # the non-interactive mode does not need readline - do not import it. if options.query or options.query_file: self.interactive = False self._disable_readline() else: self.interactive = True try: self.readline = __import__('readline') try: self.readline.set_history_length(int(options.history_max)) # The history file is created when the Impala shell is invoked and commands are # issued. In case it does not exist do not read the history file. 
if os.path.exists(self.history_file): self.readline.read_history_file(self.history_file) self._replace_history_delimiters(ImpalaShell.HISTORY_FILE_QUERY_DELIM, '\n') except ValueError: warning = "WARNING: history_max option malformed %s\n" % options.history_max print(warning, file=sys.stderr) self.readline.set_history_length(1000) except IOError as i: warning = "WARNING: Unable to load command history (disabling impala-shell " \ "command history): %s" % i print(warning, file=sys.stderr) # This history file exists but is not readable, disable readline. self._disable_readline() except ImportError as i: warning = "WARNING: Unable to import readline module (disabling impala-shell " \ "command history): %s" % i print(warning, file=sys.stderr) self._disable_readline() if options.impalad is not None: self.do_connect(options.impalad) # Check if the database in shell option exists self._validate_database(immediately=True) # We handle Ctrl-C ourselves, using an Event object to signal cancellation # requests between the handler and the main shell thread. signal.signal(signal.SIGINT, self._signal_handler) # For debugging, it is useful to be able to get stacktraces from a running shell. # When using Python 3, this hooks up Python 3's faulthandler to handle SIGUSR1. # It will print stacktraces for all threads when receiving SIGUSR1. if sys.version_info.major > 2: import faulthandler faulthandler.register(signal.SIGUSR1) def __enter__(self): return self def __exit__(self, type, value, traceback): self.close_connection() def _populate_command_list(self): """Populate a list of commands in the shell. Each command has its own method of the form do_, and can be extracted by introspecting the class directory. """ # Slice the command method name to get the name of the command. self.commands = [cmd[3:] for cmd in dir(self.__class__) if cmd.startswith('do_')] def _disable_readline(self): """Disables the readline module. 
The readline module is responsible for keeping track of command history. """ self.readline = None def _print_options(self, print_mode): """Prints the current query options with default values distinguished from set values by brackets [], followed by shell-local options. The options are displayed in groups based on option levels received in parameter. Input parameter decides whether all groups or just the 'Regular' and 'Advanced' options are displayed.""" print("Query options (defaults shown in []):") if not self.imp_client.default_query_options and not self.set_query_options: print('\tNo options available.') else: (regular_options, advanced_options, development_options, deprecated_options, removed_options) = self._get_query_option_grouping() self._print_option_group(regular_options) # If the shell is connected to an Impala that predates IMPALA-2181 then # the advanced_options would be empty and only the regular options would # be displayed. if advanced_options: print('\nAdvanced Query Options:') self._print_option_group(advanced_options) if print_mode == QueryOptionDisplayModes.ALL_OPTIONS: if development_options: print('\nDevelopment Query Options:') self._print_option_group(development_options) if deprecated_options: print('\nDeprecated Query Options:') self._print_option_group(deprecated_options) self._print_shell_options() def _get_query_option_grouping(self): """For all the query options received through rpc this function determines the query option level for display purposes using the received query_option_levels parameters. 
If the option level can't be determined then it defaults to 'REGULAR'""" (regular_options, advanced_options, development_options, deprecated_options, removed_options) = {}, {}, {}, {}, {} if sys.version_info.major < 3: client_default_query_opts = self.imp_client.default_query_options.iteritems() else: client_default_query_opts = self.imp_client.default_query_options.items() for option_name, option_value in client_default_query_opts: level = self.imp_client.query_option_levels.get(option_name, QueryOptionLevels.REGULAR) if level == QueryOptionLevels.REGULAR: regular_options[option_name] = option_value elif level == QueryOptionLevels.DEVELOPMENT: development_options[option_name] = option_value elif level == QueryOptionLevels.DEPRECATED: deprecated_options[option_name] = option_value elif level == QueryOptionLevels.REMOVED: removed_options[option_name] = option_value else: advanced_options[option_name] = option_value return (regular_options, advanced_options, development_options, deprecated_options, removed_options) def _print_option_group(self, query_options): """Gets query options and prints them. Value is inside [] for the ones having default values. query_options parameter is a subset of the default_query_options map""" for option in sorted(query_options): if (option in self.set_query_options and self.set_query_options[option] != query_options[option]): # noqa print('\n'.join(["\t%s: %s" % (option, self.set_query_options[option])])) else: print('\n'.join(["\t%s: [%s]" % (option, query_options[option])])) def _print_variables(self): # Prints the currently defined variables. 
if not self.set_variables: print('\tNo variables defined.') else: for k in sorted(self.set_variables): print('\n'.join(["\t%s: %s" % (k, self.set_variables[k])])) def _print_shell_options(self): """Prints shell options, which are local and independent of query options.""" print("\nShell Options") for x in self.VALID_SHELL_OPTIONS: print("\t%s: %s" % (x, self.__dict__[self.VALID_SHELL_OPTIONS[x][1]])) def _build_query_string(self, leading_comment, cmd, args): """Called to build a query string based on the parts output by parseline(): the leading comment, the command name and the arguments to the command.""" # In order to deduce the correct cmd, parseline stripped the leading comment. # To preserve the original query, the leading comment (if exists) will be # prepended when constructing the query sent to the Impala front-end. return "{0}{1} {2}".format(leading_comment or '', cmd or '', args) def do_shell(self, args): """Run a command on the shell Usage: shell ! """ try: start_time = time.time() os.system(args) self._print_if_verbose("--------\nExecuted in %2.2fs" % (time.time() - start_time)) except Exception as e: print('Error running command : %s' % e, file=sys.stderr) return CmdStatus.ERROR def _remove_comments_before_set(self, line): """SET commands preceded by a comment become a SET query, which are not processed locally. SET VAR:* commands must be processed locally, since they are not known to the frontend. Thus, we remove comments that precede SET commands to enforce the local processing.""" regexp = re.compile(ImpalaShell.COMMENTS_BEFORE_SET_PATTERN, re.IGNORECASE) return regexp.sub(ImpalaShell.COMMENTS_BEFORE_SET_REPLACEMENT, line, 1) def sanitise_input(self, args): # A command terminated by a semi-colon is legal. Check for the trailing # semi-colons and strip them from the end of the command. if not self.interactive: # Strip all the non-interactive commands of the delimiter. 
args = self._remove_comments_before_set(args) tokens = args.strip().split(' ') return ' '.join(tokens).rstrip(ImpalaShell.CMD_DELIM) # Handle EOF if input is interactive tokens = args.strip().split(' ') if tokens[0].lower() == 'eof': if not self.partial_cmd: # The first token is the command. # If it's EOF, call do_quit() return 'quit' else: # If a command is in progress and the user hits a Ctrl-D, clear its state # and reset the prompt. self.prompt = self.cached_prompt self.partial_cmd = str() # The print statement makes the new prompt appear in a new line. # Also print an extra newline to indicate that the current command has # been cancelled. print('\n') return str() args = self._check_for_command_completion(args) args = self._remove_comments_before_set(args) tokens = args.strip().split(' ') args = ' '.join(tokens).strip() return args.rstrip(ImpalaShell.CMD_DELIM) def _shlex_split(self, line): """Reimplement shlex.split() so that escaped single quotes are actually escaped. shlex.split() only escapes double quotes by default. This method will throw a ValueError if an open quotation (either single or double) is found. """ my_split = shlex.shlex(line, posix=True) my_split.escapedquotes = '"\'' my_split.whitespace_split = True my_split.commenters = '' return list(my_split) def _cmd_ends_with_delim(self, line): """Check if the input command ends with a command delimiter. A command ending with the delimiter and containing an open quotation character is not considered terminated. If no open quotation is found, it's considered terminated. """ # Strip any comments to make a statement such as the following be considered as # ending with a delimiter: # select 1 + 1; -- this is a comment line = strip_comments(line).rstrip() if line.endswith(ImpalaShell.CMD_DELIM): try: # Look for an open quotation in the entire command, and not just the # current line. 
if self.partial_cmd: line = strip_comments('%s %s' % (self.partial_cmd, line)) self._shlex_split(line) return True # If the command ends with a delimiter, check if it has an open quotation. # shlex in self._split() throws a ValueError iff an open quotation is found. # A quotation can either be a single quote or a double quote. except ValueError: pass # This checks to see if there are any backslashed quotes # outside of quotes, since backslashed quotes # outside of single or double quotes should not be escaped. # Ex. 'abc\'xyz' -> closed because \' is escaped # \'abcxyz -> open because \' is not escaped # \'abcxyz' -> closed # Iterate through the line and switch the state if a single or double quote is found # and ignore escaped single and double quotes if the line is considered open (meaning # a previous single or double quote has not been closed yet) state_closed = True opener = None for i, char in enumerate(line): if state_closed and (char in ['\'', '\"']): state_closed = False opener = char elif not state_closed and opener == char: if line[i - 1] != '\\': state_closed = True opener = None return state_closed return False def _check_for_command_completion(self, cmd): """Check for a delimiter at the end of user input. The end of the user input is scanned for a legal delimiter. If a delimiter is not found: - Input is not send to onecmd() - onecmd() is a method in Cmd which routes the user input to the appropriate method. An empty string results in a no-op. - Input is removed from history. - Input is appended to partial_cmd If a delimiter is found: - The contents of partial_cmd are put in history, as they represent a completed command. - The contents are passed to the appropriate method for execution. - partial_cmd is reset to an empty string. Returns text type result (unicode in Python2, str in Python3) """ if self.readline: current_history_len = self.readline.get_current_history_length() # Input is incomplete, store the contents and do nothing. 
if not self._cmd_ends_with_delim(cmd): # The user input is incomplete, change the prompt to reflect this. if not self.partial_cmd and cmd: self.cached_prompt = self.prompt self.prompt = '> '.rjust(len(self.cached_prompt)) # partial_cmd is already populated, add the current input after a newline. if self.partial_cmd and cmd: self.partial_cmd = "{0}\n{1}".format(self.partial_cmd, cmd) else: # If the input string is empty or partial_cmd is empty. self.partial_cmd = "{0}{1}".format(self.partial_cmd, cmd) # Remove the most recent item from history if: # -- The current state of user input in incomplete. # -- The most recent user input is not an empty string if self.readline and current_history_len > 0 and cmd: self.readline.remove_history_item(current_history_len - 1) # An empty string results in a no-op. Look at emptyline() return str() elif self.partial_cmd: # input ends with a delimiter and partial_cmd is not empty if cmd != ImpalaShell.CMD_DELIM: completed_cmd = "{0}\n{1}".format(self.partial_cmd, cmd) else: completed_cmd = "{0}{1}".format(self.partial_cmd, cmd) # Reset partial_cmd to an empty string self.partial_cmd = str() # Replace the most recent history item with the completed command. completed_cmd = sqlparse.format(completed_cmd) if self.readline and current_history_len > 0: # readline.replace_history_item(pos, line) requires 'line' in bytes in Python2, # and str in Python3. 
if sys.version_info.major == 2: history_cmd = completed_cmd.encode('utf-8') else: history_cmd = completed_cmd self.readline.replace_history_item(current_history_len - 1, history_cmd) # Revert the prompt to its earlier state self.prompt = self.cached_prompt else: # Input has a delimiter and partial_cmd is empty completed_cmd = sqlparse.format(cmd) return completed_cmd def _new_impala_client(self): protocol = options.protocol.lower() value_converter = None if protocol == 'hs2' or protocol == 'hs2-http': value_converter = HS2ValueConverter() if options.hs2_fp_format: value_converter.override_floating_point_converter(options.hs2_fp_format) if options.strict_hs2_protocol: assert protocol == 'hs2' or protocol == 'hs2-http' if protocol == 'hs2': return StrictHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, True, self.client_connect_timeout_ms, self.verbose, use_http_base_transport=False, http_path=self.http_path, http_cookie_names=None, value_converter=value_converter, rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file, http_tracing=self.http_tracing) elif protocol == 'hs2-http': return StrictHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, self.use_ldap, self.client_connect_timeout_ms, self.verbose, use_http_base_transport=True, http_path=self.http_path, http_cookie_names=self.http_cookie_names, value_converter=value_converter, rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file, http_tracing=self.http_tracing, jwt=self.jwt, oauth=self.oauth, hs2_x_forward=self.hs2_x_forward) if protocol == 'hs2': return ImpalaHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, self.use_ldap, self.client_connect_timeout_ms, 
self.verbose, use_http_base_transport=False, http_path=self.http_path, http_cookie_names=None, value_converter=value_converter, rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file, http_tracing=self.http_tracing) elif protocol == 'hs2-http': return ImpalaHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, self.use_ldap, self.client_connect_timeout_ms, self.verbose, use_http_base_transport=True, http_path=self.http_path, http_cookie_names=self.http_cookie_names, http_socket_timeout_s=self.http_socket_timeout_s, value_converter=value_converter, connect_max_tries=self.connect_max_tries, rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file, http_tracing=self.http_tracing, jwt=self.jwt, oauth=self.oauth, hs2_x_forward=self.hs2_x_forward) elif protocol == 'beeswax': return ImpalaBeeswaxClient(self.impalad, self.fetch_size, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, self.use_ldap, self.client_connect_timeout_ms, self.verbose) else: err_msg = "Invalid --protocol value {0}, must be beeswax, hs2 or hs2-http." print(err_msg.format(protocol), file=sys.stderr) raise FatalShellException() def close_connection(self): """Closes the current Impala connection.""" if self.imp_client: self.imp_client.close_connection() def _signal_handler(self, signal, frame): """Handles query cancellation on a Ctrl+C event""" if self.last_query_handle is None or self.last_query_handle.is_closed: if self.partial_cmd: # Revert the prompt to its earlier state self.prompt = self.cached_prompt # Reset the already given commands self.partial_cmd = str() raise KeyboardInterrupt() # Create a new connection to the impalad and cancel the query. # TODO: this isn't thread-safe with respect to the main thread executing the # query. This probably contributes to glitchiness when cancelling query in # the shell. 
for cancel_try in xrange(ImpalaShell.CANCELLATION_TRIES): try: self.imp_client.is_query_cancelled = True os.write(sys.stderr.fileno(), ImpalaShell.CANCELLATION_MESSAGE.encode('utf-8')) new_imp_client = self._new_impala_client() new_imp_client.connect() try: new_imp_client.cancel_query(self.last_query_handle) new_imp_client.close_query(self.last_query_handle) finally: new_imp_client.close_connection() break except Exception as e: # Suppress harmless errors. err_msg = str(e).strip() # Check twice so that it can work with both the old and the new error formats. if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle'] or \ ('\nCancelled' in err_msg or '\nInvalid or unknown query handle' in err_msg): break err_details = "Failed to reconnect and close (try {}/{}): {}".format( cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg) os.write(sys.stderr.fileno(), err_details.encode('utf-8')) def _is_quit_command(self, command): # Do a case insensitive check return command.lower() in ImpalaShell.QUIT_COMMANDS def set_prompt(self, db): self.prompt = ImpalaShell.PROMPT_FORMAT.format( host=self.impalad[0], port=self.impalad[1], db=db) def precmd(self, args): # In Python2, 'args' could in str type if it's the original input line, or in unicode # type if it's a split query appended by us. See how we deal with 'parsed_cmds' below. if sys.version_info.major == 2 and isinstance(args, str): args = self.sanitise_input(args.decode('utf-8')) # python2 else: args = self.sanitise_input(args) # python3 if not args: return args # Split args using sqlparse. If there are multiple queries present in user input, # the length of the returned query list will be greater than one. parsed_cmds = sqlparse.split(args) if len(parsed_cmds) > 1: # The last command needs a delimiter to be successfully executed. 
parsed_cmds[-1] += ImpalaShell.CMD_DELIM self.cmdqueue.extend(parsed_cmds) # If cmdqueue is populated, then commands are executed from the cmdqueue, and user # input is ignored. Send an empty string as the user input just to be safe. return str() # There is no need to reconnect if we are quitting. if not self.imp_client.is_connected() and not self._is_quit_command(args): print(ImpalaShell.CONNECTION_LOST_MESSAGE, file=sys.stderr) self._connect() self._validate_database(immediately=True) return args def onecmd(self, line): """Overridden to ensure the variable replacement is processed in interactive as well as non-interactive mode, since the precmd method would only work for the interactive case, when cmdloop is called. """ # Replace variables in the statement before it's executed line = replace_variables(self.set_variables, line) # Cmd is an old-style class, hence we need to call the method directly # instead of using super() # TODO: This may have to be changed to a super() call once we move to Python 3 if line is None: return CmdStatus.ERROR else: # This code is based on the code from the standard Python library package cmd.py: # https://github.com/python/cpython/blob/master/Lib/cmd.py#L192 # One change is lowering command before getting a function. The lowering # is necessary to find a proper function and here is a right place # because the lowering command in front of the finding can avoid a # side effect. command, arg, line, leading_comment = self.parseline(line) if not line: return self.emptyline() # orig_cmd and last_leading_comment are passed into do_*() functions # via this side mechanism because the cmd module limits us to passing # in the argument list only. 
self.orig_cmd = command self.last_leading_comment = leading_comment self.lastcmd = line if not command: return self.default(line) elif line == 'EOF': self.lastcmd = '' else: try: func = getattr(self, 'do_' + command.lower()) except AttributeError: return self.default(line) return func(arg) def postcmd(self, status, args): # status conveys to shell how the shell should continue execution # should always be a CmdStatus return status def do_summary(self, args): split_args = args.split() if len(split_args) > 1: print("'summary' only accepts 0 or 1 arguments", file=sys.stderr) return CmdStatus.ERROR if not self.last_query_handle: print("Could not retrieve summary: no previous query.", file=sys.stderr) return CmdStatus.ERROR display_mode = QueryAttemptDisplayModes.LATEST if len(split_args) == 1: display_mode = self.get_query_attempt_display_mode(split_args[0]) if display_mode is None: return CmdStatus.ERROR try: summary, failed_summary = self.imp_client.get_summary(self.last_query_handle) except RPCException as e: error_pattern = re.compile("Query id [a-f0-9]+:[a-f0-9]+ not found.") if error_pattern.match(e.value): print("Could not retrieve summary for query.", file=sys.stderr) else: print(e, file=sys.stderr) return CmdStatus.ERROR if summary.nodes is None: print("Summary not available", file=sys.stderr) return CmdStatus.SUCCESS if display_mode == QueryAttemptDisplayModes.ALL: print("Query Summary:") self.print_exec_summary(summary) if failed_summary: print("Failed Query Summary:") self.print_exec_summary(failed_summary) elif display_mode == QueryAttemptDisplayModes.LATEST: self.print_exec_summary(summary) elif display_mode == QueryAttemptDisplayModes.ORIGINAL: if failed_summary: self.print_exec_summary(failed_summary) else: print("No failed summary found") else: raise FatalShellException("Invalid value for query summary display mode") @staticmethod def get_query_attempt_display_mode(arg_mode): arg_mode = str(arg_mode).lower() if arg_mode not in 
[QueryAttemptDisplayModes.ALL, QueryAttemptDisplayModes.LATEST, QueryAttemptDisplayModes.ORIGINAL]: print("Invalid value for query attempt display mode: \'" + arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]") return None return arg_mode def print_exec_summary(self, summary): output = [] table = self._default_summary_table() self.imp_client.build_summary_table(summary, output) formatter = PrettyOutputFormatter(table) self.output_stream = OutputStream(formatter, filename=self.output_file) self.output_stream.write(output) def _handle_shell_options(self, token, value): try: handle = self.VALID_SHELL_OPTIONS[token] self.__dict__[handle[1]] = handle[0](value) return True except KeyError: return False def _handle_unset_shell_options(self, token): try: handle = self.VALID_SHELL_OPTIONS[token] self.__dict__[handle[1]] = impala_shell_defaults[handle[1]] return True except KeyError: return False def _print_with_set(self, print_level): self._print_options(print_level) print("\nVariables:") self._print_variables() def do_set(self, args): """Set or display query options. Display query options: Usage: SET (to display the Regular options) or SET ALL (to display all the options) Set query options: Usage: SET