IMPALA-13501: Clean up uncommitted Iceberg files after validation check failure

Iceberg supports multiple writers with optimistic concurrency.
Each writer can write new files which are then added to the table
after a validation check to ensure that the commit does not conflict
with other modifications made during the execution.

When there was a conflicting change which could not be resolved, it
means that the newly written files cannot be committed to the table,
so they used to become orphan files on the file system. Orphan files
can accumulate over time, taking up a lot of storage space. They do
not belong to the table because they are not referenced by any snapshot
and therefore they can't be removed by expiring snapshots.

This change introduces automatic cleanup of uncommitted files
after an unsuccessful DML operation to prevent creating orphan files.
No cleanup is done if Iceberg throws CommitStateUnknownException
because the update success or failure is unknown in this case.

Testing:
- E2E test: Injected ValidationException with debug option.
- stress test: Added a method to check that no orphan files were
  created after failed conflicting commits.

Change-Id: Ibe59546ebf3c639b75b53dfa1daba37cef50eb21
Reviewed-on: http://gerrit.cloudera.org:8080/22189
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Noemi Pap-Takacs
2024-11-22 11:43:21 +01:00
committed by Impala Public Jenkins
parent 740ee28eb1
commit c518d3c818
5 changed files with 157 additions and 63 deletions

View File

@@ -44,7 +44,7 @@ from tests.common.file_utils import (
create_iceberg_table_from_directory,
create_table_from_parquet)
from tests.shell.util import run_impala_shell_cmd
from tests.util.filesystem_utils import get_fs_path, IS_HDFS, WAREHOUSE
from tests.util.filesystem_utils import get_fs_path, IS_HDFS, WAREHOUSE, FILESYSTEM_PREFIX
from tests.util.get_parquet_metadata import get_parquet_metadata
from tests.util.iceberg_util import cast_ts, quote, get_snapshots, IcebergCatalogs
@@ -2001,6 +2001,44 @@ class TestIcebergV2Table(IcebergTestSuite):
def test_merge_star(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)
def test_cleanup(self, unique_database):
"""Test that all uncommitted files written by Impala are removed from the file
system when a DML commit to an Iceberg table fails, and that the effects of the
failed operation are not visible."""
table_name = "iceberg_cleanup_failure"
fq_tbl_name = unique_database + "." + table_name
# The query options that inject an iceberg validation check failure.
fail_ice_commit_options = {'debug_action':
'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@'
'ValidationException@'
'simulated validation check failure'}
# Create an iceberg table and insert a row.
self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i int)
STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')""".format(fq_tbl_name))
self.execute_query_expect_success(self.client,
"insert into {0} values (1)".format(fq_tbl_name))
# Run a query that would update a row, but pass the query options that
# will cause the iceberg validation check to fail.
err = self.execute_query_expect_failure(self.client,
"update {0} set i=2 where i=1".format(fq_tbl_name),
query_options=fail_ice_commit_options)
# Check that we get the error message.
assert error_msg_expected(
str(err), "ValidationException: simulated validation check failure")
# Check that the table content was not updated.
data = self.execute_query_expect_success(self.client,
"select * from {0}".format(fq_tbl_name))
assert len(data.data) == 1
assert data.data[0] == '1'
# Check that the uncommitted data and delete files are removed from the file system
# and only the first data file remains.
table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
FILESYSTEM_PREFIX, unique_database, table_name)
files_result = check_output(["hdfs", "dfs", "-ls", table_location])
assert "Found 1 items" in files_result
# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most
# of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it