IMPALA-12990: Fix impala-shell handling of unset rows_deleted

The issue occurred in Python 3 when 0 rows were deleted from Iceberg.
It could also happen in other DMLs with older Impala servers where
TDmlResult.rows_deleted was not set. See the Jira for details of
the error.

Testing:
Extended shell tests for Kudu DML reporting to also cover Iceberg.

Change-Id: I5812b8006b9cacf34a7a0dbbc89a486d8b454438
Reviewed-on: http://gerrit.cloudera.org:8080/21284
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Csaba Ringhofer
2024-04-10 17:20:00 +02:00
committed by Zoltan Borok-Nagy
parent 1a90388b19
commit 8587949061
4 changed files with 97 additions and 72 deletions

View File

@@ -311,9 +311,10 @@ class ImpalaClient(object):
# differentiate between DML and non-DML. # differentiate between DML and non-DML.
def close_dml(self, last_query_handle): def close_dml(self, last_query_handle):
"""Fetches the results of a DML query. Returns a tuple containing the """Fetches the results of a DML query. Returns a tuple containing the
number of rows modified and the number of row errors, in that order. If the DML number of rows modified, the number of rows deleted, and the number of row errors,
operation doesn't return 'num_row_errors', then the second element in the tuple in that order. If the DML operation doesn't return 'rows_deleted' or
is None. Returns None if the query was not closed successfully. Not idempotent.""" 'num_row_errors', then the respective element in the tuple is None.
Returns None if the query was not closed successfully. Not idempotent."""
raise NotImplementedError() raise NotImplementedError()
def close_query(self, last_query_handle): def close_query(self, last_query_handle):
@@ -661,6 +662,13 @@ class ImpalaClient(object):
# to add arbitrary http headers. # to add arbitrary http headers.
return None return None
def _process_dml_result(self, dml_result):
num_rows = sum([int(k) for k in dml_result.rows_modified.values()])
num_deleted_rows = None
if dml_result.rows_deleted:
num_deleted_rows = sum([int(k) for k in dml_result.rows_deleted.values()])
return (num_rows, num_deleted_rows, dml_result.num_row_errors)
class ImpalaHS2Client(ImpalaClient): class ImpalaHS2Client(ImpalaClient):
"""Impala client. Uses the HS2 protocol plus Impala-specific extensions.""" """Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
@@ -972,14 +980,8 @@ class ImpalaHS2Client(ImpalaClient):
self._check_hs2_rpc_status(resp.status) self._check_hs2_rpc_status(resp.status)
if not resp.dml_result: if not resp.dml_result:
raise RPCException("Impala DML operation did not return DML statistics.") raise RPCException("Impala DML operation did not return DML statistics.")
num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()])
if resp.dml_result.rows_deleted:
num_deleted_rows = sum([int(k) for k in resp.dml_result.rows_deleted.values()])
else:
num_deleted_rows = None
last_query_handle.is_closed = True last_query_handle.is_closed = True
return (num_rows, num_deleted_rows, resp.dml_result.num_row_errors) return self._process_dml_result(resp.dml_result)
finally: finally:
self._clear_current_query_handle() self._clear_current_query_handle()
@@ -1449,17 +1451,12 @@ class ImpalaBeeswaxClient(ImpalaClient):
return return
def close_dml(self, last_query_handle): def close_dml(self, last_query_handle):
insert_result, rpc_status = self._do_beeswax_rpc( dml_result, rpc_status = self._do_beeswax_rpc(
lambda: self.imp_service.CloseInsert(last_query_handle)) lambda: self.imp_service.CloseInsert(last_query_handle))
if rpc_status != RpcStatus.OK: if rpc_status != RpcStatus.OK:
raise RPCException() raise RPCException()
num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
if insert_result.rows_deleted:
num_deleted_rows = sum([int(k) for k in insert_result.rows_deleted.values()])
else:
num_deleted_rows = None
last_query_handle.is_closed = True last_query_handle.is_closed = True
return (num_rows, num_deleted_rows, insert_result.num_row_errors) return self._process_dml_result(dml_result)
def close_query(self, last_query_handle): def close_query(self, last_query_handle):
# Set a member in the handle to make sure that it is idempotent # Set a member in the handle to make sure that it is idempotent

View File

@@ -1343,6 +1343,31 @@ class ImpalaShell(cmd.Cmd, object):
"Est. #Rows", "Peak Mem", "Est. #Rows", "Peak Mem",
"Est. Peak Mem", "Detail"]) "Est. Peak Mem", "Detail"])
def _format_num_rows_report(self, time_elapsed, num_fetched_rows=None, dml_result=None):
num_rows = None
verb = None
error_report = ""
if dml_result is not None:
(num_modified_rows, num_deleted_rows, num_row_errors) = dml_result
if num_modified_rows == 0 and num_deleted_rows is not None and num_deleted_rows > 0:
verb = "Deleted"
num_rows = num_deleted_rows
elif num_modified_rows is not None:
verb = "Modified"
num_rows = num_modified_rows
# Add the number of row errors if this DML and the operation supports it.
# num_row_errors is None if the DML operation doesn't return it.
if num_row_errors is not None:
error_report = ", %d row error(s)" % (num_row_errors)
elif num_fetched_rows is not None:
verb = "Fetched"
num_rows = num_fetched_rows
if verb is not None:
return "%s %d row(s)%s in %2.2fs" % (verb, num_rows, error_report, time_elapsed)
else:
return "Time elapsed: %2.2fs" % time_elapsed
def _execute_stmt(self, query_str, is_dml=False, print_web_link=False): def _execute_stmt(self, query_str, is_dml=False, print_web_link=False):
"""Executes 'query_str' with options self.set_query_options on the Impala server. """Executes 'query_str' with options self.set_query_options on the Impala server.
The query is run to completion and close with any results, warnings, errors or The query is run to completion and close with any results, warnings, errors or
@@ -1381,8 +1406,7 @@ class ImpalaShell(cmd.Cmd, object):
if is_dml: if is_dml:
# retrieve the error log # retrieve the error log
warning_log = self.imp_client.get_warning_log(self.last_query_handle) warning_log = self.imp_client.get_warning_log(self.last_query_handle)
(num_rows, num_deleted_rows, num_row_errors) = self.imp_client.close_dml( dml_result = self.imp_client.close_dml(self.last_query_handle)
self.last_query_handle)
else: else:
# impalad does not support the fetching of metadata for certain types of queries. # impalad does not support the fetching of metadata for certain types of queries.
if not self.imp_client.expect_result_metadata(query_str, self.last_query_handle): if not self.imp_client.expect_result_metadata(query_str, self.last_query_handle):
@@ -1414,27 +1438,14 @@ class ImpalaShell(cmd.Cmd, object):
if warning_log: if warning_log:
self._print_if_verbose(warning_log) self._print_if_verbose(warning_log)
# print 'Modified' when is_dml is true (i.e. 1), or 'Fetched' otherwise.
verb = ["Fetched", "Modified"][is_dml]
time_elapsed = end_time - start_time time_elapsed = end_time - start_time
row_report = ""
# Add the number of row errors if this DML and the operation supports it. if is_dml:
# num_row_errors is None if the DML operation doesn't return it. row_report = self._format_num_rows_report(time_elapsed, dml_result=dml_result)
if is_dml and num_row_errors is not None:
error_report = ", %d row error(s)" % (num_row_errors)
else: else:
error_report = "" row_report = self._format_num_rows_report(time_elapsed, num_fetched_rows=num_rows)
self._print_if_verbose(row_report)
if is_dml and num_rows == 0 and num_deleted_rows > 0:
verb = "Deleted"
self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
(verb, num_deleted_rows, error_report, time_elapsed))
elif num_rows is not None:
self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
(verb, num_rows, error_report, time_elapsed))
else:
self._print_if_verbose("Time elapsed: %2.2fs" %
(time_elapsed))
if not is_dml: if not is_dml:
self.imp_client.close_query(self.last_query_handle) self.imp_client.close_query(self.last_query_handle)

View File

@@ -312,13 +312,13 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
query_handle = self.custom_hs2_http_client.execute_query(dml, {}) query_handle = self.custom_hs2_http_client.execute_query(dml, {})
self.custom_hs2_http_client.wait_to_finish(query_handle) self.custom_hs2_http_client.wait_to_finish(query_handle)
self.transport.enable_fault(502, "Injected Fault", 0.50) self.transport.enable_fault(502, "Injected Fault", 0.50)
(num_rows, num_row_errors) = None, None exception = None
try: try:
(num_rows, num_row_errors) = self.custom_hs2_http_client.close_dml(query_handle) self.custom_hs2_http_client.close_dml(query_handle)
except Exception as e: except Exception as e:
assert str(e) == 'HTTP code 502: Injected Fault' exception = e
assert num_rows is None assert exception is not None
assert num_row_errors is None assert str(exception) == 'HTTP code 502: Injected Fault'
output = capsys.readouterr()[1].splitlines() output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation") assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation")

View File

@@ -915,42 +915,59 @@ class TestImpalaShell(ImpalaTestSuite):
else: else:
assert "Modified 1 row(s)" in results.stderr assert "Modified 1 row(s)" in results.stderr
def _validate_dml_stmt(self, vector, stmt, expected_rows_modified, expected_row_errors):
results = run_impala_shell_cmd(vector, ['--query=%s' % stmt])
expected_output = "Modified %d row(s), %d row error(s)" %\
(expected_rows_modified, expected_row_errors)
assert expected_output in results.stderr, results.stderr
def test_kudu_dml_reporting(self, vector, unique_database): def test_kudu_dml_reporting(self, vector, unique_database):
if vector.get_value('strict_hs2_protocol'): if vector.get_value('strict_hs2_protocol'):
pytest.skip("Kudu not supported in strict hs2 mode.") pytest.skip("Kudu not supported in strict hs2 mode.")
db = unique_database create_sql = 'create table %s (id int primary key, age int null)' \
run_impala_shell_cmd(vector, [ 'partition by hash(id) partitions 2 stored as kudu'
'--query=create table %s.dml_test (id int primary key, ' self._test_dml_reporting(vector, create_sql, unique_database, True)
'age int null) partition by hash(id) partitions 2 stored as kudu' % db])
self._validate_dml_stmt( def test_iceberg_dml_reporting(self, vector, unique_database):
vector, "insert into %s.dml_test (id) values (7), (7)" % db, 1, 1) if vector.get_value('strict_hs2_protocol'):
self._validate_dml_stmt(vector, "insert into %s.dml_test (id) values (7)" % db, 0, 1) pytest.skip("DML results are not completely supported in strict hs2 mode.")
self._validate_dml_stmt( create_sql = 'create table %s (id int, age int) ' \
vector, "upsert into %s.dml_test (id) values (7), (7)" % db, 2, 0) 'stored as iceberg tblproperties("format-version"="2")'
self._validate_dml_stmt( self._test_dml_reporting(vector, create_sql, unique_database, False)
vector, "update %s.dml_test set age = 1 where id = 7" % db, 1, 0)
self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" % db, 1, 0)
# UPDATE/DELETE where there are no matching rows; there are no errors because the def _test_dml_reporting(self, vector, create_sql, db, is_kudu):
# scan produced no rows. """ Runs DMLs on Kudu or Iceberg tables and verifies that modified / deleted row
self._validate_dml_stmt( count and number of row errors in Kudu are reported correctly. Kudu and Iceberg
vector, "update %s.dml_test set age = 1 where id = 8" % db, 0, 0) can have different results when adding rows with the same primary key as this
self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" % db, 0, 0) leads to row errors in Kudu.
"""
tbl = db + ".dml_test"
run_impala_shell_cmd(vector, ['--query=' + create_sql % tbl])
def validate(stmt, expected_rows_modified_iceberg, expected_rows_modified_kudu,
expected_row_errors_kudu, is_delete=False):
results = run_impala_shell_cmd(vector, ['--query=' + stmt % tbl])
expected = ""
if is_kudu:
expected = "Modified %d row(s), %d row error(s)" \
% (expected_rows_modified_kudu, expected_row_errors_kudu)
elif is_delete and expected_rows_modified_iceberg > 0:
expected = "Deleted %d row(s)" % expected_rows_modified_iceberg
else:
expected = "Modified %d row(s)" % expected_rows_modified_iceberg
assert expected in results.stderr, results.stderr
validate("insert into %s (id) values (7), (7)", 2, 1, 1)
validate("insert into %s (id) values (7)", 1, 0, 1)
if is_kudu:
validate("upsert into %s (id) values (7), (7)", -1, 2, 0)
validate("update %s set age = 1 where id = 7", 3, 1, 0)
validate("delete from %s where id = 7", 3, 1, 0, is_delete=True)
# UPDATE/DELETE where there are no matching rows; there are no errors in Kudu because
# the scan produced no rows.
validate("update %s set age = 1 where id = 8", 0, 0, 0)
validate("delete from %s where id = 7", 0, 0, 0, is_delete=True)
# WITH clauses, only apply to INSERT and UPSERT # WITH clauses, only apply to INSERT and UPSERT
self._validate_dml_stmt(vector, validate("with y as (values(7)) insert into %s (id) select * from y", 1, 1, 0)
"with y as (values(7)) insert into %s.dml_test (id) select * from y" % db, 1, 0) validate("with y as (values(7)) insert into %s (id) select * from y", 1, 0, 1)
self._validate_dml_stmt(vector, if is_kudu:
"with y as (values(7)) insert into %s.dml_test (id) select * from y" % db, 0, 1) validate("with y as (values(7)) upsert into %s (id) select * from y", -1, 1, 0)
self._validate_dml_stmt(vector,
"with y as (values(7)) upsert into %s.dml_test (id) select * from y" % db, 1, 0)
def test_missing_query_file(self, vector): def test_missing_query_file(self, vector):
result = run_impala_shell_cmd(vector, ['-f', 'nonexistent.sql'], expect_success=False) result = run_impala_shell_cmd(vector, ['-f', 'nonexistent.sql'], expect_success=False)