mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Recently, we switched bin/run-workload.py to use HS2. It turns out that the HS2 client code is not producing the same data structure for the exec summary. report_benchmark_results.py relies on that data structure and fails for HS2. This changes the HS2 client code to use the same representation as the beeswax. There is already a function that does this conversion (build_summary_table_from_thrift) for our regular tests, so this reuses that function. Testing: - Ran bin/run-workload.py twice to produce json files and processed them with report_benchmark_results.py. This failed before the change and passed afterward. Change-Id: I0a041bdebe748b6b3a05b552584e0ca2327cff67 Reviewed-on: http://gerrit.cloudera.org:8080/23597 Reviewed-by: Riza Suminto <riza.suminto@cloudera.com> Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
325 lines
10 KiB
Python
325 lines
10 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from impala.dbapi import connect
|
|
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
|
|
from tests.common.impala_connection import build_summary_table_from_thrift
|
|
from sys import maxsize
|
|
from tests.performance.query import HiveQueryResult, ImpalaQueryResult
|
|
from tests.util.shell_util import exec_process
|
|
from time import time
|
|
import threading
|
|
|
|
# Ports used when a server address is given without an explicit ':port' suffix.
DEFAULT_BEESWAX_PORT = 21000  # impalad beeswax service (deprecated protocol)
DEFAULT_HS2_PORT = 21050  # impalad HiveServer2 service
DEFAULT_HIVE_HS2_PORT = 10000  # Hive HiveServer2 service

# Named logger shared by every function in this module.
LOG = logging.getLogger('query_exec_functions')
|
|
|
|
|
|
def get_hs2_hive_cursor(hiveserver, user=None, use_kerberos=False,
                        database=None, execOptions=None):
  """Get a HiveServer2 cursor connected to a Hive server.

  Args:
    hiveserver: A string in the form 'hostname:port' or 'hostname'. When no valid
      port is given, DEFAULT_HIVE_HS2_PORT is used.
    user: user name to connect as.
    use_kerberos: if True authenticate with GSSAPI, otherwise with PLAIN.
    database: default database for the session.
    execOptions: passed as the `configuration` argument of conn.cursor().

  Returns:
    A cursor if the connection succeeds, None otherwise. Errors are logged, not raised.
  """
  # Accept 'host:port' as well as a bare host name, mirroring get_hs2_impala_cursor.
  # Anything that does not split into exactly a host and an integer port is treated
  # as a plain host name.
  try:
    host, port = hiveserver.split(":")
    port = int(port)
  except ValueError:
    host, port = hiveserver, DEFAULT_HIVE_HS2_PORT
  conn = None
  cursor = None
  try:
    conn = connect(host=host,
                   port=port,
                   user=user,
                   database=database,
                   auth_mechanism="GSSAPI" if use_kerberos else "PLAIN",
                   timeout=maxsize)

    cursor = conn.cursor(configuration=execOptions)
    LOG.info("Connected to {0}:{1}".format(host, port))
  except Exception as e:
    LOG.error("Error Connecting: {0}".format(str(e)))
    # Don't leak the connection when opening the cursor fails.
    if conn is not None:
      try:
        conn.close()
      except Exception:
        # Best effort only; the original error has already been logged.
        pass
  return cursor
|
|
|
|
|
|
def execute_using_hive_hs2(query, query_config):
  """Run a query against Hive over HiveServer2 and time it.

  One cursor is cached per thread (as threading.current_thread().cursor) and reused
  by later calls made from that same thread.

  Args:
    query: the query to run; query.query_str and query.db are read.
    query_config: provides hiveserver, user, use_kerberos, exec_options and
      plugin_runner.

  Returns:
    HiveQueryResult. success is set only when execute and fetchall both complete;
    otherwise query_error holds the error text. When no cursor can be opened the
    result is returned as constructed and no plugins are run.
  """
  exec_result = HiveQueryResult(query, query_config=query_config)
  runner = query_config.plugin_runner
  this_thread = threading.current_thread()

  # NOTE(review): the cached cursor keeps the database of the first query run on this
  # thread; later queries with a different query.db reuse it unchanged.
  cursor = getattr(this_thread, 'cursor', None)
  if cursor is None:
    cursor = get_hs2_hive_cursor(query_config.hiveserver,
                                 user=query_config.user,
                                 database=query.db,
                                 use_kerberos=query_config.use_kerberos,
                                 execOptions=query_config.exec_options)
    this_thread.cursor = cursor
    if cursor is None:
      return exec_result

  if runner:
    runner.run_plugins_pre(scope="Query")
  try:
    exec_result.start_time = datetime.now()
    began = time()
    cursor.execute(query.query_str)
    exec_result.data = cursor.fetchall()
    exec_result.time_taken = time() - began
    exec_result.success = True
  except Exception as err:
    LOG.error(str(err))
    exec_result.query_error = str(err)
  finally:
    if runner:
      runner.run_plugins_post(scope="Query")
  return exec_result
|
|
|
|
|
|
def get_hs2_impala_cursor(impalad, use_kerberos=False, database=None):
  """Open a HiveServer2 cursor on an impalad.

  Args:
    impalad: A string in form 'hostname:port' or 'hostname'; anything that does not
      split into exactly two ':'-separated parts is used whole as the host name, with
      DEFAULT_HS2_PORT as the port.
    use_kerberos: when True authenticate with GSSAPI, otherwise use NOSASL.
    database: default db to use in the connection.

  Returns:
    HiveServer2Cursor if the connection succeeds, None otherwise (the error is logged).
  """
  pieces = impalad.split(":")
  if len(pieces) == 2:
    # A port parsed from impalad stays a string and is passed through as-is.
    host, port = pieces
  else:
    host, port = impalad, DEFAULT_HS2_PORT
  mechanism = "GSSAPI" if use_kerberos else "NOSASL"

  opened = None
  try:
    connection = connect(host=host, port=port, database=database,
                         auth_mechanism=mechanism)
    opened = connection.cursor()
    LOG.info("Connected to {0}:{1}".format(host, port))
  except Exception as err:
    LOG.error("Error connecting: {0}".format(str(err)))
  return opened
|
|
|
|
|
|
def execute_using_impala_hs2(query, query_config):
  """Executes a sql query against Impala using the hs2 interface.

  A fresh cursor is opened for every call and closed before returning.

  Args:
    query: Query (query.query_str and query.db are read).
    query_config: ImpalaHS2Config (impalad, use_kerberos and plugin_runner are read).

  Returns:
    ImpalaQueryResult. On success it carries data, time_taken, runtime_profile and
    exec_summary, the summary converted with build_summary_table_from_thrift so it
    has the same representation as the beeswax client's. On failure query_error is
    set. If no cursor can be opened the result is returned as constructed.
  """
  exec_result = ImpalaQueryResult(query, query_config=query_config)
  runner = query_config.plugin_runner
  cursor = get_hs2_impala_cursor(query_config.impalad,
                                 use_kerberos=query_config.use_kerberos,
                                 database=query.db)
  if cursor is None:
    return exec_result

  if runner:
    runner.run_plugins_pre(scope="Query")
  try:
    exec_result.start_time = datetime.now()
    t0 = time()
    cursor.execute(query.query_str)
    exec_result.data = cursor.fetchall()
    # Only execute + fetch are timed; profile and summary retrieval come after.
    exec_result.time_taken = time() - t0
    exec_result.runtime_profile = cursor.get_profile()
    thrift_summary = cursor.get_summary()
    exec_result.exec_summary = build_summary_table_from_thrift(thrift_summary)
    exec_result.success = True
  except Exception as err:
    LOG.error(str(err))
    exec_result.query_error = str(err)
  finally:
    cursor.close()
    if runner:
      runner.run_plugins_post(scope="Query")
  return exec_result
|
|
|
|
|
|
def establish_beeswax_connection(query_config):
  """Establish a connection to the user specified impalad.

  If query_config.impalad has no ':port' part, DEFAULT_BEESWAX_PORT is appended to it
  in place (query_config is modified).

  Args:
    query_config (QueryExecConfig): provides impalad, use_kerberos, user, password,
      use_ssl and exec_options.

  Returns:
    A connected ImpalaBeeswaxClient with exec_options applied if the connection
    succeeds, None otherwise.

  DEPRECATED: use hs2 instead of beeswax.
  """
  LOG.warning('beeswax protocol is deprecated.')
  use_kerberos = query_config.use_kerberos
  user = query_config.user
  password = query_config.password
  use_ssl = query_config.use_ssl
  # If the impalad is of the form host, convert it to host:port that the Impala beeswax
  # client accepts.
  if len(query_config.impalad.split(":")) == 1:
    query_config.impalad = "{0}:{1}".format(query_config.impalad, DEFAULT_BEESWAX_PORT)
  client = None
  try:
    client = ImpalaBeeswaxClient(query_config.impalad, use_kerberos=use_kerberos,
                                 user=user, password=password, use_ssl=use_ssl)
    # Try connect
    client.connect()
    # Set the exec options.
    client.set_query_options(query_config.exec_options)
    LOG.info("Connected to %s" % query_config.impalad)
  except Exception as e:
    LOG.error("Error connecting: {0}".format(str(e)))
    # Callers check for None to detect failure; handing back the partially set-up
    # client would let them run queries on a connection that failed.
    if client is not None:
      try:
        client.close_connection()
      except Exception:
        # Best effort: the connection may never have been opened.
        pass
    client = None
  return client
|
|
|
|
|
|
def execute_using_impala_beeswax(query, query_config):
  """Executes a query using beeswax.

  A new client is created per query, then destroyed.

  Args:
    query: the query to run; query.query_str and query.db are read.
    query_config (QueryExecConfig)

  Returns:
    ImpalaQueryResult. Errors (including failure to switch to query.db) are recorded
    in query_error rather than raised.

  DEPRECATED: use hs2 instead of beeswax.
  """

  # Create a client object to talk to impalad
  exec_result = ImpalaQueryResult(query, query_config=query_config)
  plugin_runner = query_config.plugin_runner
  client = establish_beeswax_connection(query_config)
  if client is None: return exec_result
  # We need to issue a use database here.
  if query.db:
    try:
      client.execute("use {0}".format(query.db))
    except Exception as e:
      # Report this like any other query failure, and release the connection rather
      # than letting the exception escape with the client still open.
      LOG.error(e)
      exec_result.query_error = str(e)
      client.close_connection()
      return exec_result
  # create a map for query options and the query names to send to the plugin
  context = build_context(query, query_config)
  if plugin_runner: plugin_runner.run_plugins_pre(context=context, scope="Query")
  result = None
  try:
    result = client.execute(query.query_str)
  except Exception as e:
    LOG.error(e)
    exec_result.query_error = str(e)
  finally:
    client.close_connection()
    if plugin_runner: plugin_runner.run_plugins_post(context=context, scope="Query")
  return construct_exec_result(result, exec_result)
|
|
|
|
|
|
def build_context(query, query_config):
  """Build context based on query config for plugin_runner.

  The context is a shallow copy of query_config's instance attributes plus a 'query'
  entry. A copy is used because vars() returns the object's own attribute dict:
  writing 'query' into it directly would set query_config.query as a side effect.

  Args:
    query: the query being run.
    query_config (QueryExecConfig)

  Returns:
    dict mapping each attribute name of query_config to its value, plus
    'query' -> query.
  """
  context = dict(vars(query_config))
  context['query'] = query
  return context
|
|
|
|
|
|
def construct_exec_result(result, exec_result):
  """Transform an ImpalaBeeswaxResult object into an ImpalaQueryResult object.

  Args:
    result (ImpalaBeeswaxResult): source of the data; may be None.
    exec_result (ImpalaQueryResult): destination, updated in place.

  Returns:
    exec_result. It is returned unchanged when result is None or unsuccessful.
    Otherwise success is set to True and data, runtime_profile, start_time,
    time_taken, summary and exec_summary are copied over from result.
  """
  failed = result is None or not result.success
  if failed:
    # Return immediately if the query failed.
    return exec_result

  exec_result.success = True
  for name in ('data', 'runtime_profile', 'start_time',
               'time_taken', 'summary', 'exec_summary'):
    setattr(exec_result, name, getattr(result, name))
  return exec_result
|
|
|
|
|
|
def execute_using_jdbc(query, query_config):
  """Executes a query using JDBC.

  The statement text (prefixed with 'use <db>;' when query.db is set) is appended to
  query_config.jdbc_client_cmd as a double-quoted -q argument, and the resulting
  command is run by run_query_capture_results.
  """
  statements = query.query_str + ';'
  if query.db:
    statements = 'use %s; %s' % (query.db, statements)
  # NOTE(review): the SQL is wrapped in double quotes without escaping, so a query that
  # itself contains '"' will likely break the command line (depends on how
  # exec_process tokenizes cmd).
  quoted_arg = ' -q "%s"' % statements
  return run_query_capture_results(query_config.jdbc_client_cmd + quoted_arg, query)
|
|
|
|
|
|
def parse_jdbc_query_results(stdout, query):
  """
  Parse query execution results for the Impala JDBC client

  Parses the execution time and the result rows from the output of the Impala JDBC
  test client. The time comes from the first line containing 'row(s) in <secs>s';
  the rows are the text between '[START]----' and '----[END]'. If no such section
  exists, the returned result is not successful and has query_error set.
  """
  # The decimal separator is matched literally ('.' or ',') and is optional. With an
  # unescaped '.', integer timings were misread: '12s' parsed as 1.0 (the '.' swallowed
  # the last digit) and '5s' raised ValueError (float('.')).
  jdbc_result_regex = re.compile(r'row\(s\) in (\d*)(?:[.,](\d*))?s')
  time_taken = 0.0
  for line in stdout.split('\n'):
    match = jdbc_result_regex.search(line)
    if match is None:
      continue
    whole, frac = match.group(1), match.group(2) or ''
    if not (whole or frac):
      # Matched the text but no digits at all; nothing to parse on this line.
      continue
    time_taken = float('%s.%s' % (whole or '0', frac or '0'))
    break
  sections = re.findall(r'\[START\]----\n(.*?)\n----\[END\]', stdout, re.DOTALL)
  result_data = sections[0] if sections else None
  exec_result = create_exec_result(time_taken, result_data, query)
  if result_data is None:
    # Previously an IndexError escaped here; report it as a query failure instead.
    msg = 'No [START]----/----[END] result section found in JDBC client output'
    LOG.error(msg)
    exec_result.query_error = msg
  return exec_result
|
|
|
|
|
|
def create_exec_result(time_taken, result_data, query):
  """Wrap parsed JDBC client output in a HiveQueryResult.

  time_taken is always recorded. The result is marked successful, and data is set,
  only when result_data is truthy.
  """
  exec_result = HiveQueryResult(query)
  exec_result.time_taken = time_taken
  if not result_data:
    return exec_result
  LOG.debug('Data:\n%s\n' % result_data)
  exec_result.data = result_data
  exec_result.success = True
  return exec_result
|
|
|
|
|
|
def run_query_capture_results(cmd, query):
  """
  Runs the given query command and returns the execution result.

  cmd is run with exec_process and its stdout is parsed with
  parse_jdbc_query_results. Failures are logged and recorded in the returned
  HiveQueryResult's query_error instead of being raised. Failures include the command
  raising, a non-zero exit code, or output that cannot be parsed.
  """
  exec_result = HiveQueryResult(query)
  start_time = datetime.now()
  try:
    rc, stdout, stderr = exec_process(cmd)
  except Exception as e:
    LOG.error('Error while executing query command: %s' % e)
    exec_result.query_error = str(e)
    # TODO: Should probably save the start time and query string for failed queries.
    return exec_result
  if rc != 0:
    msg = ('Command returned with an error:\n'
           'rc: %d\n'
           'STDERR:\n%s'
           'STDOUT:\n%s'
           % (rc, stderr, stdout))
    LOG.error(msg)
    exec_result.query_error = msg
    return exec_result
  # The command completed; parse its output.
  try:
    exec_result = parse_jdbc_query_results(stdout, query)
  except (IndexError, ValueError) as e:
    # Malformed client output (e.g. missing result markers or an unparsable timing)
    # is reported like the other failures above rather than raised to the caller.
    msg = 'Unable to parse query command output: %s' % e
    LOG.error(msg)
    exec_result.query_error = msg
    return exec_result
  exec_result.query = query
  exec_result.start_time = start_time
  return exec_result
|