From 541fc5ee9ec2d804f2ba45feb2df5bb96a013f86 Mon Sep 17 00:00:00 2001 From: Csaba Ringhofer Date: Wed, 10 Apr 2024 17:20:00 +0200 Subject: [PATCH] IMPALA-12990: Fix impala-shell handling of unset rows_deleted The issue occurred in Python 3 when 0 rows were deleted from Iceberg. It could also happen in other DMLs with older Impala servers where TDmlResult.rows_deleted was not set. See the Jira for details of the error. Testing: Extended shell tests for Kudu DML reporting to also cover Iceberg. Change-Id: I5812b8006b9cacf34a7a0dbbc89a486d8b454438 Reviewed-on: http://gerrit.cloudera.org:8080/21284 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- shell/impala_client.py | 31 ++++---- shell/impala_shell.py | 53 +++++++------ .../test_hs2_fault_injection.py | 10 +-- tests/shell/test_shell_commandline.py | 75 ++++++++++++------- 4 files changed, 97 insertions(+), 72 deletions(-) diff --git a/shell/impala_client.py b/shell/impala_client.py index faf9486c3..b94df8931 100755 --- a/shell/impala_client.py +++ b/shell/impala_client.py @@ -311,9 +311,10 @@ class ImpalaClient(object): # differentiate between DML and non-DML. def close_dml(self, last_query_handle): """Fetches the results of a DML query. Returns a tuple containing the - number of rows modified and the number of row errors, in that order. If the DML - operation doesn't return 'num_row_errors', then the second element in the tuple - is None. Returns None if the query was not closed successfully. Not idempotent.""" + number of rows modified, the number of rows deleted, and the number of row errors, + in that order. If the DML operation doesn't return 'rows_deleted' or + 'num_row_errors', then the respective element in the tuple is None. + Returns None if the query was not closed successfully. Not idempotent.""" raise NotImplementedError() def close_query(self, last_query_handle): @@ -661,6 +662,13 @@ class ImpalaClient(object): # to add arbitrary http headers. 
return None + def _process_dml_result(self, dml_result): + num_rows = sum([int(k) for k in dml_result.rows_modified.values()]) + num_deleted_rows = None + if dml_result.rows_deleted: + num_deleted_rows = sum([int(k) for k in dml_result.rows_deleted.values()]) + return (num_rows, num_deleted_rows, dml_result.num_row_errors) + class ImpalaHS2Client(ImpalaClient): """Impala client. Uses the HS2 protocol plus Impala-specific extensions.""" @@ -972,14 +980,8 @@ class ImpalaHS2Client(ImpalaClient): self._check_hs2_rpc_status(resp.status) if not resp.dml_result: raise RPCException("Impala DML operation did not return DML statistics.") - - num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()]) - if resp.dml_result.rows_deleted: - num_deleted_rows = sum([int(k) for k in resp.dml_result.rows_deleted.values()]) - else: - num_deleted_rows = None last_query_handle.is_closed = True - return (num_rows, num_deleted_rows, resp.dml_result.num_row_errors) + return self._process_dml_result(resp.dml_result) finally: self._clear_current_query_handle() @@ -1449,17 +1451,12 @@ class ImpalaBeeswaxClient(ImpalaClient): return def close_dml(self, last_query_handle): - insert_result, rpc_status = self._do_beeswax_rpc( + dml_result, rpc_status = self._do_beeswax_rpc( lambda: self.imp_service.CloseInsert(last_query_handle)) if rpc_status != RpcStatus.OK: raise RPCException() - num_rows = sum([int(k) for k in insert_result.rows_modified.values()]) - if insert_result.rows_deleted: - num_deleted_rows = sum([int(k) for k in insert_result.rows_deleted.values()]) - else: - num_deleted_rows = None last_query_handle.is_closed = True - return (num_rows, num_deleted_rows, insert_result.num_row_errors) + return self._process_dml_result(dml_result) def close_query(self, last_query_handle): # Set a member in the handle to make sure that it is idempotent diff --git a/shell/impala_shell.py b/shell/impala_shell.py index e76f09599..7998fce53 100755 --- a/shell/impala_shell.py +++ 
b/shell/impala_shell.py @@ -1343,6 +1343,31 @@ class ImpalaShell(cmd.Cmd, object): "Est. #Rows", "Peak Mem", "Est. Peak Mem", "Detail"]) + def _format_num_rows_report(self, time_elapsed, num_fetched_rows=None, dml_result=None): + num_rows = None + verb = None + error_report = "" + if dml_result is not None: + (num_modified_rows, num_deleted_rows, num_row_errors) = dml_result + if num_modified_rows == 0 and num_deleted_rows is not None and num_deleted_rows > 0: + verb = "Deleted" + num_rows = num_deleted_rows + elif num_modified_rows is not None: + verb = "Modified" + num_rows = num_modified_rows + # Add the number of row errors if this DML and the operation supports it. + # num_row_errors is None if the DML operation doesn't return it. + if num_row_errors is not None: + error_report = ", %d row error(s)" % (num_row_errors) + elif num_fetched_rows is not None: + verb = "Fetched" + num_rows = num_fetched_rows + + if verb is not None: + return "%s %d row(s)%s in %2.2fs" % (verb, num_rows, error_report, time_elapsed) + else: + return "Time elapsed: %2.2fs" % time_elapsed + def _execute_stmt(self, query_str, is_dml=False, print_web_link=False): """Executes 'query_str' with options self.set_query_options on the Impala server. The query is run to completion and close with any results, warnings, errors or @@ -1381,8 +1406,7 @@ class ImpalaShell(cmd.Cmd, object): if is_dml: # retrieve the error log warning_log = self.imp_client.get_warning_log(self.last_query_handle) - (num_rows, num_deleted_rows, num_row_errors) = self.imp_client.close_dml( - self.last_query_handle) + dml_result = self.imp_client.close_dml(self.last_query_handle) else: # impalad does not support the fetching of metadata for certain types of queries. if not self.imp_client.expect_result_metadata(query_str, self.last_query_handle): @@ -1414,27 +1438,14 @@ class ImpalaShell(cmd.Cmd, object): if warning_log: self._print_if_verbose(warning_log) - # print 'Modified' when is_dml is true (i.e. 
1), or 'Fetched' otherwise. - verb = ["Fetched", "Modified"][is_dml] + time_elapsed = end_time - start_time - - # Add the number of row errors if this DML and the operation supports it. - # num_row_errors is None if the DML operation doesn't return it. - if is_dml and num_row_errors is not None: - error_report = ", %d row error(s)" % (num_row_errors) + row_report = "" + if is_dml: + row_report = self._format_num_rows_report(time_elapsed, dml_result=dml_result) else: - error_report = "" - - if is_dml and num_rows == 0 and num_deleted_rows > 0: - verb = "Deleted" - self._print_if_verbose("%s %d row(s)%s in %2.2fs" % - (verb, num_deleted_rows, error_report, time_elapsed)) - elif num_rows is not None: - self._print_if_verbose("%s %d row(s)%s in %2.2fs" % - (verb, num_rows, error_report, time_elapsed)) - else: - self._print_if_verbose("Time elapsed: %2.2fs" % - (time_elapsed)) + row_report = self._format_num_rows_report(time_elapsed, num_fetched_rows=num_rows) + self._print_if_verbose(row_report) if not is_dml: self.imp_client.close_query(self.last_query_handle) diff --git a/tests/custom_cluster/test_hs2_fault_injection.py b/tests/custom_cluster/test_hs2_fault_injection.py index 67d52c2b7..7f326a9b9 100644 --- a/tests/custom_cluster/test_hs2_fault_injection.py +++ b/tests/custom_cluster/test_hs2_fault_injection.py @@ -312,13 +312,13 @@ class TestHS2FaultInjection(CustomClusterTestSuite): query_handle = self.custom_hs2_http_client.execute_query(dml, {}) self.custom_hs2_http_client.wait_to_finish(query_handle) self.transport.enable_fault(502, "Injected Fault", 0.50) - (num_rows, num_row_errors) = None, None + exception = None try: - (num_rows, num_row_errors) = self.custom_hs2_http_client.close_dml(query_handle) + self.custom_hs2_http_client.close_dml(query_handle) except Exception as e: - assert str(e) == 'HTTP code 502: Injected Fault' - assert num_rows is None - assert num_row_errors is None + exception = e + assert exception is not None + assert str(exception) == 
'HTTP code 502: Injected Fault' output = capsys.readouterr()[1].splitlines() assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation") diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index 32ed405da..3f348ba71 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -915,42 +915,59 @@ class TestImpalaShell(ImpalaTestSuite): else: assert "Modified 1 row(s)" in results.stderr - def _validate_dml_stmt(self, vector, stmt, expected_rows_modified, expected_row_errors): - results = run_impala_shell_cmd(vector, ['--query=%s' % stmt]) - expected_output = "Modified %d row(s), %d row error(s)" %\ - (expected_rows_modified, expected_row_errors) - assert expected_output in results.stderr, results.stderr - def test_kudu_dml_reporting(self, vector, unique_database): if vector.get_value('strict_hs2_protocol'): pytest.skip("Kudu not supported in strict hs2 mode.") - db = unique_database - run_impala_shell_cmd(vector, [ - '--query=create table %s.dml_test (id int primary key, ' - 'age int null) partition by hash(id) partitions 2 stored as kudu' % db]) + create_sql = 'create table %s (id int primary key, age int null)' \ + 'partition by hash(id) partitions 2 stored as kudu' + self._test_dml_reporting(vector, create_sql, unique_database, True) - self._validate_dml_stmt( - vector, "insert into %s.dml_test (id) values (7), (7)" % db, 1, 1) - self._validate_dml_stmt(vector, "insert into %s.dml_test (id) values (7)" % db, 0, 1) - self._validate_dml_stmt( - vector, "upsert into %s.dml_test (id) values (7), (7)" % db, 2, 0) - self._validate_dml_stmt( - vector, "update %s.dml_test set age = 1 where id = 7" % db, 1, 0) - self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" % db, 1, 0) + def test_iceberg_dml_reporting(self, vector, unique_database): + if vector.get_value('strict_hs2_protocol'): + pytest.skip("DML results are not completely supported in strict hs2 mode.") + 
create_sql = 'create table %s (id int, age int) ' \ + 'stored as iceberg tblproperties("format-version"="2")' + self._test_dml_reporting(vector, create_sql, unique_database, False) - # UPDATE/DELETE where there are no matching rows; there are no errors because the - # scan produced no rows. - self._validate_dml_stmt( - vector, "update %s.dml_test set age = 1 where id = 8" % db, 0, 0) - self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" % db, 0, 0) + def _test_dml_reporting(self, vector, create_sql, db, is_kudu): + """ Runs DMLs on Kudu or Iceberg tables and verifies that modified / deleted row + count and number of row errors in Kudu are reported correctly. Kudu and Iceberg + can have different results when adding rows with the same primary key as this + leads to row errors in Kudu. + """ + tbl = db + ".dml_test" + run_impala_shell_cmd(vector, ['--query=' + create_sql % tbl]) + + def validate(stmt, expected_rows_modified_iceberg, expected_rows_modified_kudu, + expected_row_errors_kudu, is_delete=False): + results = run_impala_shell_cmd(vector, ['--query=' + stmt % tbl]) + expected = "" + if is_kudu: + expected = "Modified %d row(s), %d row error(s)" \ + % (expected_rows_modified_kudu, expected_row_errors_kudu) + elif is_delete and expected_rows_modified_iceberg > 0: + expected = "Deleted %d row(s)" % expected_rows_modified_iceberg + else: + expected = "Modified %d row(s)" % expected_rows_modified_iceberg + assert expected in results.stderr, results.stderr + + validate("insert into %s (id) values (7), (7)", 2, 1, 1) + validate("insert into %s (id) values (7)", 1, 0, 1) + if is_kudu: + validate("upsert into %s (id) values (7), (7)", -1, 2, 0) + validate("update %s set age = 1 where id = 7", 3, 1, 0) + validate("delete from %s where id = 7", 3, 1, 0, is_delete=True) + + # UPDATE/DELETE where there are no matching rows; there are no errors in Kudu because + # the scan produced no rows. 
+ validate("update %s set age = 1 where id = 8", 0, 0, 0) + validate("delete from %s where id = 7", 0, 0, 0, is_delete=True) # WITH clauses, only apply to INSERT and UPSERT - self._validate_dml_stmt(vector, - "with y as (values(7)) insert into %s.dml_test (id) select * from y" % db, 1, 0) - self._validate_dml_stmt(vector, - "with y as (values(7)) insert into %s.dml_test (id) select * from y" % db, 0, 1) - self._validate_dml_stmt(vector, - "with y as (values(7)) upsert into %s.dml_test (id) select * from y" % db, 1, 0) + validate("with y as (values(7)) insert into %s (id) select * from y", 1, 1, 0) + validate("with y as (values(7)) insert into %s (id) select * from y", 1, 0, 1) + if is_kudu: + validate("with y as (values(7)) upsert into %s (id) select * from y", -1, 1, 0) def test_missing_query_file(self, vector): result = run_impala_shell_cmd(vector, ['-f', 'nonexistent.sql'], expect_success=False)