mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
Add support for HS2 protocol V6
This patch adds support for V6 of the HS2 protocol, which notably includes columnar organisation of result sets. Clients that set their protocol version to < V6 will receive result sets in the traditional row orientation.

The performance of fetches over HS2 goes up significantly as a result, since the V1 protocol had some pathologies in its deserialisation performance:

- Beeswax: Row materialisation: 455ms, client processing time: 523ms
- HS2 V6: Row materialisation: 444ms, client processing time: 1.8s
- HS2 V1: Row materialisation: 585ms, client processing time: 15.9s (!)

TODO: Add support for the CHAR datatype

The following patch is also included:

Fix wait-for-hiveserver2.py when Impala moves to HS2 V6

Due to HIVE-6050, older versions of Hive are not compatible with newer clients (even those that try to use old protocol versions). wait-for-hiveserver2.py uses HS2 to talk to the HiveServer2 service, but picks up the newer version from V6, and fails. This patch temporarily re-adds cli_service.thrift (renaming the Thrift service as LegacyTCLIService) only for wait-for-hiveserver2.py to use. As soon as Impala's thirdparty Hive moves to HS2 V6, we can get rid of this change.

Change-Id: I2cbe884345ae7e772620b80a29b6574bd6532940
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/4402
Tested-by: jenkins
Reviewed-by: Henry Robinson <henry@cloudera.com>
This commit is contained in:
@@ -16,9 +16,10 @@
|
||||
|
||||
import pytest
|
||||
from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
|
||||
from cli_service import TCLIService
|
||||
from TCLIService import TCLIService
|
||||
|
||||
# Simple test to make sure all the HS2 types are supported.
|
||||
# Simple test to make sure all the HS2 types are supported for both the row and
|
||||
# column-oriented versions of the HS2 protocol.
|
||||
class TestFetch(HS2TestSuite):
|
||||
def __verify_result_precision_scale(self, t, precision, scale):
|
||||
# This should be DECIMAL_TYPE but how do I get that in python
|
||||
@@ -28,8 +29,8 @@ class TestFetch(HS2TestSuite):
|
||||
assert p.i32Value == precision
|
||||
assert s.i32Value == scale
|
||||
|
||||
@needs_session
|
||||
def test_query_stmts(self):
|
||||
@needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
|
||||
def test_alltypes_v1(self):
|
||||
execute_statement_req = TCLIService.TExecuteStatementReq()
|
||||
execute_statement_req.sessionHandle = self.session_handle
|
||||
|
||||
@@ -60,3 +61,90 @@ class TestFetch(HS2TestSuite):
|
||||
|
||||
self.close(execute_statement_resp.operationHandle)
|
||||
|
||||
def __query_and_fetch(self, query):
  """Execute `query` on this test's session and return the first fetch response.

  Issues an ExecuteStatement RPC followed by a single FetchResults RPC asking
  for at most 1024 rows. Both responses are passed through
  HS2TestSuite.check_response before the fetch response is returned.
  """
  exec_req = TCLIService.TExecuteStatementReq()
  exec_req.sessionHandle = self.session_handle
  exec_req.statement = query
  exec_resp = self.hs2_client.ExecuteStatement(exec_req)
  HS2TestSuite.check_response(exec_resp)

  # One fetch only: callers are expected to issue queries whose result fits in
  # a single batch of 1024 rows.
  fetch_req = TCLIService.TFetchResultsReq()
  fetch_req.operationHandle = exec_resp.operationHandle
  fetch_req.maxRows = 1024
  fetch_resp = self.hs2_client.FetchResults(fetch_req)
  HS2TestSuite.check_response(fetch_resp)
  return fetch_resp
|
||||
|
||||
def __column_results_to_string(self, columns):
  """Render a column-oriented result set as text for comparison in tests.

  `columns` is a sequence of column objects. Each one is probed for the
  attributes named in HS2TestSuite.HS2_V6_COLUMN_TYPES, and the first attribute
  that is not None is used as that column's data. The data object must expose
  `values` (indexable per row) and `nulls` (a byte string bitmap).

  Returns a tuple (num_rows, formatted). num_rows is the number of values in the
  first column, or 0 if there are no columns or the first column has no
  populated member. formatted contains one line per row, with cells joined by
  ", ", NULL cells rendered as "NULL", and each line terminated by "\n".
  """
  if not columns:
    # An empty (or missing) column list has no first column to size rows from.
    return (0, "")

  def populated_member(column):
    # Return the first typed member that is set on `column`, or None.
    for col_type in HS2TestSuite.HS2_V6_COLUMN_TYPES:
      typed_col = getattr(column, col_type)
      if typed_col is not None:
        return typed_col
    return None

  # Resolve every column once instead of re-probing its attributes per cell.
  typed_cols = [populated_member(column) for column in columns]

  first_col = typed_cols[0]
  num_rows = len(first_col.values) if first_col is not None else 0

  lines = []
  for i in range(num_rows):
    row = []
    for typed_col in typed_cols:
      if typed_col is None:
        # A column with no populated member contributes no cell to the row.
        continue
      # Row i is NULL when bit (i % 8) of byte (i // 8) in the bitmap is set.
      # Floor division keeps the index an integer under true division too.
      null_byte = typed_col.nulls[i // 8]
      if not isinstance(null_byte, int):
        # Indexing a str yields a one-character string rather than an int.
        null_byte = ord(null_byte)
      if null_byte & (1 << (i % 8)):
        row.append("NULL")
      else:
        row.append(str(typed_col.values[i]))
    lines.append(", ".join(row) + "\n")

  return (num_rows, "".join(lines))
|
||||
|
||||
@needs_session()
def test_alltypes_v6(self):
  """Check that simple selects return correct column-oriented results for the
  basic types, decimals and VARCHAR"""

  def fetch_as_text(query):
    # Run the query and flatten its columnar result into (num_rows, text).
    response = self.__query_and_fetch(query)
    return self.__column_results_to_string(response.results.columns)

  # Every functional.alltypes column plus a trailing NULL literal.
  row_count, text = fetch_as_text(
      "SELECT *, NULL from functional.alltypes ORDER BY id LIMIT 1")
  assert row_count == 1
  expected_alltypes = \
      "0, True, 0, 0, 0, 0, 0.0, 0.0, 01/01/09, 0, 2009-01-01 00:00:00, 2009, 1, NULL\n"
  assert text == expected_alltypes

  # Decimals
  row_count, text = fetch_as_text("SELECT * from functional.decimal_tbl LIMIT 1")
  expected_decimals = ("1234, 2222, 1.2345678900, "
                       "0.12345678900000000000000000000000000000, 12345.78900, 1\n")
  assert text == expected_decimals

  # VARCHAR (TODO: CHAR)
  row_count, text = fetch_as_text("SELECT CAST('str' AS VARCHAR(3))")
  assert text == "str\n"
|
||||
|
||||
@needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
def test_execute_select_v1(self):
  """Test that a simple select statement works in the row-oriented protocol

  Runs a COUNT(*) over functional.alltypes in a V1 session, fetches up to 100
  rows and checks that exactly one row is returned starting at offset 0. If the
  server still reports more rows after that single row, the test is marked as
  an expected failure for IMPALA-558."""
  execute_statement_req = TCLIService.TExecuteStatementReq()
  execute_statement_req.sessionHandle = self.session_handle
  execute_statement_req.statement = "SELECT COUNT(*) FROM functional.alltypes"
  execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
  HS2TestSuite.check_response(execute_statement_resp)

  fetch_results_req = TCLIService.TFetchResultsReq()
  fetch_results_req.operationHandle = execute_statement_resp.operationHandle
  fetch_results_req.maxRows = 100
  fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
  HS2TestSuite.check_response(fetch_results_resp)

  assert len(fetch_results_resp.results.rows) == 1
  assert fetch_results_resp.results.startRowOffset == 0

  # Test the condition directly rather than catching AssertionError: an assert
  # is removed when Python runs with -O, which would hide the known failure.
  if fetch_results_resp.hasMoreRows:
    pytest.xfail("IMPALA-558")
|
||||
|
||||
Reference in New Issue
Block a user