IMPALA-12216: Print timestamp for impala-shell errors

This change will print timestamp of an exception or warning
occurred during execution of a query via impala-shell.
The timestamp will use timezone of the machine running impala-shell.

example:
Query submitted at: 2024-08-22 16:17:57 (Coordinator: http://host:25000)
Query state can be monitored at:
http://localhost:25000/query_plan?query_id=e04dcc55e560d1ee:11173fe800000000
^C Cancelling Query
Opened TCP connection to localhost:21050
2024-08-22 16:17:58 [Exception] type=<class 'socket.error'> in FetchResults.
[Errno 4] Interrupted system call
2024-08-22 16:17:58 [Warning]  Cancelling Query
2024-08-22 16:17:58 [Warning] close session RPC failed: <class
'shell_exceptions.QueryCancelledByShellException'>
Opened TCP connection to localhost:21050
[localhost:21050] default>

Change-Id: I4abbd02aa9f61210b0333495bf191e72c22a5944
Reviewed-on: http://gerrit.cloudera.org:8080/21426
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Saurabh Katiyal
2024-05-15 00:06:51 +05:30
committed by Impala Public Jenkins
parent f3ce97916f
commit 2535e79491
6 changed files with 124 additions and 72 deletions

View File

@@ -27,6 +27,19 @@ from tests.common.impala_test_suite import IMPALAD_HS2_HTTP_HOST_PORT
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from time import sleep
"""IMPALA-12216 implemented timestamp to be printed in case of any error/warning
during query execution, below is an example :
2024-07-15 12:49:27 [Exception] type=<class 'socket.error'> in FetchResults.
2024-07-15 12:49:27 [Warning] Cancelling Query
2024-07-15 12:49:27 [Warning] close session RPC failed: <class 'shell_exceptions.
QueryCancelledByShellException'>
To avoid test flakiness due to timestamp, we would be ignoring timestamp in actual
result before asserting with expected result, (YYYY-MM-DD hh:mm:ss ) is of length 20
"""
TS_LEN = 20
class FaultInjectingHttpClient(ImpalaHttpClient, object):
"""Class for injecting faults in the ImpalaHttpClient. Faults are injected by using the
@@ -132,35 +145,34 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
def __expect_msg_retry(self, impala_rpc_name):
"""Returns expected log message for rpcs which can be retried"""
return ("Caught exception HTTP code 502: Injected Fault, "
"type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3".format(impala_rpc_name))
return ("[Exception] type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3 HTTP code 502: Injected Fault".format(impala_rpc_name))
def __expect_msg_retry_with_extra(self, impala_rpc_name):
"""Returns expected log message for rpcs which can be retried and where the http
message has a message body"""
return ("Caught exception HTTP code 503: Injected Fault [EXTRA], "
"type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3".format(impala_rpc_name))
return ("[Exception] type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3 HTTP code 503: Injected Fault [EXTRA]"
.format(impala_rpc_name))
def __expect_msg_retry_with_retry_after(self, impala_rpc_name):
"""Returns expected log message for rpcs which can be retried and the http
message has a body and a Retry-After header that can be correctly decoded"""
return ("Caught exception HTTP code 503: Injected Fault [EXTRA], "
"type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3, retry after 1 secs".format(impala_rpc_name))
return ("[Exception] type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3, retry after 1 secs "
"HTTP code 503: Injected Fault [EXTRA]".format(impala_rpc_name))
def __expect_msg_retry_with_retry_after_no_extra(self, impala_rpc_name):
"""Returns expected log message for rpcs which can be retried and the http
message has a Retry-After header that can be correctly decoded"""
return ("Caught exception HTTP code 503: Injected Fault, "
"type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3, retry after 1 secs".format(impala_rpc_name))
return ("[Exception] type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"Num remaining tries: 3, retry after 1 secs "
"HTTP code 503: Injected Fault".format(impala_rpc_name))
def __expect_msg_no_retry(self, impala_rpc_name):
"""Returns expected log message for rpcs which can not be retried"""
return ("Caught exception HTTP code 502: Injected Fault, "
"type=<class 'shell.shell_exceptions.HttpError'> in {0}. ".format(impala_rpc_name))
return ("[Exception] type=<class 'shell.shell_exceptions.HttpError'> in {0}. "
"HTTP code 502: Injected Fault".format(impala_rpc_name))
@pytest.mark.execute_serially
def test_connect(self, capsys):
@@ -170,8 +182,8 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
self.transport.enable_fault(502, "Injected Fault", 0.20)
self.connect()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("OpenSession")
assert output[2] == self.__expect_msg_retry("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry("OpenSession")
assert output[2][TS_LEN:] == self.__expect_msg_retry("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_connect_proxy(self, capsys):
@@ -182,8 +194,9 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
self.transport.enable_fault(503, "Injected Fault", 0.20, 'EXTRA')
self.connect()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
assert output[2] == self.__expect_msg_retry_with_extra("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry_with_extra("OpenSession")
assert output[2][TS_LEN:] == self.\
__expect_msg_retry_with_extra("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_connect_proxy_no_retry(self, capsys):
@@ -195,8 +208,9 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
{"header1": "value1"})
self.connect()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
assert output[2] == self.__expect_msg_retry_with_extra("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry_with_extra("OpenSession")
assert output[2][TS_LEN:] == self.\
__expect_msg_retry_with_extra("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_connect_proxy_bad_retry(self, capsys):
@@ -209,8 +223,9 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
"Retry-After": "junk"})
self.connect()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry_with_extra("OpenSession")
assert output[2] == self.__expect_msg_retry_with_extra("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry_with_extra("OpenSession")
assert output[2][TS_LEN:] == self.\
__expect_msg_retry_with_extra("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_connect_proxy_retry(self, capsys):
@@ -222,8 +237,9 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
"Retry-After": "1"})
self.connect()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry_with_retry_after("OpenSession")
assert output[2] == self.__expect_msg_retry_with_retry_after("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry_with_retry_after("OpenSession")
assert output[2][TS_LEN:] == self.\
__expect_msg_retry_with_retry_after("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_connect_proxy_retry_no_body(self, capsys):
@@ -235,8 +251,9 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
"Retry-After": "1"})
self.connect()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry_with_retry_after_no_extra("OpenSession")
assert output[2] == self.\
assert output[1][TS_LEN:] == self.\
__expect_msg_retry_with_retry_after_no_extra("OpenSession")
assert output[2][TS_LEN:] == self.\
__expect_msg_retry_with_retry_after_no_extra("CloseImpalaOperation")
@pytest.mark.execute_serially
@@ -248,7 +265,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
self.transport.enable_fault(502, "Injected Fault", 0.50)
self.custom_hs2_http_client.close_connection()
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_no_retry("CloseSession")
assert output[1][TS_LEN:] == self.__expect_msg_no_retry("CloseSession")
@pytest.mark.execute_serially
def test_ping(self, capsys):
@@ -263,7 +280,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
page = requests.get('{0}/{1}'.format(webserver_address, 'healthz'))
assert page.status_code == requests.codes.ok
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("PingImpalaHS2Service")
assert output[1][TS_LEN:] == self.__expect_msg_retry("PingImpalaHS2Service")
@pytest.mark.execute_serially
def test_execute_query(self, capsys):
@@ -278,7 +295,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert str(e) == 'HTTP code 502: Injected Fault'
assert query_handle is None
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_no_retry("ExecuteStatement")
assert output[1][TS_LEN:] == self.__expect_msg_no_retry("ExecuteStatement")
@pytest.mark.execute_serially
def test_fetch(self, capsys):
@@ -298,7 +315,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert num_rows is None
self.close_query(query_handle)
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_no_retry("FetchResults")
assert output[1][TS_LEN:] == self.__expect_msg_no_retry("FetchResults")
@pytest.mark.execute_serially
def test_close_dml(self, unique_database, capsys):
@@ -320,7 +337,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert exception is not None
assert str(exception) == 'HTTP code 502: Injected Fault'
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_no_retry("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_close_query(self, capsys):
@@ -332,7 +349,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
self.transport.enable_fault(502, "Injected Fault", 0.50)
self.custom_hs2_http_client.close_query(query_handle)
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("CloseImpalaOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry("CloseImpalaOperation")
@pytest.mark.execute_serially
def test_cancel_query(self, capsys):
@@ -346,7 +363,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert success
self.close_query(query_handle)
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("CancelOperation")
assert output[1][TS_LEN:] == self.__expect_msg_retry("CancelOperation")
@pytest.mark.execute_serially
def test_get_query_state(self, capsys):
@@ -360,7 +377,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert query_state == self.custom_hs2_http_client.FINISHED_STATE
self.close_query(query_handle)
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("GetOperationStatus")
assert output[1][TS_LEN:] == self.__expect_msg_retry("GetOperationStatus")
@pytest.mark.execute_serially
def test_get_runtime_profile_summary(self, capsys):
@@ -377,8 +394,8 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
summary = self.custom_hs2_http_client.get_summary(query_handle)
assert summary is not None
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("GetRuntimeProfile")
assert output[2] == self.__expect_msg_retry("GetExecSummary")
assert output[1][TS_LEN:] == self.__expect_msg_retry("GetRuntimeProfile")
assert output[2][TS_LEN:] == self.__expect_msg_retry("GetExecSummary")
@pytest.mark.execute_serially
def test_get_warning_log(self, capsys):
@@ -394,7 +411,7 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
assert warning_log == 'WARNINGS: JOIN hint not recognized: foo'
self.close_query(query_handle)
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_retry("GetLog")
assert output[1][TS_LEN:] == self.__expect_msg_retry("GetLog")
@pytest.mark.execute_serially
def test_connection_drop(self):