#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Impala's shell import cmd import errno import getpass import os import prettytable import random import re import shlex import signal import socket import sqlparse import subprocess import sys import textwrap import time from impala_client import (ImpalaClient, DisconnectedException, QueryStateException, RPCException, TApplicationException, QueryCancelledByShellException) from impala_shell_config_defaults import impala_shell_defaults from option_parser import get_option_parser, get_config_from_file from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter from shell_output import OverwritingStdErrOutputStream from subprocess import call from thrift.Thrift import TException VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s" VERSION_STRING = "build version not available" READLINE_UNAVAILABLE_ERROR = "The readline module was either not found or disabled. " \ "Command history will not be collected." 
# Tarball / packaging build makes impala_build_version available
try:
  from impala_build_version import get_git_hash, get_build_date, get_version
  VERSION_STRING = VERSION_FORMAT % {'version': get_version(),
                                     'git_hash': get_git_hash()[:7],
                                     'build_date': get_build_date()}
except Exception:
  # Best effort: impala_build_version only exists in tarball/packaging builds, so
  # keep the default VERSION_STRING whenever it is missing or unusable.
  pass


class CmdStatus:
  """Values indicating the execution status of a command to the cmd shell driver module.

  SUCCESS and ERROR continue running the shell and ABORT exits the shell:
  ImpalaShell.postcmd() returns the status unchanged, and cmd.Cmd.cmdloop() stops
  as soon as postcmd() returns a true value, which only ABORT (True) is.
  Since SUCCESS == None, successful commands do not need to explicitly return
  anything on completion.
  """
  SUCCESS = None
  ABORT = True
  ERROR = False


class ImpalaPrettyTable(prettytable.PrettyTable):
  """Patched version of PrettyTable with different unicode handling - instead of throwing
  exceptions when a character can't be converted to unicode, it is replaced with a
  placeholder character."""
  def _unicode(self, value):
    """Convert a cell value to unicode without raising on undecodable bytes.

    Non-string values are first converted with str(); byte strings are then decoded
    with self.encoding using the "replace" error handler.
    """
    # NOTE(review): overrides a private PrettyTable hook and relies on the Python 2
    # builtins basestring/unicode; confirm against the pinned prettytable version.
    if not isinstance(value, basestring):
      value = str(value)
    if not isinstance(value, unicode):
      # If a value cannot be decoded, replace the offending characters with a
      # placeholder instead of raising.
      value = unicode(value, self.encoding, "replace")
    return value


class QueryOptionLevels:
  """These are the levels used when displaying query options.

  The values correspond to the ones in TQueryOptionLevel. Options whose level is
  not reported default to REGULAR; REMOVED options are never displayed.
  """
  REGULAR = 0
  ADVANCED = 1
  DEVELOPMENT = 2
  DEPRECATED = 3
  REMOVED = 4


class QueryOptionDisplayModes:
  """How many query option groups are printed when listing options.

  REGULAR_OPTIONS_ONLY prints the regular and advanced options; ALL_OPTIONS also
  prints the development and deprecated ones.
  """
  REGULAR_OPTIONS_ONLY = 1
  ALL_OPTIONS = 2


def _parse_shell_bool(value):
  """Parse the value of a boolean shell option (see ImpalaShell.VALID_SHELL_OPTIONS).

  Only the exact strings "true", "TRUE", "True" and "1" are true; any other value
  is false.
  """
  return value in ("true", "TRUE", "True", "1")


class ImpalaShell(object, cmd.Cmd):
  """ Simple Impala Shell.

  Basic usage: type connect (followed by the impalad address) to connect to an
  impalad. Then issue queries or other commands. Tab-completion should show the set
  of available commands.

  Methods that implement shell commands (do_*) return a single CmdStatus value:
  CmdStatus.SUCCESS (None) and CmdStatus.ERROR (False) keep the prompt running,
  while CmdStatus.ABORT (True) tells the command loop to exit.
  """
  # If not connected to an impalad, the server version is unknown.
  UNKNOWN_SERVER_VERSION = "Not Connected"
  PROMPT_FORMAT = "[{host}:{port}] {db}> "
  DISCONNECTED_PROMPT = "[Not connected] > "
  UNKNOWN_WEBSERVER = "0.0.0.0"
  # Message to display in shell when cancelling a query
  CANCELLATION_MESSAGE = ' Cancelling Query'
  # Number of times to attempt cancellation before giving up.
  CANCELLATION_TRIES = 3
  # Commands are terminated with the following delimiter.
  CMD_DELIM = ';'
  # Valid variable name pattern
  VALID_VAR_NAME_PATTERN = r'[A-Za-z][A-Za-z0-9_]*'
  # Pattern for removal of comments preceding SET commands. Group 3 captures the
  # SET/UNSET keyword, so substituting COMMENTS_BEFORE_SET_REPLACEMENT drops the
  # leading /* ... */ and -- comments and keeps the keyword.
  COMMENTS_BEFORE_SET_PATTERN = r'^(\s*/\*(.|\n)*?\*/|\s*--.*\n)*\s*((un)?set)'
  COMMENTS_BEFORE_SET_REPLACEMENT = r'\3'
  # Accepted namespaces for variable names written as namespace:var_name.
  VAR_PREFIXES = [ 'VAR', 'HIVEVAR' ]
  DEFAULT_DB = 'default'
  # Regex applied to all tokens of a query to detect DML statements.
  DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
  # Separator for queries in the history file.
  HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_'

  # Shell-local options: option name -> (parser applied to the option's value,
  # name of the ImpalaShell attribute that stores the parsed result).
  VALID_SHELL_OPTIONS = {
    'LIVE_PROGRESS' : (_parse_shell_bool, "print_progress"),
    'LIVE_SUMMARY' : (_parse_shell_bool, "print_summary")
  }

  # Minimum time in seconds between two calls to get the exec summary.
PROGRESS_UPDATE_INTERVAL = 1.0 def __init__(self, options, query_options): cmd.Cmd.__init__(self) self.is_alive = True self.impalad = None self.kerberos_host_fqdn = options.kerberos_host_fqdn self.use_kerberos = options.use_kerberos self.kerberos_service_name = options.kerberos_service_name self.use_ssl = options.ssl self.ca_cert = options.ca_cert self.user = options.user self.ldap_password = options.ldap_password self.ldap_password_cmd = options.ldap_password_cmd self.use_ldap = options.use_ldap self.verbose = options.verbose self.prompt = ImpalaShell.DISCONNECTED_PROMPT self.server_version = ImpalaShell.UNKNOWN_SERVER_VERSION self.webserver_address = ImpalaShell.UNKNOWN_WEBSERVER self.current_db = options.default_db self.history_file = os.path.expanduser("~/.impalahistory") # Stores the state of user input until a delimiter is seen. self.partial_cmd = str() # Stores the old prompt while the user input is incomplete. self.cached_prompt = str() self.show_profiles = options.show_profiles # Output formatting flags/options self.output_file = options.output_file self.output_delimiter = options.output_delimiter self.write_delimited = options.write_delimited self.print_header = options.print_header self.progress_stream = OverwritingStdErrOutputStream() self.set_query_options = query_options self.set_variables = options.variables self._populate_command_list() self.imp_client = None; self.orig_cmd = None # Tracks query handle of the last query executed. Used by the 'profile' command. self.last_query_handle = None; self.query_handle_closed = None self.print_summary = options.print_summary self.print_progress = options.print_progress self.ignore_query_failure = options.ignore_query_failure # Due to a readline bug in centos/rhel7, importing it causes control characters to be # printed. This breaks any scripting against the shell in non-interactive mode. Since # the non-interactive mode does not need readline - do not import it. 
if options.query or options.query_file: self.interactive = False self._disable_readline() else: self.interactive = True try: self.readline = __import__('readline') try: self.readline.set_history_length(int(options.history_max)) except ValueError: print_to_stderr("WARNING: history_max option malformed %s\n" % options.history_max) self.readline.set_history_length(1000) except ImportError: self._disable_readline() if options.impalad is not None: self.do_connect(options.impalad) # We handle Ctrl-C ourselves, using an Event object to signal cancellation # requests between the handler and the main shell thread. signal.signal(signal.SIGINT, self._signal_handler) def _populate_command_list(self): """Populate a list of commands in the shell. Each command has its own method of the form do_, and can be extracted by introspecting the class directory. """ # Slice the command method name to get the name of the command. self.commands = [cmd[3:] for cmd in dir(self.__class__) if cmd.startswith('do_')] def _disable_readline(self): """Disables the readline module. The readline module is responsible for keeping track of command history. """ self.readline = None def _print_options(self, print_mode): """Prints the current query options with default values distinguished from set values by brackets [], followed by shell-local options. The options are displayed in groups based on option levels received in parameter. Input parameter decides whether all groups or just the 'Regular' and 'Advanced' options are displayed.""" print "Query options (defaults shown in []):" if not self.imp_client.default_query_options and not self.set_query_options: print '\tNo options available.' 
else: (regular_options, advanced_options, development_options, deprecated_options, removed_options) = self._get_query_option_grouping() self._print_option_group(regular_options) # If the shell is connected to an Impala that predates IMPALA-2181 then # the advanced_options would be empty and only the regular options would # be displayed. if advanced_options: print '\nAdvanced Query Options:' self._print_option_group(advanced_options) if print_mode == QueryOptionDisplayModes.ALL_OPTIONS: if development_options: print '\nDevelopment Query Options:' self._print_option_group(development_options) if deprecated_options: print '\nDeprecated Query Options:' self._print_option_group(deprecated_options) self._print_shell_options() def _get_query_option_grouping(self): """For all the query options received through rpc this function determines the query option level for display purposes using the received query_option_levels parameters. If the option level can't be determined then it defaults to 'REGULAR'""" (regular_options, advanced_options, development_options, deprecated_options, removed_options) = {}, {}, {}, {}, {} for option_name, option_value in self.imp_client.default_query_options.iteritems(): level = self.imp_client.query_option_levels.get(option_name, QueryOptionLevels.REGULAR) if level == QueryOptionLevels.REGULAR: regular_options[option_name] = option_value elif level == QueryOptionLevels.DEVELOPMENT: development_options[option_name] = option_value elif level == QueryOptionLevels.DEPRECATED: deprecated_options[option_name] = option_value elif level == QueryOptionLevels.REMOVED: removed_options[option_name] = option_value else: advanced_options[option_name] = option_value return (regular_options, advanced_options, development_options, deprecated_options, removed_options) def _print_option_group(self, query_options): """Gets query options and prints them. Value is inside [] for the ones having default values. 
query_options parameter is a subset of the default_query_options map""" for option_name in sorted(query_options): if (option_name in self.set_query_options and self.set_query_options[option_name] != query_options[option_name]): print '\n'.join(["\t%s: %s" % (option_name, self.set_query_options[option_name])]) else: print '\n'.join(["\t%s: [%s]" % (option_name, query_options[option_name])]) def _print_variables(self): # Prints the currently defined variables. if not self.set_variables: print '\tNo variables defined.' else: for k in sorted(self.set_variables): print '\n'.join(["\t%s: %s" % (k, self.set_variables[k])]) def _print_shell_options(self): """Prints shell options, which are local and independent of query options.""" print "\nShell Options" for x in self.VALID_SHELL_OPTIONS: print "\t%s: %s" % (x, self.__dict__[self.VALID_SHELL_OPTIONS[x][1]]) def _create_beeswax_query(self, args): """Original command should be stored before running the method. The method is usually used in do_* methods and the command is kept at precmd().""" command = self.orig_cmd self.orig_cmd = None if not command: print_to_stderr("Unexpected error: Failed to execute query due to command "\ "is missing") sys.exit(1) return self.imp_client.create_beeswax_query("%s %s" % (command, args), self.set_query_options) def do_shell(self, args): """Run a command on the shell Usage: shell ! """ try: start_time = time.time() os.system(args) self._print_if_verbose("--------\nExecuted in %2.2fs" % (time.time() - start_time)) except Exception, e: print_to_stderr('Error running command : %s' % e) return CmdStatus.ERROR def _remove_comments_before_set(self, line): """SET commands preceded by a comment become a SET query, which are not processed locally. SET VAR:* commands must be processed locally, since they are not known to the frontend. 
Thus, we remove comments that precede SET commands to enforce the local processing.""" regexp = re.compile(ImpalaShell.COMMENTS_BEFORE_SET_PATTERN, re.IGNORECASE) return regexp.sub(ImpalaShell.COMMENTS_BEFORE_SET_REPLACEMENT, line, 1) def sanitise_input(self, args): # A command terminated by a semi-colon is legal. Check for the trailing # semi-colons and strip them from the end of the command. if not self.interactive: # Strip all the non-interactive commands of the delimiter. args = self._remove_comments_before_set(args) tokens = args.strip().split(' ') return ' '.join(tokens).rstrip(ImpalaShell.CMD_DELIM) # Handle EOF if input is interactive tokens = args.strip().split(' ') if tokens[0].lower() == 'eof': if not self.partial_cmd: # The first token is the command. # If it's EOF, call do_quit() return 'quit' else: # If a command is in progress and the user hits a Ctrl-D, clear its state # and reset the prompt. self.prompt = self.cached_prompt self.partial_cmd = str() # The print statement makes the new prompt appear in a new line. # Also print an extra newline to indicate that the current command has # been cancelled. print '\n' return str() args = self._check_for_command_completion(args) args = self._remove_comments_before_set(args) tokens = args.strip().split(' ') args = ' '.join(tokens).strip() return args.rstrip(ImpalaShell.CMD_DELIM) def _shlex_split(self, line): """Reimplement shlex.split() so that escaped single quotes are actually escaped. shlex.split() only escapes double quotes by default. This method will throw a ValueError if an open quotation (either single or double) is found. """ my_split = shlex.shlex(line, posix=True) my_split.escapedquotes = '"\'' my_split.whitespace_split = True my_split.commenters = '' return list(my_split) def _cmd_ends_with_delim(self, line): """Check if the input command ends with a command delimiter. A command ending with the delimiter and containing an open quotation character is not considered terminated. 
If no open quotation is found, it's considered terminated. """ # Strip any comments to make a statement such as the following be considered as # ending with a delimiter: # select 1 + 1; -- this is a comment line = sqlparse.format(line, strip_comments=True).rstrip() if line.endswith(ImpalaShell.CMD_DELIM): try: # Look for an open quotation in the entire command, and not just the # current line. if self.partial_cmd: line = '%s %s' % (self.partial_cmd, line) self._shlex_split(line) return True # If the command ends with a delimiter, check if it has an open quotation. # shlex in self._split() throws a ValueError iff an open quotation is found. # A quotation can either be a single quote or a double quote. except ValueError: pass # This checks to see if there are any backslashed quotes # outside of quotes, since backslashed quotes # outside of single or double quotes should not be escaped. # Ex. 'abc\'xyz' -> closed because \' is escaped # \'abcxyz -> open because \' is not escaped # \'abcxyz' -> closed # Iterate through the line and switch the state if a single or double quote is found # and ignore escaped single and double quotes if the line is considered open (meaning # a previous single or double quote has not been closed yet) state_closed = True; opener = None; for i, char in enumerate(line): if state_closed and (char in ['\'', '\"']): state_closed = False opener = char elif not state_closed and opener == char: if line[i - 1] != '\\': state_closed = True opener = None; return state_closed return False def _check_for_command_completion(self, cmd): """Check for a delimiter at the end of user input. The end of the user input is scanned for a legal delimiter. If a delimiter is not found: - Input is not send to onecmd() - onecmd() is a method in Cmd which routes the user input to the appropriate method. An empty string results in a no-op. - Input is removed from history. 
- Input is appended to partial_cmd If a delimiter is found: - The contents of partial_cmd are put in history, as they represent a completed command. - The contents are passed to the appropriate method for execution. - partial_cmd is reset to an empty string. """ if self.readline: current_history_len = self.readline.get_current_history_length() # Input is incomplete, store the contents and do nothing. if not self._cmd_ends_with_delim(cmd): # The user input is incomplete, change the prompt to reflect this. if not self.partial_cmd and cmd: self.cached_prompt = self.prompt self.prompt = '> '.rjust(len(self.cached_prompt)) # partial_cmd is already populated, add the current input after a newline. if self.partial_cmd and cmd: self.partial_cmd = "%s\n%s" % (self.partial_cmd, cmd) else: # If the input string is empty or partial_cmd is empty. self.partial_cmd = "%s%s" % (self.partial_cmd, cmd) # Remove the most recent item from history if: # -- The current state of user input in incomplete. # -- The most recent user input is not an empty string if self.readline and current_history_len > 0 and cmd: self.readline.remove_history_item(current_history_len - 1) # An empty string results in a no-op. Look at emptyline() return str() elif self.partial_cmd: # input ends with a delimiter and partial_cmd is not empty if cmd != ImpalaShell.CMD_DELIM: completed_cmd = "%s\n%s" % (self.partial_cmd, cmd) else: completed_cmd = "%s%s" % (self.partial_cmd, cmd) # Reset partial_cmd to an empty string self.partial_cmd = str() # Replace the most recent history item with the completed command. 
completed_cmd = sqlparse.format(completed_cmd) if self.readline and current_history_len > 0: self.readline.replace_history_item(current_history_len - 1, completed_cmd.encode('utf-8')) # Revert the prompt to its earlier state self.prompt = self.cached_prompt else: # Input has a delimiter and partial_cmd is empty completed_cmd = sqlparse.format(cmd) return completed_cmd def _new_impala_client(self): return ImpalaClient(self.impalad, self.kerberos_host_fqdn, self.use_kerberos, self.kerberos_service_name, self.use_ssl, self.ca_cert, self.user, self.ldap_password, self.use_ldap) def _signal_handler(self, signal, frame): """Handles query cancellation on a Ctrl+C event""" if self.last_query_handle is None or self.query_handle_closed: return # Create a new connection to the impalad and cancel the query. for cancel_try in xrange(ImpalaShell.CANCELLATION_TRIES): try: self.imp_client.is_query_cancelled = True self.query_handle_closed = True print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE) new_imp_client = self._new_impala_client() new_imp_client.connect() new_imp_client.cancel_query(self.last_query_handle, False) self.imp_client.close_query(self.last_query_handle) break except Exception, e: # Suppress harmless errors. err_msg = str(e).strip() if err_msg in ['ERROR: Cancelled', 'ERROR: Invalid or unknown query handle']: break print_to_stderr("Failed to reconnect and close (try %i/%i): %s" % ( cancel_try + 1, ImpalaShell.CANCELLATION_TRIES, err_msg)) def _replace_variables(self, query): """Replaces variable within the query text with their corresponding values""" errors = False matches = set(map(lambda v: v.upper(), re.findall(r'(? 1: # The last command needs a delimiter to be successfully executed. parsed_cmds[-1] += ImpalaShell.CMD_DELIM self.cmdqueue.extend(parsed_cmds) # If cmdqueue is populated, then commands are executed from the cmdqueue, and user # input is ignored. Send an empty string as the user input just to be safe. 
return str() try: self.imp_client.test_connection() except TException: print_to_stderr("Connection lost, reconnecting...") self._connect() self._validate_database(immediately=True) return args.encode('utf-8') def onecmd(self, line): """Overridden to ensure the variable replacement is processed in interactive as well as non-interactive mode, since the precmd method would only work for the interactive case, when cmdloop is called. """ # Replace variables in the statement before it's executed line = self._replace_variables(line) # Cmd is an old-style class, hence we need to call the method directly # instead of using super() # TODO: This may have to be changed to a super() call once we move to Python 3 if line == None: return CmdStatus.ERROR else: # This code is based on the code from the standard Python library package cmd.py: # https://github.com/python/cpython/blob/master/Lib/cmd.py#L192 # One change is lowering command before getting a function. The lowering # is necessary to find a proper function and here is a right place # because the lowering command in front of the finding can avoid a # side effect. 
command, arg, line = self.parseline(line) if not line: return self.emptyline() if command is None: return self.default(line) self.lastcmd = line if line == 'EOF' : self.lastcmd = '' if command == '': return self.default(line) else: try: func = getattr(self, 'do_' + command.lower()) self.orig_cmd = command except AttributeError: return self.default(line) return func(arg) def postcmd(self, status, args): # status conveys to shell how the shell should continue execution # should always be a CmdStatus return status def do_summary(self, args): summary = None try: summary = self.imp_client.get_summary(self.last_query_handle) except RPCException, e: import re error_pattern = re.compile("ERROR: Query id \d+:\d+ not found.") if error_pattern.match(e.value): print_to_stderr("Could not retrieve summary for query.") else: print_to_stderr(e) return CmdStatus.ERROR if summary.nodes is None: print_to_stderr("Summary not available") return CmdStatus.SUCCESS output = [] table = self._default_summary_table() self.imp_client.build_summary_table(summary, 0, False, 0, False, output) formatter = PrettyOutputFormatter(table) self.output_stream = OutputStream(formatter, filename=self.output_file) self.output_stream.write(output) def _handle_shell_options(self, token, value): try: handle = self.VALID_SHELL_OPTIONS[token] self.__dict__[handle[1]] = handle[0](value) return True except KeyError: return False def _get_var_name(self, name): """Look for a namespace:var_name pattern in an option name. Return the variable name if it's a match or None otherwise. """ ns_match = re.match(r'^([^:]*):(.*)', name) if ns_match is not None: ns = ns_match.group(1) var_name = ns_match.group(2) if ns in ImpalaShell.VAR_PREFIXES: return var_name return None def _print_with_set(self, print_level): self._print_options(print_level) print "\nVariables:" self._print_variables() def do_set(self, args): """Set or display query options. 
Display query options: Usage: SET (to display the Regular options) or SET ALL (to display all the options) Set query options: Usage: SET