IMPALA-5792: Eliminate duplicate beeswax python code

This patch unifies the duplicated exec summary code used by the Python
beeswax clients: one copy used by the shell in impala_shell.py and one
used by tests in impala_beeswax.py. The copy that has progressed
furthest is the one in shell/impala_client.py, which can print a
correct exec summary table for MT_DOP>0 queries. It is turned into a
dedicated build_exec_summary_table function in impala_client.py, and
impala_beeswax.py now imports it from impala_client.py.
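
The resulting call pattern, condensed from the diff below into a
minimal sketch (the summary argument stands for a TExecSummary fetched
for a finished query; the summarize wrapper is only illustrative):

  from shell.impala_client import build_exec_summary_table

  def summarize(summary):
      # impala-shell path: pretty-printed values, prefix folded into the
      # operator column.
      shell_rows = []
      build_exec_summary_table(summary, 0, 0, False, shell_rows,
                               is_prettyprint=True,
                               separate_prefix_column=False)
      # impala_beeswax.py (test) path: raw values, prefix kept as its own
      # column so each row can be zipped into a dict keyed by column name.
      raw_rows = []
      build_exec_summary_table(summary, 0, 0, False, raw_rows,
                               is_prettyprint=False,
                               separate_prefix_column=True)
      return shell_rows, raw_rows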

This patch also fixes several flake8 issues in the modified files.

Testing:
- Manually run TPC-DS Q74 in impala-shell and then type the "summary"
  command. Confirm that the plan tree is displayed properly.
- Run single_node_perf_run.py over branches that produce different
  TPC-DS Q74 plan trees. Confirm that the plan trees are displayed
  correctly in performance_result.txt.

Change-Id: Ica57c90dd571d9ac74d76d9830da26c7fe20c74f
Reviewed-on: http://gerrit.cloudera.org:8080/22060
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Author: Riza Suminto
Date: 2024-11-12 13:12:55 -08:00
Parent: fafcd60061
Commit: 1f35747ea3
3 changed files with 219 additions and 288 deletions

File: shell/impala_client.py

@@ -98,9 +98,10 @@ def utf8_encode_if_needed(val):
val = val.encode('utf-8', errors='replace')
return val
# Regular expression that matches the progress line added to HS2 logs by
# the Impala server.
HS2_LOG_PROGRESS_REGEX = re.compile("Query.*Complete \([0-9]* out of [0-9]*\)\n")
HS2_LOG_PROGRESS_REGEX = re.compile(r"Query.*Complete \([0-9]* out of [0-9]*\)\n")
# Exception types to differentiate between the different RPCExceptions.
# RPCException raised when TApplicationException is caught.
@@ -108,6 +109,158 @@ RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION"
# RPCException raised when impala server sends a TStatusCode.ERROR_STATUS status code.
RPC_EXCEPTION_SERVER = "SERVER_ERROR"
def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output,
is_prettyprint=True, separate_prefix_column=False):
"""Direct translation of Coordinator::PrintExecSummary() to recursively build a list
of rows of summary statistics, one per exec node
summary: the TExecSummary object that contains all the summary data
idx: the index of the node to print
indent_level: the number of spaces to print before writing the node's label, to give
the appearance of a tree. The 0th child of a node has the same indent_level as its
parent. All other children have an indent_level of one greater than their parent.
new_indent_level: If true, this indent level is different from the previous row's.
output: the list of rows into which to append the rows produced for this node and its
children.
is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty
printed format.
separate_prefix_column: Optional. If True, the prefix and operator name will be
returned as separate columns. Otherwise, the prefix and operator name will be
concatenated into a single column.
Returns the index of the next exec node in summary.exec_nodes that should be
processed, used internally to this method only.
"""
attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
# Initialise aggregate and maximum stats
agg_stats, max_stats = TExecStats(), TExecStats()
for attr in attrs:
setattr(agg_stats, attr, 0)
setattr(max_stats, attr, 0)
node = summary.nodes[idx]
if node.exec_stats is not None:
for stats in node.exec_stats:
for attr in attrs:
val = getattr(stats, attr)
if val is not None:
setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
setattr(max_stats, attr, max(getattr(max_stats, attr), val))
if node.exec_stats is not None and node.exec_stats:
avg_time = agg_stats.latency_ns / len(node.exec_stats)
else:
avg_time = 0
is_sink = node.node_id == -1
# If the node is a broadcast-receiving exchange node, the cardinality of rows produced
# is the max over all instances (which should all have received the same number of
# rows). Otherwise, the cardinality is the sum over all instances which process
# disjoint partitions.
if is_sink:
cardinality = -1
elif node.is_broadcast:
cardinality = max_stats.cardinality
else:
cardinality = agg_stats.cardinality
est_stats = node.estimated_stats
label_prefix = ""
if indent_level > 0:
label_prefix = "|"
label_prefix += " |" * (indent_level - 1)
if new_indent_level:
label_prefix += "--"
else:
label_prefix += " "
def prettyprint(val, units, divisor):
for unit in units:
if val < divisor:
if unit == units[0]:
return "%d%s" % (val, unit)
else:
return "%3.2f%s" % (val, unit)
val /= divisor
def prettyprint_bytes(byte_val):
return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
def prettyprint_units(unit_val):
return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
def prettyprint_time(time_val):
return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
instances = 0
if node.exec_stats is not None:
instances = len(node.exec_stats)
latency = max_stats.latency_ns
cardinality_est = est_stats.cardinality
memory_used = max_stats.memory_used
memory_est = est_stats.memory_used
if (is_prettyprint):
avg_time = prettyprint_time(avg_time)
latency = prettyprint_time(latency)
cardinality = "" if is_sink else prettyprint_units(cardinality)
cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
memory_used = prettyprint_bytes(memory_used)
memory_est = prettyprint_bytes(memory_est)
row = list()
if separate_prefix_column:
row.append(label_prefix)
row.append(node.label)
else:
row.append(label_prefix + node.label)
row.extend([
node.num_hosts,
instances,
avg_time,
latency,
cardinality,
cardinality_est,
memory_used,
memory_est,
node.label_detail])
output.append(row)
try:
sender_idx = summary.exch_to_sender_map[idx]
# This is an exchange node or a join node with a separate builder, so the source
# is a fragment root, and should be printed next.
sender_indent_level = indent_level + node.num_children
sender_new_indent_level = node.num_children > 0
build_exec_summary_table(summary, sender_idx, sender_indent_level,
sender_new_indent_level, output, is_prettyprint,
separate_prefix_column)
except (KeyError, TypeError):
# Fall through if idx not in map, or if exch_to_sender_map itself is not set
pass
idx += 1
if node.num_children > 0:
first_child_output = []
idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output,
is_prettyprint, separate_prefix_column)
for child_idx in xrange(1, node.num_children):
# All other children are indented (we only have 0, 1 or 2 children for every exec
# node at the moment)
idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output,
is_prettyprint, separate_prefix_column)
output += first_child_output
return idx
class QueryOptionLevels:
"""These are the levels used when displaying query options.
The values correspond to the ones in TQueryOptionLevel"""
@@ -196,7 +349,7 @@ class ImpalaClient(object):
try:
self._open_session()
return self._ping_impala_service()
except:
except Exception:
# Ensure we are in a disconnected state if we failed above.
self.close_connection()
raise
@@ -525,130 +678,9 @@ class ImpalaClient(object):
sock.setTimeout(None)
return transport
def build_summary_table(self, summary, idx, is_fragment_root, indent_level,
new_indent_level, output):
"""Direct translation of Coordinator::PrintExecSummary() to recursively build a list
of rows of summary statistics, one per exec node
summary: the TExecSummary object that contains all the summary data
idx: the index of the node to print
is_fragment_root: true if the node to print is the root of a fragment (and therefore
feeds into an exchange)
indent_level: the number of spaces to print before writing the node's label, to give
the appearance of a tree. The 0th child of a node has the same indent_level as its
parent. All other children have an indent_level of one greater than their parent.
output: the list of rows into which to append the rows produced for this node and its
children.
Returns the index of the next exec node in summary.exec_nodes that should be
processed, used internally to this method only.
NOTE: This is duplicated in impala_beeswax.py, and changes made here should also be
made there. TODO: refactor into a shared library. (IMPALA-5792)
"""
attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
# Initialise aggregate and maximum stats
agg_stats, max_stats = TExecStats(), TExecStats()
for attr in attrs:
setattr(agg_stats, attr, 0)
setattr(max_stats, attr, 0)
node = summary.nodes[idx]
if node.exec_stats is not None:
for stats in node.exec_stats:
for attr in attrs:
val = getattr(stats, attr)
if val is not None:
setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
setattr(max_stats, attr, max(getattr(max_stats, attr), val))
if node.exec_stats is not None and node.exec_stats:
avg_time = agg_stats.latency_ns / len(node.exec_stats)
else:
avg_time = 0
# If the node is a broadcast-receiving exchange node, the cardinality of rows produced
# is the max over all instances (which should all have received the same number of
# rows). Otherwise, the cardinality is the sum over all instances which process
# disjoint partitions.
if node.is_broadcast:
cardinality = max_stats.cardinality
else:
cardinality = agg_stats.cardinality
est_stats = node.estimated_stats
label_prefix = ""
if indent_level > 0:
label_prefix = "|"
label_prefix += " |" * (indent_level - 1)
if new_indent_level:
label_prefix += "--"
else:
label_prefix += " "
def prettyprint(val, units, divisor):
for unit in units:
if val < divisor:
if unit == units[0]:
return "%d%s" % (val, unit)
else:
return "%3.2f%s" % (val, unit)
val /= divisor
def prettyprint_bytes(byte_val):
return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
def prettyprint_units(unit_val):
return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
def prettyprint_time(time_val):
return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
instances = 0
if node.exec_stats is not None:
instances = len(node.exec_stats)
is_sink = node.node_id == -1
row = [ label_prefix + node.label,
node.num_hosts, instances,
prettyprint_time(avg_time),
prettyprint_time(max_stats.latency_ns),
"" if is_sink else prettyprint_units(cardinality),
"" if is_sink else prettyprint_units(est_stats.cardinality),
prettyprint_bytes(max_stats.memory_used),
prettyprint_bytes(est_stats.memory_used),
node.label_detail ]
output.append(row)
try:
sender_idx = summary.exch_to_sender_map[idx]
# This is an exchange node or a join node with a separate builder, so the source
# is a fragment root, and should be printed next.
sender_indent_level = indent_level + node.num_children
sender_new_indent_level = node.num_children > 0
self.build_summary_table(
summary, sender_idx, True, sender_indent_level, sender_new_indent_level, output)
except (KeyError, TypeError):
# Fall through if idx not in map, or if exch_to_sender_map itself is not set
pass
idx += 1
if node.num_children > 0:
first_child_output = []
idx = \
self.build_summary_table(
summary, idx, False, indent_level, False, first_child_output)
for child_idx in xrange(1, node.num_children):
# All other children are indented (we only have 0, 1 or 2 children for every exec
# node at the moment)
idx = self.build_summary_table(
summary, idx, False, indent_level + 1, True, output)
output += first_child_output
return idx
def build_summary_table(self, summary, output):
build_exec_summary_table(summary, 0, 0, False, output, is_prettyprint=True,
separate_prefix_column=False)
def _get_sleep_interval(self, start_time):
"""Returns a step function of time to sleep in seconds before polling
@@ -728,8 +760,8 @@ class ImpalaHS2Client(ImpalaClient):
username=self.user)
resp = self._do_hs2_rpc(OpenSession, req, retry_on_error=True)
self._check_hs2_rpc_status(resp.status)
assert (resp.serverProtocolVersion ==
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6), resp.serverProtocolVersion
assert (resp.serverProtocolVersion
== TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6), resp.serverProtocolVersion
# TODO: ensure it's closed if needed
self.session_handle = resp.sessionHandle
@@ -755,7 +787,6 @@ class ImpalaHS2Client(ImpalaClient):
return headers
def close_connection(self):
if self.session_handle is not None:
# Attempt to close session explicitly. Do not fail if there is an error
@@ -799,8 +830,8 @@ class ImpalaHS2Client(ImpalaClient):
QueryStateException):
raise
except RPCException as r:
if (r.exception_type == RPC_EXCEPTION_TAPPLICATION or
r.exception_type == RPC_EXCEPTION_SERVER):
if (r.exception_type == RPC_EXCEPTION_TAPPLICATION
or r.exception_type == RPC_EXCEPTION_SERVER):
raise
log_exception_with_timestamp(r, "Exception",
"type={0} when listing query options. {1}".format(type(r), retry_msg))
@@ -1465,7 +1496,7 @@ class ImpalaBeeswaxClient(ImpalaClient):
dml_result, rpc_status = self._do_beeswax_rpc(
lambda: self.imp_service.CloseInsert(last_query_handle))
if rpc_status != RpcStatus.OK:
raise RPCException()
raise RPCException()
last_query_handle.is_closed = True
return self._process_dml_result(dml_result)

File: shell/impala_shell.py

@@ -74,6 +74,7 @@ DEFAULT_HS2_HTTP_PORT = 28000
DEFAULT_STRICT_HS2_PORT = 11050
DEFAULT_STRICT_HS2_HTTP_PORT = 10001
def strip_comments(sql):
"""sqlparse default implementation of strip comments has a bad performance when parsing
very large SQL due to the grouping. This is because the default implementation tries to
@@ -166,7 +167,7 @@ class ImpalaShell(cmd.Cmd, object):
COMMENTS_BEFORE_SET_PATTERN = r'^(\s*/\*(.|\n)*?\*/|\s*--.*\n)*\s*((un)?set)'
COMMENTS_BEFORE_SET_REPLACEMENT = r'\3'
# Variable names are prefixed with the following string
VAR_PREFIXES = [ 'VAR', 'HIVEVAR' ]
VAR_PREFIXES = ['VAR', 'HIVEVAR']
DEFAULT_DB = 'default'
# Regex applied to all tokens of a query to detect DML statements.
DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
@@ -180,10 +181,10 @@ class ImpalaShell(cmd.Cmd, object):
VALID_SHELL_OPTIONS = {
'LIVE_PROGRESS': (lambda x: x in ImpalaShell.TRUE_STRINGS, "live_progress"),
'LIVE_SUMMARY': (lambda x: x in ImpalaShell.TRUE_STRINGS, "live_summary"),
'WRITE_DELIMITED' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "write_delimited"),
'VERBOSE' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
'DELIMITER' : (lambda x: " " if x == '\\s' else x, "output_delimiter"),
'OUTPUT_FILE' : (lambda x: None if x == '' else x, "output_file"),
'WRITE_DELIMITED': (lambda x: x in ImpalaShell.TRUE_STRINGS, "write_delimited"),
'VERBOSE': (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
'DELIMITER': (lambda x: " " if x == '\\s' else x, "output_delimiter"),
'OUTPUT_FILE': (lambda x: None if x == '' else x, "output_file"),
'VERTICAL': (lambda x: x in ImpalaShell.TRUE_STRINGS, "vertical"),
}
@@ -217,8 +218,8 @@ class ImpalaShell(cmd.Cmd, object):
(self.strict_hs2_protocol and not self.use_kerberos and not self.use_jwt)
self.client_connect_timeout_ms = options.client_connect_timeout_ms
self.http_socket_timeout_s = None
if (options.http_socket_timeout_s != 'None' and
options.http_socket_timeout_s is not None):
if (options.http_socket_timeout_s != 'None'
and options.http_socket_timeout_s is not None):
self.http_socket_timeout_s = float(options.http_socket_timeout_s)
self.connect_max_tries = options.connect_max_tries
self.verbose = options.verbose
@@ -415,8 +416,8 @@ class ImpalaShell(cmd.Cmd, object):
default values.
query_options parameter is a subset of the default_query_options map"""
for option in sorted(query_options):
if (option in self.set_query_options and
self.set_query_options[option] != query_options[option]): # noqa
if (option in self.set_query_options
and self.set_query_options[option] != query_options[option]): # noqa
print('\n'.join(["\t%s: %s" % (option, self.set_query_options[option])]))
else:
print('\n'.join(["\t%s: [%s]" % (option, query_options[option])]))
@@ -765,7 +766,7 @@ class ImpalaShell(cmd.Cmd, object):
# Cmd is an old-style class, hence we need to call the method directly
# instead of using super()
# TODO: This may have to be changed to a super() call once we move to Python 3
if line == None:
if line is None:
return CmdStatus.ERROR
else:
# This code is based on the code from the standard Python library package cmd.py:
@@ -848,15 +849,15 @@ class ImpalaShell(cmd.Cmd, object):
arg_mode = str(arg_mode).lower()
if arg_mode not in [QueryAttemptDisplayModes.ALL,
QueryAttemptDisplayModes.LATEST, QueryAttemptDisplayModes.ORIGINAL]:
print("Invalid value for query attempt display mode: \'" +
arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
print("Invalid value for query attempt display mode: \'"
+ arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
return None
return arg_mode
def print_exec_summary(self, summary):
output = []
table = self._default_summary_table()
self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
self.imp_client.build_summary_table(summary, output)
formatter = PrettyOutputFormatter(table)
self.output_stream = OutputStream(formatter, filename=self.output_file)
self.output_stream.write(output)
@@ -1113,7 +1114,7 @@ class ImpalaShell(cmd.Cmd, object):
self.ldap_password.endswith('\n'):
print("Warning: LDAP password contains a trailing newline. "
"Did you use 'echo' instead of 'echo -n'?", file=sys.stderr)
if self.use_ssl and sys.version_info < (2,7,9) \
if self.use_ssl and sys.version_info < (2, 7, 9) \
and "EOF occurred in violation of protocol" in str(e):
print("Warning: TLSv1.2 is not supported for Python < 2.7.9", file=sys.stderr)
log_exception_with_timestamp(e, "Exception",
@@ -1190,7 +1191,7 @@ class ImpalaShell(cmd.Cmd, object):
arg = arg.replace('\n', '')
# Get the database and table name, using the current database if the table name
# wasn't fully qualified.
db_name, tbl_name = self.current_db, arg
db_name = self.current_db
if db_name is None:
db_name = ImpalaShell.DEFAULT_DB
db_table_name = arg.split('.')
@@ -1341,7 +1342,7 @@ class ImpalaShell(cmd.Cmd, object):
if self.live_summary:
table = self._default_summary_table()
output = []
self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
self.imp_client.build_summary_table(summary, output)
formatter = PrettyOutputFormatter(table)
data += formatter.format(output) + "\n"
@@ -1408,8 +1409,8 @@ class ImpalaShell(cmd.Cmd, object):
"Query state can be monitored at: %s" % self.imp_client.get_query_link(
self.imp_client.get_query_id_str(self.last_query_handle)))
wait_to_finish = self.imp_client.wait_to_finish(self.last_query_handle,
self._periodic_wait_callback)
self.imp_client.wait_to_finish(
self.last_query_handle, self._periodic_wait_callback)
# Reset the progress stream.
self.progress_stream.clear()
@@ -1517,7 +1518,6 @@ class ImpalaShell(cmd.Cmd, object):
self.prompt = ImpalaShell.DISCONNECTED_PROMPT
return CmdStatus.ERROR
def construct_table_with_header(self, column_names):
""" Constructs the table header for a given query handle.
@@ -1822,7 +1822,7 @@ class ImpalaShell(cmd.Cmd, object):
"""
cmd_names = [cmd for cmd in self.commands if cmd.startswith(text.lower())]
# If the user input is upper case, return commands in upper case.
if text.isupper(): return [cmd_names.upper() for cmd_names in cmd_names]
if text.isupper(): return [cmd_name.upper() for cmd_name in cmd_names]
# If the user input is lower case or mixed case, return lower case commands.
return cmd_names
@@ -1873,7 +1873,7 @@ HEADER_DIVIDER =\
def _format_tip(tip):
"""Takes a tip string and splits it on word boundaries so that it fits neatly inside the
shell header."""
return '\n'.join([l for l in textwrap.wrap(tip, len(HEADER_DIVIDER))])
return '\n'.join([line for line in textwrap.wrap(tip, len(HEADER_DIVIDER))])
WELCOME_STRING = """\
@@ -1916,8 +1916,8 @@ def parse_variables(keyvals):
for keyval in keyvals:
match = re.match(kv_pattern, keyval)
if not match:
print('Error: Could not parse key-value "%s". ' % (keyval,) +
'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
print('Error: Could not parse key-value "%s". ' % (keyval,)
+ 'It must follow the pattern "KEY=VALUE".', file=sys.stderr)
parser.print_help()
raise FatalShellException()
else:
@@ -1943,8 +1943,8 @@ def replace_variables(set_variables, input_string):
# Check if syntax is correct
var_name = get_var_name(name)
if var_name is None:
print('Error: Unknown substitution syntax (%s). ' % (name,) +
'Use ${VAR:var_name}.', file=sys.stderr)
print('Error: Unknown substitution syntax (%s). ' % (name,)
+ 'Use ${VAR:var_name}.', file=sys.stderr)
errors = True
else:
# Replaces variable value
@@ -1998,8 +1998,8 @@ def execute_queries_non_interactive_mode(options, query_options):
queries = parse_query_text(query_text)
with ImpalaShell(options, query_options) as shell:
return (shell.execute_query_list(shell.cmdqueue) and
shell.execute_query_list(queries))
return (shell.execute_query_list(shell.cmdqueue)
and shell.execute_query_list(queries))
def get_intro(options):
@@ -2062,7 +2062,6 @@ def read_password_cmd(password_cmd, auth_method_desc, strip_newline=False):
raise FatalShellException()
def impala_shell_main():
"""
There are two types of options: shell options and query_options. Both can be set on the
@@ -2165,14 +2164,14 @@ def impala_shell_main():
raise FatalShellException()
if not options.ssl and not options.creds_ok_in_clear and options.use_ldap:
print("LDAP credentials may not be sent over insecure " +
"connections. Enable SSL or set --auth_creds_ok_in_clear",
print(("LDAP credentials may not be sent over insecure "
"connections. Enable SSL or set --auth_creds_ok_in_clear"),
file=sys.stderr)
raise FatalShellException()
if not options.use_ldap and options.ldap_password_cmd:
print("Option --ldap_password_cmd requires using LDAP authentication " +
"mechanism (-l)", file=sys.stderr)
print(("Option --ldap_password_cmd requires using LDAP authentication "
"mechanism (-l)"), file=sys.stderr)
raise FatalShellException()
if options.use_jwt and options.protocol.lower() != 'hs2-http':
@@ -2273,8 +2272,8 @@ def impala_shell_main():
raise FatalShellException()
if options.http_socket_timeout_s is not None:
if (options.http_socket_timeout_s != 'None' and
float(options.http_socket_timeout_s) < 0):
if (options.http_socket_timeout_s != 'None'
and float(options.http_socket_timeout_s) < 0):
print("http_socket_timeout_s must be a nonnegative floating point number"
" expressing seconds, or None", file=sys.stderr)
raise FatalShellException()

File: tests/beeswax/impala_beeswax.py

@@ -26,7 +26,7 @@
# result = client.execute(query_string)
# where result is an object of the class ImpalaBeeswaxResult.
from __future__ import absolute_import, division, print_function
from builtins import filter, map, range
from builtins import filter, map
import logging
import time
import shlex
@@ -36,11 +36,6 @@ import re
from beeswaxd import BeeswaxService
from beeswaxd.BeeswaxService import QueryState
from datetime import datetime
try:
# If Exec Summary is not implemented in Impala, this cannot be imported
from ExecStats.ttypes import TExecStats
except ImportError:
pass
from ImpalaService import ImpalaService
from tests.util.thrift_util import create_transport
from thrift.transport.TTransport import TTransportException
@@ -49,11 +44,13 @@ from thrift.Thrift import TApplicationException
LOG = logging.getLogger('impala_beeswax')
# Custom exception wrapper.
# All exceptions coming from thrift/beeswax etc. go through this wrapper.
# TODO: Add the ability to print some of the stack.
class ImpalaBeeswaxException(Exception):
__name__ = "ImpalaBeeswaxException"
def __init__(self, message, inner_exception):
self.__message = message
self.inner_exception = inner_exception
@@ -61,6 +58,7 @@ class ImpalaBeeswaxException(Exception):
def __str__(self):
return self.__message
class ImpalaBeeswaxResult(object):
def __init__(self, **kwargs):
self.query = kwargs.get('query', None)
@@ -99,10 +97,10 @@ class ImpalaBeeswaxResult(object):
'Took: %s(s)\n'
'Data:\n%s\n'
% (self.summary, self.success, self.time_taken,
self.__format_data())
)
self.__format_data()))
return message
# Interface to beeswax. Responsible for executing queries, fetching results.
class ImpalaBeeswaxClient(object):
# Regex applied to all tokens of a query to detect the query type.
@@ -243,121 +241,24 @@ class ImpalaBeeswaxClient(object):
if summary is None or summary.nodes is None:
return None
# If exec summary is not implemented in Impala, this function returns, so we do not
# get the function __build_summary_table which requires TExecStats to be imported.
# get the function build_exec_summary_table which requires TExecStats to be
# imported.
output = []
self.__build_summary_table(summary, 0, False, 0, False, output)
output = list()
self.__build_summary_table(summary, output)
return output
def __build_summary_table(self, summary, idx, is_fragment_root, indent_level,
new_indent_level, output):
"""NOTE: This was taken from impala_shell.py. Changes made here must be made there as
well. TODO: This method will be a placed in a library that is shared between
impala_shell and this file. (IMPALA-5792)
Direct translation of Coordinator::PrintExecSummary() to recursively build a list
of rows of summary statistics, one per exec node
summary: the TExecSummary object that contains all the summary data
idx: the index of the node to print
is_fragment_root: true if the node to print is the root of a fragment (and therefore
feeds into an exchange)
indent_level: the number of spaces to print before writing the node's label, to give
the appearance of a tree. The 0th child of a node has the same indent_level as its
parent. All other children have an indent_level of one greater than their parent.
new_indent_level: If true, this indent level is different from the previous row's.
output: the list of rows into which to append the rows produced for this node and its
children.
Returns the index of the next exec node in summary.exec_nodes that should be
processed, used internally to this method only.
"""
attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
# Initialise aggregate and maximum stats
agg_stats, max_stats = TExecStats(), TExecStats()
for attr in attrs:
setattr(agg_stats, attr, 0)
setattr(max_stats, attr, 0)
row = {}
node = summary.nodes[idx]
# exec_stats may not be set even if the query is FINISHED if there are fragments that
# are still executing or that were cancelled before sending a status report.
if node.exec_stats is not None:
for stats in node.exec_stats:
for attr in attrs:
val = getattr(stats, attr)
if val is not None:
setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
setattr(max_stats, attr, max(getattr(max_stats, attr), val))
if len(node.exec_stats) > 0:
avg_time = agg_stats.latency_ns // len(node.exec_stats)
else:
avg_time = 0
row["num_instances"] = len(node.exec_stats)
row["num_hosts"] = node.num_hosts
row["avg_time"] = avg_time
is_sink = node.node_id == -1
# If the node is a broadcast-receiving exchange node, the cardinality of rows produced
# is the max over all instances (which should all have received the same number of
# rows). Otherwise, the cardinality is the sum over all instances which process
# disjoint partitions.
if is_sink:
cardinality = -1
elif node.is_broadcast:
cardinality = max_stats.cardinality
else:
cardinality = agg_stats.cardinality
est_stats = node.estimated_stats
label_prefix = ""
if indent_level > 0:
label_prefix = "|"
label_prefix += " |" * (indent_level - 1)
if new_indent_level:
label_prefix += "--"
else:
label_prefix += " "
row["prefix"] = label_prefix
row["operator"] = node.label
row["max_time"] = max_stats.latency_ns
row["num_rows"] = cardinality
row["est_num_rows"] = est_stats.cardinality
row["peak_mem"] = max_stats.memory_used
row["est_peak_mem"] = est_stats.memory_used
row["detail"] = node.label_detail
output.append(row)
if summary.exch_to_sender_map is not None and idx in summary.exch_to_sender_map:
sender_idx = summary.exch_to_sender_map[idx]
# This is an exchange node, so the sender is a fragment root, and should be printed
# next.
self.__build_summary_table(summary, sender_idx, True, indent_level, False, output)
idx += 1
if node.num_children > 0:
first_child_output = []
idx = \
self.__build_summary_table(
summary, idx, False, indent_level, False, first_child_output)
for child_idx in range(1, node.num_children):
# All other children are indented (we only have 0, 1 or 2 children for every exec
# node at the moment)
idx = self.__build_summary_table(
summary, idx, False, indent_level + 1, True, output)
output += first_child_output
return idx
def __build_summary_table(self, summary, output):
from shell.impala_client import build_exec_summary_table
result = list()
build_exec_summary_table(summary, 0, 0, False, result, is_prettyprint=False,
separate_prefix_column=True)
keys = ['prefix', 'operator', 'num_hosts', 'num_instances', 'avg_time', 'max_time',
'num_rows', 'est_num_rows', 'peak_mem', 'est_peak_mem', 'detail']
for row in result:
assert len(keys) == len(row)
summ_map = dict(zip(keys, row))
output.append(summ_map)
def get_runtime_profile(self, handle):
return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle))
@@ -468,7 +369,7 @@ class ImpalaBeeswaxClient(object):
def get_log(self, query_handle):
return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
def fetch_results(self, query_string, query_handle, max_rows = -1):
def fetch_results(self, query_string, query_handle, max_rows=-1):
"""Fetches query results given a handle and query type (insert, use, other)"""
query_type = self.__get_query_type(query_string)
if query_type == 'use':
@@ -486,7 +387,7 @@ class ImpalaBeeswaxClient(object):
exec_result.query = query_string
return exec_result
def __fetch_results(self, handle, max_rows = -1):
def __fetch_results(self, handle, max_rows=-1):
"""Handles query results, returns a ImpalaBeeswaxResult object"""
schema = self.__do_rpc(lambda: self.imp_service.get_results_metadata(handle)).schema
# The query has finished, we can fetch the results