# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function from collections import defaultdict, namedtuple import datetime import json import logging import os import random import re from subprocess import check_call, check_output import time from avro.datafile import DataFileReader from avro.io import DatumReader from builtins import range import pytest import pytz # noinspection PyUnresolvedReferences from impala_thrift_gen.parquet.ttypes import ConvertedType from tests.common.file_utils import ( create_iceberg_table_from_directory, create_table_from_parquet, ) from tests.common.iceberg_test_suite import IcebergTestSuite from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfFS from tests.common.test_dimensions import add_exec_option_dimension from tests.common.test_result_verifier import error_msg_startswith from tests.shell.util import run_impala_shell_cmd from tests.util.filesystem_utils import FILESYSTEM_PREFIX, get_fs_path, IS_HDFS, WAREHOUSE from tests.util.get_parquet_metadata import get_parquet_metadata from tests.util.iceberg_util import cast_ts, get_snapshots, IcebergCatalogs, quote from tests.util.parse_util import bytes_to_str LOG = logging.getLogger(__name__) class TestIcebergTable(IcebergTestSuite): """Tests related to Iceberg tables.""" @classmethod def add_test_dimensions(cls): super(TestIcebergTable, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('table_format').file_format == 'parquet') def test_iceberg_negative(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-negative', vector, use_db=unique_database) def test_create_iceberg_tables(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database) def test_alter_iceberg_tables_v1(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-alter-v1', vector, use_db=unique_database) def test_alter_iceberg_tables_v2(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-alter-v2', vector, use_db=unique_database) def test_alter_iceberg_tables_default(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-alter-default', vector, use_db=unique_database) def test_iceberg_binary_type(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-binary-type', vector, use_db=unique_database) def test_external_iceberg_tables(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-external', vector, unique_database) def test_expire_snapshots(self, unique_database): tbl_name = unique_database + ".expire_snapshots" iceberg_catalogs = IcebergCatalogs(unique_database) for catalog_properties in 
iceberg_catalogs.get_iceberg_catalog_properties(): # We are setting the TIMEZONE query option in this test, so let's create a local # impala client. with self.create_impala_client() as impalad_client: # Iceberg doesn't create a snapshot entry for the initial empty table impalad_client.execute(""" create table {0} (i int) stored as iceberg TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties)) ts_0 = datetime.datetime.now() insert_q = "insert into {0} values (1)".format(tbl_name) ts_1 = self.execute_query_ts(impalad_client, insert_q) time.sleep(1) impalad_client.execute(insert_q) time.sleep(1) ts_2 = self.execute_query_ts(impalad_client, insert_q) impalad_client.execute(insert_q) # There should be 4 snapshots initially self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4) # Expire the oldest snapshot and test that the oldest one was expired expire_q = "alter table {0} execute expire_snapshots({1})" impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3) # Expire with a timestamp in which the interval does not touch existing snapshot impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3) # Expire all, but retain 1 impalad_client.execute(expire_q.format(tbl_name, cast_ts(datetime.datetime.now()))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1) # Change number of retained snapshots, then expire all impalad_client.execute("""alter table {0} set tblproperties ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name)) impalad_client.execute(insert_q) impalad_client.execute(insert_q) impalad_client.execute(expire_q.format(tbl_name, cast_ts(datetime.datetime.now()))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2) # Check that timezone is interpreted in local timezone controlled by query option # TIMEZONE. impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") impalad_client.execute(insert_q) ts_tokyo = self.impala_now(impalad_client) impalad_client.execute("SET TIMEZONE='Europe/Budapest'") impalad_client.execute(insert_q) impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1) impalad_client.execute("DROP TABLE {0}".format(tbl_name)) def test_truncate_iceberg_tables(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database) @SkipIf.not_dfs def test_drop_incomplete_table(self, unique_database): """Test DROP TABLE when the underlying directory is deleted. 
In that case table loading fails, but we should still be able to drop the table
    from Impala."""
    tbl_name = unique_database + ".synchronized_iceberg_tbl"
    cat_location = get_fs_path("/test-warehouse/" + unique_database)
    self.client.execute("""create table {0} (i int) stored as iceberg
        tblproperties('iceberg.catalog'='hadoop.catalog',
                      'iceberg.catalog_location'='{1}')""".format(tbl_name, cat_location))
    self.filesystem_client.delete_file_dir(cat_location, True)
    self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name))

  @SkipIf.not_dfs(reason="DFS required as the test directly deletes files.")
  def test_drop_corrupt_table(self, unique_database):
    """Test that if the underlying iceberg metadata directory is deleted, then a query
    fails with a reasonable error message, and the table can be dropped successfully."""
    table = "corrupt_iceberg_tbl"
    full_table_name = unique_database + "." + table
    self.client.execute("""create table {0} (i int) stored as iceberg""".
        format(full_table_name))
    metadata_location = get_fs_path("""/test-warehouse/{0}.db/{1}/metadata""".format(
        unique_database, table))
    assert self.filesystem_client.exists(metadata_location)
    status = self.filesystem_client.delete_file_dir(metadata_location, True)
    assert status, "Delete failed with {0}".format(status)
    assert not self.filesystem_client.exists(metadata_location)
    # Invalidate so that table loading problems will happen in the catalog.
    self.client.execute("invalidate metadata {0}".format(full_table_name))
    # Query should now fail. Local catalog and the default catalog report the failure
    # with different messages.
    err = self.execute_query_expect_failure(self.client, """select * from {0}""".
        format(full_table_name))
    result = str(err)
    assert ("Failed to load metadata for table" in result  # local catalog
        or "Error loading metadata for Iceberg table" in result)  # default catalog
    self.execute_query_expect_success(self.client, """drop table {0}""".
        format(full_table_name))

  def test_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database)

  def test_partitioned_insert_v1(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-partitioned-insert-v1', vector,
        use_db=unique_database)

  def test_partitioned_insert_v2(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-partitioned-insert-v2', vector,
        use_db=unique_database)

  def test_partitioned_insert_default(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-partitioned-insert-default', vector,
        use_db=unique_database)

  def test_insert_overwrite(self, vector, unique_database):
    """Run iceberg-overwrite tests, then test that INSERT INTO/OVERWRITE queries running
    concurrently with a long running INSERT OVERWRITE are handled gracefully.
    query_a is started before query_b/query_c, but query_b/query_c are supposed to
    finish before query_a.
query_a should fail because the overwrite should not erase query_b/query_c's result.""" # Run iceberg-overwrite.test self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database) # Create test dataset for concurrency tests and warm-up the test table tbl_name = unique_database + ".overwrite_tbl" self.client.execute("""create table {0} (i int) partitioned by spec (truncate(3, i)) stored as iceberg""".format(tbl_name)) self.client.execute("insert into {0} values (1), (2), (3);".format(tbl_name)) # Test queries: 'a' is the long running query while 'b' and 'c' are the short ones query_a = """insert overwrite {0} select sleep(5000);""".format(tbl_name) query_b = """insert overwrite {0} select * from {0};""".format(tbl_name) query_c = """insert into {0} select * from {0};""".format(tbl_name) # Test concurrent INSERT OVERWRITEs, the exception closes the query handle. handle = self.client.execute_async(query_a) time.sleep(1) self.client.execute(query_b) try: self.client.wait_for_finished_timeout(handle, 30) assert False except IMPALA_CONNECTION_EXCEPTION as e: assert "Found conflicting files" in str(e) # Test INSERT INTO during INSERT OVERWRITE, the exception closes the query handle. handle = self.client.execute_async(query_a) time.sleep(1) self.client.execute(query_c) try: self.client.wait_for_finished_timeout(handle, 30) assert False except IMPALA_CONNECTION_EXCEPTION as e: assert "Found conflicting files" in str(e) def test_ctas(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-ctas', vector, use_db=unique_database) def test_partition_transform_insert(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-partition-transform-insert', vector, use_db=unique_database) def test_iceberg_orc_field_id(self, vector): self.run_test_case('QueryTest/iceberg-orc-field-id', vector) def test_catalogs(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-catalogs', vector, use_db=unique_database) def test_missing_field_ids(self, vector): self.run_test_case('QueryTest/iceberg-missing-field-ids', vector) def test_migrated_tables(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-migrated-tables', vector, unique_database) def test_migrated_table_field_id_resolution(self, vector, unique_database): create_iceberg_table_from_directory(self.client, unique_database, "iceberg_migrated_alter_test", "parquet") create_iceberg_table_from_directory(self.client, unique_database, "iceberg_migrated_complex_test", "parquet") create_iceberg_table_from_directory(self.client, unique_database, "iceberg_migrated_alter_test_orc", "orc") create_iceberg_table_from_directory(self.client, unique_database, "iceberg_migrated_complex_test_orc", "orc") self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution', vector, unique_database) if IS_HDFS: self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-orc', vector, unique_database) def test_column_case_sensitivity(self, vector, unique_database): create_iceberg_table_from_directory(self.client, unique_database, "iceberg_column_case_sensitivity_issue", "parquet") self.run_test_case('QueryTest/iceberg-column-case-sensitivity-issue', vector, unique_database) @SkipIfFS.hive def test_migrated_table_field_id_resolution_complex(self, vector, unique_database): def get_table_loc(tbl_name): return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, tbl_name) def create_table(tbl_name, file_format, partition_cols): self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} ( id INT, name 
STRING, teststeps array>) PARTITIONED BY ({}) STORED AS {} """.format(unique_database, tbl_name, partition_cols, file_format)) def add_file_to_table_partition(tbl_name, part_dir, local_filename): tbl_loc = get_table_loc(tbl_name) part_dir = os.path.join(tbl_loc, part_dir) self.filesystem_client.make_dir(part_dir) data_file_path = os.path.join(os.environ['IMPALA_HOME'], "testdata", "migrated_iceberg", local_filename) self.filesystem_client.copy_from_local(data_file_path, part_dir) def finalize_table(tbl_name): self.execute_query("ALTER TABLE {}.{} RECOVER PARTITIONS".format( unique_database, tbl_name)) self.execute_query("ALTER TABLE {}.{} CONVERT TO ICEBERG".format( unique_database, tbl_name)) def prepare_test_table(tbl_name, file_format, partition_cols, part_dir, datafile): create_table(tbl_name, file_format, partition_cols) add_file_to_table_partition(tbl_name, part_dir, datafile) finalize_table(tbl_name) prepare_test_table('all_part_cols_stored_parquet', "PARQUET", "result_date STRING", "result_date=2024-08-26", "complextypes_and_partition_columns_in_data_files.parquet") prepare_test_table('not_all_part_cols_stored_parquet', "PARQUET", "result_date STRING, p INT", "result_date=2024-08-26/p=3", "complextypes_and_partition_columns_in_data_files.parquet") prepare_test_table('all_part_cols_stored_orc', "ORC", "result_date STRING", "result_date=2024-08-26", "complextypes_and_partition_columns_in_data_files.orc") prepare_test_table('not_all_part_cols_stored_orc', "ORC", "result_date STRING, p INT", "result_date=2024-08-26/p=3", "complextypes_and_partition_columns_in_data_files.orc") self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-complex', vector, unique_database) def test_describe_history(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database) # Create a table with multiple snapshots and verify the table history. tbl_name = unique_database + ".iceberg_multi_snapshots" self.client.execute("""create table {0} (i int) stored as iceberg tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name)) self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name)) self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name)) snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2) first_snapshot = snapshots[0] second_snapshot = snapshots[1] # Check that first snapshot is older than the second snapshot. assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time()) # Check that second snapshot's parent ID is the snapshot ID of the first snapshot. assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id()) # The first snapshot has no parent snapshot ID. assert(first_snapshot.get_parent_id() is None) # Check "is_current_ancestor" column. assert(first_snapshot.is_current_ancestor()) assert(second_snapshot.is_current_ancestor()) def test_execute_rollback_negative(self, vector): """Negative test for EXECUTE ROLLBACK.""" self.run_test_case('QueryTest/iceberg-rollback-negative', vector) def test_execute_rollback(self, unique_database): """Test for EXECUTE ROLLBACK.""" iceberg_catalogs = IcebergCatalogs(unique_database) for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties(): # Create a table with multiple snapshots. tbl_name = unique_database + ".iceberg_execute_rollback" # We are setting the TIMEZONE query option in this test, so let's create a local # impala client. 
with self.create_impala_client() as impalad_client: orig_timezone = 'America/Los_Angeles' impalad_client.execute("SET TIMEZONE='" + orig_timezone + "'") impalad_client.execute(""" create table {0} (i int) stored as iceberg TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties)) initial_snapshots = 3 for i in range(initial_snapshots): impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i)) snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=initial_snapshots) output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id()) LOG.info("success output={0}".format(output)) # We rolled back, but that creates a new snapshot, so now there are 4. snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4) # The new snapshot has the same id (and parent id) as the snapshot we rolled back # to, but it has a different creation time. assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id() assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id() assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time() # The "orphaned" snapshot is now not a current ancestor. assert not snapshots[2].is_current_ancestor() # We cannot roll back to a snapshot that is not a current ancestor. output = self.rollback_to_id_expect_failure(tbl_name, snapshots[2].get_snapshot_id(), expected_text="Cannot roll back to snapshot, not an ancestor of the current " "state") # Create another snapshot. before_insert = datetime.datetime.now(pytz.timezone(orig_timezone)) impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4)) snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5) # Rollback to before the last insert. self.rollback_to_ts(impalad_client, tbl_name, before_insert) # This creates another snapshot. snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6) # The snapshot id is the same, the dates differ assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id() assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time() assert not snapshots[4].is_current_ancestor() # Show that the EXECUTE ROLLBACK is respecting the current timezone. # To do this we try to roll back to a time for which there is no # snapshot, this will fail with an error message that includes the specified # time. We parse out that time. By doing this in two timezones we can see # that the parameter being used was affected by the current timezone. one_hour_ago = before_insert - datetime.timedelta(hours=1) # We use Timezones from Japan and Iceland to avoid any DST complexities. impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") japan_ts = self.get_snapshot_ts_from_failed_rollback( impalad_client, tbl_name, one_hour_ago) impalad_client.execute("SET TIMEZONE='Iceland'") iceland_ts = self.get_snapshot_ts_from_failed_rollback( impalad_client, tbl_name, one_hour_ago) diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60) assert diff_hours == 9 impalad_client.execute("DROP TABLE {0}".format(tbl_name)) def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts): """Run an EXECUTE ROLLBACK which is expected to fail. 
Parse the error message to extract the timestamp for which there was no snapshot, and convert the string to an integer""" try: self.rollback_to_ts(client, tbl_name, ts) assert False, "Query should have failed" except IMPALA_CONNECTION_EXCEPTION as e: result = re.search(r".*no valid snapshot older than: (\d+)", str(e)) time_str = result.group(1) snapshot_ts = int(time_str) assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result) return snapshot_ts def rollback_to_ts(self, client, tbl_name, ts): """Rollback a table to a snapshot timestamp.""" query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat()) return self.execute_query_expect_success(client, query) def rollback_to_id(self, tbl_name, id): """Rollback a table to a snapshot id.""" query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id) return self.execute_query_expect_success(self.client, query) def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None): """Attempt to roll back a table to a snapshot id, expecting a failure.""" query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id) output = self.execute_query_expect_failure(self.client, query) if expected_text: assert expected_text in str(output) return output def test_execute_remove_orphan_files(self, unique_database): tbl_name = 'tbl_with_orphan_files' db_tbl = unique_database + "." + tbl_name with self.create_impala_client() as impalad_client: impalad_client.execute("create table {0} (i int) stored as iceberg" .format(db_tbl)) insert_q = "insert into {0} values ({1})" self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1)) self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2)) self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3)) result = impalad_client.execute('select i from {} order by i'.format(db_tbl)) assert result.data == ['1', '2', '3'] # Add some junk files to data and metadata dir. TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name) DATA_PATH = os.path.join(TABLE_PATH, "data") METADATA_PATH = os.path.join(TABLE_PATH, "metadata") SRC_DIR = os.path.join( os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/iceberg_mixed_file_format_test/{0}/{1}") # Copy first set of junk files. file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \ "fff150b6136a-job_16619542960420_0002-1-00001.parquet" file_avro1 = "055baf62-de6d-4583-bf21-f187f9482343-m0.avro" self.filesystem_client.copy_from_local( SRC_DIR.format('data', file_parq1), DATA_PATH) self.filesystem_client.copy_from_local( SRC_DIR.format('metadata', file_avro1), METADATA_PATH) assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1)) assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro1)) # Keep current time. result = impalad_client.execute('select cast(now() as string)') cp1_time = result.data[0] time.sleep(1) # Copy second set of junk files. 
file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \ "27ff880faff0-job_16619542960420_0004-1-00001.parquet" file_avro2 = "871d1473-8566-46c0-a530-a2256b3f396f-m0.avro" self.filesystem_client.copy_from_local( SRC_DIR.format('data', file_parq2), DATA_PATH) self.filesystem_client.copy_from_local( SRC_DIR.format('metadata', file_avro2), METADATA_PATH) assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2)) assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2)) # Execute REMOVE_ORPHAN_FILES at specific timestamp. result = impalad_client.execute( "ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES('{1}')".format(db_tbl, cp1_time)) assert result.data[0] == 'Remove orphan files executed.' assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1)) assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_parq1)) assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2)) assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2)) # Execute REMOVE_ORPHAN_FILES at now(). result = impalad_client.execute( "ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES(now())".format(db_tbl)) assert result.data[0] == 'Remove orphan files executed.' assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2)) assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_parq2)) # Assert table still queryable. result = impalad_client.execute('select i from {} order by i'.format(db_tbl)) assert result.data == ['1', '2', '3'] def test_describe_history_params(self, unique_database): tbl_name = unique_database + ".describe_history" # We are setting the TIMEZONE query option in this test, so let's create a local # impala client. with self.create_impala_client() as impalad_client: # Iceberg doesn't create a snapshot entry for the initial empty table impalad_client.execute("create table {0} (i int) stored as iceberg" .format(tbl_name)) insert_q = "insert into {0} values (1)".format(tbl_name) ts_1 = self.execute_query_ts(impalad_client, insert_q) time.sleep(1) ts_2 = self.execute_query_ts(impalad_client, insert_q) time.sleep(1) ts_3 = self.execute_query_ts(impalad_client, insert_q) # Describe history without predicate data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name)) assert len(data.data) == 3 # Describe history with FROM predicate self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1 - datetime.timedelta(hours=1), 3) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 2) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0) # Describe history with BETWEEN AND predicate self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1) self.expect_results_between(impalad_client, tbl_name, ts_1 - datetime.timedelta(hours=1), ts_2, 2) self.expect_results_between(impalad_client, tbl_name, ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3) # Check that timezone is interpreted in local timezone controlled by query option # TIMEZONE. Persist the local times first and create a new snapshot. impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") now_tokyo = self.impala_now(impalad_client) impalad_client.execute("SET TIMEZONE='Europe/Budapest'") now_budapest = self.impala_now(impalad_client) self.execute_query_ts(impalad_client, "insert into {0} values (4)".format(tbl_name)) self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 1) # Let's switch to Tokyo time. 
Tokyo time is always greater than Budapest time. impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") self.expect_num_snapshots_from(impalad_client, tbl_name, now_tokyo, 1) # Interpreting Budapest time in Tokyo time points to the past. self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 4) def test_time_travel(self, unique_database): tbl_name = unique_database + ".time_travel" def expect_results(query, expected_results, expected_cols): data = impalad_client.execute(query) assert len(data.data) == len(expected_results) for r in expected_results: assert r in data.data expected_col_labels = expected_cols['labels'] expected_col_types = expected_cols['types'] assert data.column_labels == expected_col_labels assert data.column_types == expected_col_types def expect_for_count_star(query, expected): data = impalad_client.execute(query) assert len(data.data) == 1 assert expected in data.data assert "NumRowGroups" not in data.runtime_profile assert "NumFileMetadataRead" not in data.runtime_profile def expect_results_t(ts, expected_results, expected_cols): expect_results( "select * from {0} for system_time as of {1}".format(tbl_name, ts), expected_results, expected_cols) def expect_for_count_star_t(ts, expected): expect_for_count_star( "select count(*) from {0} for system_time as of {1}".format(tbl_name, ts), expected) def expect_snapshot_id_in_plan_t(ts, snapshot_id): data = impalad_client.execute( "explain select * from {0} for system_time as of {1}".format( tbl_name, ts)) assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data def expect_results_v(snapshot_id, expected_results, expected_cols): expect_results( "select * from {0} for system_version as of {1}".format(tbl_name, snapshot_id), expected_results, expected_cols) def expect_for_count_star_v(snapshot_id, expected): expect_for_count_star( "select count(*) from {0} for system_version as of {1}".format( tbl_name, snapshot_id), expected) def expect_snapshot_id_in_plan_v(snapshot_id): data = impalad_client.execute( "explain select * from {0} for system_version as of {1}".format( tbl_name, snapshot_id)) assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data def impala_now(): now_data = impalad_client.execute("select now()") return now_data.data[0] # We are setting the TIMEZONE query option in this test, so let's create a local # impala client. with self.create_impala_client() as impalad_client: # Iceberg doesn't create a snapshot entry for the initial empty table impalad_client.execute("create table {0} (i int) stored as iceberg" .format(tbl_name)) ts_1 = self.execute_query_ts(impalad_client, "insert into {0} values (1)" .format(tbl_name)) ts_2 = self.execute_query_ts(impalad_client, "insert into {0} values (2)" .format(tbl_name)) ts_3 = self.execute_query_ts(impalad_client, "truncate table {0}".format(tbl_name)) time.sleep(5) ts_4 = self.execute_query_ts(impalad_client, "insert into {0} values (100)" .format(tbl_name)) ts_no_ss = self.execute_query_ts(impalad_client, "alter table {0} add column {1} bigint" .format(tbl_name, "j")) ts_5 = self.execute_query_ts(impalad_client, "insert into {0} (i,j) values (3, 103)" .format(tbl_name)) # Descriptions of the different schemas we expect to see as Time Travel queries # use the schema from the specified time or snapshot. # # When the schema is just the 'J' column. j_cols = { 'labels': ['J'], 'types': ['BIGINT'] } # When the schema is just the 'I' column. i_cols = { 'labels': ['I'], 'types': ['INT'] } # When the schema is the 'I' and 'J' columns. 
ij_cols = { 'labels': ['I', 'J'], 'types': ['INT', 'BIGINT'] } # Query table as of timestamps. expect_results_t("now()", ['100\tNULL', '3\t103'], ij_cols) expect_results_t(quote(ts_1), ['1'], i_cols) expect_results_t(quote(ts_2), ['1', '2'], i_cols) expect_results_t(quote(ts_3), [], i_cols) expect_results_t(cast_ts(ts_3) + " + interval 1 seconds", [], i_cols) expect_results_t(quote(ts_4), ['100'], i_cols) expect_results_t(cast_ts(ts_4) + " - interval 5 seconds", [], i_cols) # There is no new snapshot created by the schema change between ts_4 and ts_no_ss. # So at ts_no_ss we see the schema as of ts_4 expect_results_t(quote(ts_no_ss), ['100'], i_cols) expect_results_t(quote(ts_5), ['100\tNULL', '3\t103'], ij_cols) # Future queries return the current snapshot. expect_results_t(cast_ts(ts_5) + " + interval 1 hours", ['100\tNULL', '3\t103'], ij_cols) # Query table as of snapshot IDs. snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5) expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols) expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols) expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols) expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols) expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols) expect_snapshot_id_in_plan_v(snapshots[0].get_snapshot_id()) expect_snapshot_id_in_plan_v(snapshots[1].get_snapshot_id()) expect_snapshot_id_in_plan_v(snapshots[2].get_snapshot_id()) expect_snapshot_id_in_plan_v(snapshots[3].get_snapshot_id()) expect_snapshot_id_in_plan_v(snapshots[4].get_snapshot_id()) expect_snapshot_id_in_plan_t(quote(ts_1), snapshots[0].get_snapshot_id()) expect_snapshot_id_in_plan_t(quote(ts_2), snapshots[1].get_snapshot_id()) expect_snapshot_id_in_plan_t(quote(ts_3), snapshots[2].get_snapshot_id()) expect_snapshot_id_in_plan_t(quote(ts_4), snapshots[3].get_snapshot_id()) expect_snapshot_id_in_plan_t(quote(ts_5), snapshots[4].get_snapshot_id()) # Test of plain count star optimization # 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile expect_for_count_star_t("now()", '2') expect_for_count_star_t(quote(ts_1), '1') expect_for_count_star_t(quote(ts_2), '2') expect_for_count_star_t(quote(ts_3), '0') expect_for_count_star_t(cast_ts(ts_3) + " + interval 1 seconds", '0') expect_for_count_star_t(quote(ts_4), '1') expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0') expect_for_count_star_t(cast_ts(ts_5), '2') expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2') expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1') expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2') expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0') expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1') expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2') # SELECT diff expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' MINUS SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format( tbl=tbl_name, ts_new=ts_2, ts_old=ts_1), ['2'], i_cols) expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new} MINUS SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format( tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), v_old=snapshots[0].get_snapshot_id()), ['2'], i_cols) # Mix SYSTEM_TIME and SYSTEM_VERSION expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new} MINUS SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format( tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), 
ts_old=ts_1), ['2'], i_cols) expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' MINUS SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format( tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()), ['2'], i_cols) expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' MINUS SELECT *, NULL FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format( tbl=tbl_name, ts_new=ts_5, ts_old=ts_4), ['3\t103'], ij_cols) # Query old snapshot try: impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_TIME AS OF {1}".format( tbl_name, "now() - interval 2 years")) assert False # Exception must be thrown except Exception as e: assert "Cannot find a snapshot older than" in str(e) # Query invalid snapshot try: impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_VERSION AS OF 42".format( tbl_name)) assert False # Exception must be thrown except Exception as e: assert "Cannot find snapshot with ID 42" in str(e) # Go back to one column impalad_client.execute("alter table {0} drop column i".format(tbl_name)) # Test that deleted column is not selectable. try: impalad_client.execute("SELECT i FROM {0}".format(tbl_name)) assert False # Exception must be thrown except Exception as e: assert "Could not resolve column/field reference: 'i'" in str(e) # Back at ts_2 the deleted 'I' column is there expect_results("SELECT * FROM {0} FOR SYSTEM_TIME AS OF '{1}'". format(tbl_name, ts_2), ['1', '2'], i_cols) expect_results("SELECT i FROM {0} FOR SYSTEM_TIME AS OF '{1}'". format(tbl_name, ts_2), ['1', '2'], i_cols) # Check that timezone is interpreted in local timezone controlled by query option # TIMEZONE impalad_client.execute("truncate table {0}".format(tbl_name)) impalad_client.execute("insert into {0} values (1111)".format(tbl_name)) impalad_client.execute("SET TIMEZONE='Europe/Budapest'") now_budapest = impala_now() expect_results_t(quote(now_budapest), ['1111'], j_cols) # Let's switch to Tokyo time. Tokyo time is always greater than Budapest time. impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") now_tokyo = impala_now() expect_results_t(quote(now_tokyo), ['1111'], j_cols) try: # Interpreting Budapest time in Tokyo time points to the past when the table # didn't exist. expect_results_t(quote(now_budapest), [], j_cols) assert False except Exception as e: assert "Cannot find a snapshot older than" in str(e) def test_time_travel_queries(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-time-travel', vector, use_db=unique_database) @SkipIf.not_dfs def test_strings_utf8(self, unique_database): # Create table table_name = "ice_str_utf8" qualified_table_name = "%s.%s" % (unique_database, table_name) query = 'create table %s (a string) stored as iceberg' % qualified_table_name self.client.execute(query) # Inserted string data should have UTF8 annotation regardless of query options. 
query = 'insert into %s values ("impala")' % qualified_table_name self.execute_query(query, {'parquet_annotate_strings_utf8': False}) # Copy the created file to the local filesystem and parse metadata local_file = '/tmp/iceberg_utf8_test_%s.parq' % random.randint(0, 10000) LOG.info("test_strings_utf8 local file name: " + local_file) hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/data/*.parq' % (unique_database, table_name)) check_call(['hadoop', 'fs', '-copyToLocal', hdfs_file, local_file]) metadata = get_parquet_metadata(local_file) # Extract SchemaElements corresponding to the table column a_schema_element = metadata.schema[1] assert a_schema_element.name == 'a' # Check that the schema uses the UTF8 annotation assert a_schema_element.converted_type == ConvertedType.UTF8 os.remove(local_file) # Get hdfs path to manifest list that belongs to the snapshot identified by # 'snapshot_counter'. def get_manifest_list_hdfs_path(self, tmp_path_prefix, db_name, table_name, snapshot_counter): local_path = '%s_%s.metadata.json' % (tmp_path_prefix, random.randint(0, 10000)) hdfs_path = get_fs_path('/test-warehouse/%s.db/%s/metadata/%s*.metadata.json' % (db_name, table_name, snapshot_counter)) check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path]) manifest_list_hdfs_path = None try: with open(local_path, 'r') as fp: metadata = json.load(fp) current_snapshot_id = metadata['current-snapshot-id'] for snapshot in metadata['snapshots']: if snapshot['snapshot-id'] == current_snapshot_id: manifest_list_hdfs_path = snapshot['manifest-list'] break finally: os.remove(local_path) return manifest_list_hdfs_path # Get list of hdfs paths to manifest files from the manifest list avro file. def get_manifest_hdfs_path_list(self, tmp_path_prefix, manifest_list_hdfs_path): local_path = '%s_%s.manifest_list.avro' % (tmp_path_prefix, random.randint(0, 10000)) check_call(['hadoop', 'fs', '-copyToLocal', manifest_list_hdfs_path, local_path]) manifest_hdfs_path_list = [] reader = None try: with open(local_path, 'rb') as fp: reader = DataFileReader(fp, DatumReader()) for manifest in reader: manifest_hdfs_path_list.append(manifest['manifest_path']) finally: if reader: reader.close() os.remove(local_path) return manifest_hdfs_path_list # Get 'data_file' structs from avro manifest files. 
def get_data_file_list(self, tmp_path_prefix, manifest_hdfs_path_list): datafiles = [] for hdfs_path in manifest_hdfs_path_list: local_path = '%s_%s.manifest.avro' % (tmp_path_prefix, random.randint(0, 10000)) check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path]) reader = None try: with open(local_path, 'rb') as fp: reader = DataFileReader(fp, DatumReader()) datafiles.extend([rec['data_file'] for rec in reader]) finally: if reader: reader.close() os.remove(local_path) return datafiles def get_latest_metadata_path(self, database_name, table_name): describe = 'describe extended %s.%s ' % (database_name, table_name) output = self.client.execute(describe) metadata_location = [s for s in output.data if s.startswith('\tmetadata_location')] assert len(metadata_location) == 1 metadata_location_split = metadata_location[0].split('\t') assert len(metadata_location_split) == 3 metadata_path = metadata_location_split[2] return metadata_path # Get the current partition spec as JSON from the latest metadata file def get_current_partition_spec(self, database_name, table_name): hdfs_path = self.get_latest_metadata_path(database_name, table_name) output = check_output(['hadoop', 'fs', '-cat', hdfs_path]) current_partition_spec = None metadata = json.loads(output) current_spec_id = metadata['default-spec-id'] for spec in metadata['partition-specs']: if spec['spec-id'] == current_spec_id: current_partition_spec = spec break return current_partition_spec @SkipIf.not_dfs def test_partition_spec_update_v1(self, unique_database): # Create table table_name = "ice_part" qualified_table_name = "%s.%s" % (unique_database, table_name) create_table = """create table {} (s string, i int) partitioned by spec(truncate(5, s), identity(i)) stored as iceberg tblproperties ('format-version'='1')""".format(qualified_table_name) self.client.execute(create_table) partition_spec = self.get_current_partition_spec(unique_database, table_name) assert len(partition_spec['fields']) == 2 truncate_s = partition_spec['fields'][0] identity_i = partition_spec['fields'][1] # At table creation, partition names does not contain the parameter value. assert truncate_s['name'] == 's_trunc' assert identity_i['name'] == 'i' assert truncate_s['field-id'] == 1000 assert identity_i['field-id'] == 1001 # Partition evolution partition_evolution = 'alter table %s set partition ' \ 'spec(identity(i), truncate(6,s))' % qualified_table_name self.client.execute(partition_evolution) # V1 partition evolution keeps the old, modified fields, but changes their # transform to VOID. evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name) assert len(evolved_partition_spec['fields']) == 3 old_truncate_s = evolved_partition_spec['fields'][0] identity_i = evolved_partition_spec['fields'][1] truncate_s = evolved_partition_spec['fields'][2] assert old_truncate_s['name'] == 's_trunc' assert identity_i['name'] == 'i' # Modified field name contains the parameter value. assert truncate_s['name'] == 's_trunc_6' assert old_truncate_s['field-id'] == 1000 assert identity_i['field-id'] == 1001 # field-id increases for the modified field. 
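    # (Iceberg starts partition field ids at 1000 and never reuses them, so every
    # evolved field receives the next unused id.)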
assert truncate_s['field-id'] == 1002 assert old_truncate_s['transform'] == 'void' @SkipIf.not_dfs def test_partition_spec_update_v2(self, unique_database): # Create table table_name = "ice_part" qualified_table_name = "%s.%s" % (unique_database, table_name) create_table = 'create table %s ' \ '(s string, i int) partitioned by spec(truncate(5, s), identity(i)) ' \ 'stored as iceberg tblproperties ("format-version" = "2")' \ % qualified_table_name self.client.execute(create_table) partition_spec = self.get_current_partition_spec(unique_database, table_name) assert len(partition_spec['fields']) == 2 truncate_s = partition_spec['fields'][0] identity_i = partition_spec['fields'][1] # At table creation, partition names does not contain the parameter value. assert truncate_s['name'] == 's_trunc' assert identity_i['name'] == 'i' assert truncate_s['field-id'] == 1000 assert identity_i['field-id'] == 1001 # Partition evolution for s partition_evolution = 'alter table %s set partition ' \ 'spec(identity(i), truncate(6,s))' % qualified_table_name self.client.execute(partition_evolution) # V2 partition evolution updates the previous partitioning for # the modified field. evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name) assert len(evolved_partition_spec['fields']) == 2 identity_i = evolved_partition_spec['fields'][0] truncate_s = evolved_partition_spec['fields'][1] assert identity_i['name'] == 'i' # Modified field name contains the parameter value. assert truncate_s['name'] == 's_trunc_6' assert identity_i['field-id'] == 1001 # field-id increased for the modified field. assert truncate_s['field-id'] == 1002 # Partition evolution for i and s partition_evolution = 'alter table %s set partition ' \ 'spec(bucket(4, i), truncate(3,s))' \ % qualified_table_name self.client.execute(partition_evolution) evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name) assert len(evolved_partition_spec['fields']) == 2 bucket_i = evolved_partition_spec['fields'][0] truncate_s = evolved_partition_spec['fields'][1] # The field naming follows the transform changes. 
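    # (e.g. bucket(4, i) is named 'i_bucket_4' and truncate(3, s) is named 's_trunc_3'.)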
assert bucket_i['name'] == 'i_bucket_4' assert truncate_s['name'] == 's_trunc_3' # field-id's are increasing with each modification assert bucket_i['field-id'] == 1003 assert truncate_s['field-id'] == 1004 @SkipIf.not_dfs def test_writing_metrics_to_metadata_v1(self, unique_database): self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v1', '1') @SkipIf.not_dfs def test_writing_metrics_to_metadata_v2(self, unique_database): self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v2', '2') def _test_writing_metrics_to_metadata_impl(self, unique_database, table_name, version): # Create table qualified_table_name = "%s.%s" % (unique_database, table_name) query = """create table {} (s string, i int, b boolean, bi bigint, ts timestamp, dt date, dc decimal(10, 3)) stored as iceberg tblproperties ('format-version'='{}')""".format(qualified_table_name, version) self.client.execute(query) # Insert data # 1st data file: query = 'insert into %s values ' \ '("abc", 3, true, NULL, "1970-01-03 09:11:22", NULL, 56.34), ' \ '("def", NULL, false, NULL, "1969-12-29 14:45:59", DATE"1969-01-01", -10.0), ' \ '("ghij", 1, NULL, 123456789000000, "1970-01-01", DATE"1970-12-31", NULL), ' \ '(NULL, 0, NULL, 234567890000001, NULL, DATE"1971-01-01", NULL)' \ % qualified_table_name self.execute_query(query) # 2nd data file: query = 'insert into %s values ' \ '(NULL, NULL, NULL, NULL, NULL, NULL, NULL), ' \ '(NULL, NULL, NULL, NULL, NULL, NULL, NULL)' \ % qualified_table_name self.execute_query(query) # Get hdfs path to manifest list file manifest_list_hdfs_path = self.get_manifest_list_hdfs_path( '/tmp/iceberg_metrics_test', unique_database, table_name, '00002') # Get the list of hdfs paths to manifest files assert manifest_list_hdfs_path is not None manifest_hdfs_path_list = self.get_manifest_hdfs_path_list( '/tmp/iceberg_metrics_test', manifest_list_hdfs_path) # Get 'data_file' records from manifest files. 
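    # The table columns map to Iceberg field ids 1-7 in declaration order
    # (s, i, b, bi, ts, dt, dc), which is how the metric maps below are keyed.
    # For example, the DECIMAL(10,3) upper bound 56.34 is stored as the unscaled
    # big-endian integer 56340 = 0x00DC14, and the lower bound -10.0 as the
    # two's-complement bytes 0xD8F0.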
assert manifest_hdfs_path_list is not None and len(manifest_hdfs_path_list) > 0 datafiles = self.get_data_file_list('/tmp/iceberg_metrics_test', manifest_hdfs_path_list) # Check column stats in datafiles assert datafiles is not None and len(datafiles) == 2 # The 1st datafile contains the 2 NULL rows assert datafiles[0]['record_count'] == 2 assert datafiles[0]['column_sizes'] == \ [{'key': 1, 'value': 39}, {'key': 2, 'value': 39}, {'key': 3, 'value': 25}, {'key': 4, 'value': 39}, {'key': 5, 'value': 39}, {'key': 6, 'value': 39}, {'key': 7, 'value': 39}] assert datafiles[0]['value_counts'] == \ [{'key': 1, 'value': 2}, {'key': 2, 'value': 2}, {'key': 3, 'value': 2}, {'key': 4, 'value': 2}, {'key': 5, 'value': 2}, {'key': 6, 'value': 2}, {'key': 7, 'value': 2}] assert datafiles[0]['null_value_counts'] == \ [{'key': 1, 'value': 2}, {'key': 2, 'value': 2}, {'key': 3, 'value': 2}, {'key': 4, 'value': 2}, {'key': 5, 'value': 2}, {'key': 6, 'value': 2}, {'key': 7, 'value': 2}] # Upper/lower bounds should be empty lists assert datafiles[0]['lower_bounds'] == [] assert datafiles[0]['upper_bounds'] == [] # 2nd datafile assert datafiles[1]['record_count'] == 4 assert datafiles[1]['column_sizes'] == \ [{'key': 1, 'value': 66}, {'key': 2, 'value': 56}, {'key': 3, 'value': 26}, {'key': 4, 'value': 59}, {'key': 5, 'value': 68}, {'key': 6, 'value': 56}, {'key': 7, 'value': 53}] assert datafiles[1]['value_counts'] == \ [{'key': 1, 'value': 4}, {'key': 2, 'value': 4}, {'key': 3, 'value': 4}, {'key': 4, 'value': 4}, {'key': 5, 'value': 4}, {'key': 6, 'value': 4}, {'key': 7, 'value': 4}] assert datafiles[1]['null_value_counts'] == \ [{'key': 1, 'value': 1}, {'key': 2, 'value': 1}, {'key': 3, 'value': 2}, {'key': 4, 'value': 2}, {'key': 5, 'value': 1}, {'key': 6, 'value': 1}, {'key': 7, 'value': 2}] assert datafiles[1]['lower_bounds'] == \ [{'key': 1, 'value': b'abc'}, # INT is serialized as 4-byte little endian {'key': 2, 'value': b'\x00\x00\x00\x00'}, # BOOLEAN is serialized as 0x00 for FALSE {'key': 3, 'value': b'\x00'}, # BIGINT is serialized as 8-byte little endian {'key': 4, 'value': b'\x40\xaf\x0d\x86\x48\x70\x00\x00'}, # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since # 1970-01-01 00:00:00) {'key': 5, 'value': b'\xc0\xd7\xff\x06\xd0\xff\xff\xff'}, # DATE is serialized as 4-byte little endian (number of days since 1970-01-01) {'key': 6, 'value': b'\x93\xfe\xff\xff'}, # Unlike other numerical values, DECIMAL is serialized as big-endian. {'key': 7, 'value': b'\xd8\xf0'}] assert datafiles[1]['upper_bounds'] == \ [{'key': 1, 'value': b'ghij'}, # INT is serialized as 4-byte little endian {'key': 2, 'value': b'\x03\x00\x00\x00'}, # BOOLEAN is serialized as 0x01 for TRUE {'key': 3, 'value': b'\x01'}, # BIGINT is serialized as 8-byte little endian {'key': 4, 'value': b'\x81\x58\xc2\x97\x56\xd5\x00\x00'}, # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since # 1970-01-01 00:00:00) {'key': 5, 'value': b'\x80\x02\x86\xef\x2f\x00\x00\x00'}, # DATE is serialized as 4-byte little endian (number of days since 1970-01-01) {'key': 6, 'value': b'\x6d\x01\x00\x00'}, # Unlike other numerical values, DECIMAL is serialized as big-endian. 
{'key': 7, 'value': b'\x00\xdc\x14'}] def test_using_upper_lower_bound_metrics(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector, use_db=unique_database) def test_writing_many_files(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-write-many-files', vector, use_db=unique_database) @pytest.mark.execute_serially def test_writing_many_files_stress(self, vector, unique_database): if self.exploration_strategy() != 'exhaustive': pytest.skip('runs only in exhaustive') self.run_test_case('QueryTest/iceberg-write-many-files-stress', vector, use_db=unique_database) @pytest.mark.execute_serially def test_table_load_time_for_many_files(self, unique_database): if self.exploration_strategy() != 'exhaustive': pytest.skip('runs only in exhaustive') tbl_name = unique_database + ".iceberg_many_files" self.execute_query("""CREATE TABLE {} PARTITIONED BY SPEC (bucket(2039, l_orderkey)) STORED AS ICEBERG AS SELECT * FROM tpch_parquet.lineitem""".format(tbl_name)) self.execute_query("invalidate metadata") start_time = time.time() self.execute_query("describe formatted {}".format(tbl_name)) elapsed_time = time.time() - start_time if IS_HDFS: time_limit = 10 else: time_limit = 20 assert elapsed_time < time_limit def test_consistent_scheduling(self, unique_database): """IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for Iceberg tables.""" def collect_split_stats(profile): splits = [s.strip() for s in profile.splitlines() if "Hdfs split stats" in s] splits.sort() return splits with self.create_impala_client() as impalad_client: impalad_client.execute("use " + unique_database) impalad_client.execute("""create table line_ice stored as iceberg as select * from tpch_parquet.lineitem""") first_result = impalad_client.execute("""select count(*) from line_ice""") ref_profile = first_result.runtime_profile ref_split_stats = collect_split_stats(ref_profile) for i in range(0, 10): # Subsequent executions of the same query should schedule scan ranges similarly. result = impalad_client.execute("""select count(*) from line_ice""") profile = result.runtime_profile split_stats = collect_split_stats(profile) assert ref_split_stats == split_stats def test_scheduling_partitioned_tables(self, unique_database): """IMPALA-12765: Balance consecutive partitions better for Iceberg tables""" # We are setting the replica_preference query option in this test, so let's create a # local impala client. 
inventory_tbl = "inventory_ice" item_tbl = "item_ice" date_dim_tbl = "date_dim_ice" with self.create_impala_client() as impalad_client: impalad_client.execute("use " + unique_database) impalad_client.execute("set replica_preference=remote") impalad_client.execute(""" CREATE TABLE {} PARTITIONED BY SPEC (inv_date_sk) STORED BY ICEBERG AS SELECT * from tpcds_partitioned_parquet_snap.inventory; """.format(inventory_tbl)) impalad_client.execute(""" CREATE TABLE {} STORED BY ICEBERG AS SELECT * from tpcds_partitioned_parquet_snap.item; """.format(item_tbl)) impalad_client.execute(""" CREATE TABLE {} STORED BY ICEBERG AS SELECT * from tpcds_partitioned_parquet_snap.date_dim; """.format(date_dim_tbl)) q22_result = impalad_client.execute(""" select i_product_name, i_brand, i_class, i_category, avg(inv_quantity_on_hand) qoh from inventory_ice, date_dim_ice, item_ice where inv_date_sk=d_date_sk and inv_item_sk=i_item_sk and d_month_seq between 1199 and 1199 + 11 group by rollup(i_product_name, i_brand, i_class, i_category) order by qoh, i_product_name, i_brand, i_class, i_category limit 100 """) profile = q22_result.runtime_profile # "Files rejected:" contains the number of files being rejected by runtime # filters. With IMPALA-12765 we should see similar numbers for each executor. files_rejected_array = re.findall(r"Files rejected: \d+ \((\d+)\)", profile) avg_files_rejected = int(files_rejected_array[0]) THRESHOLD = 3 for files_rejected_str in files_rejected_array: files_rejected = int(files_rejected_str) if files_rejected != 0: assert abs(avg_files_rejected - files_rejected) < THRESHOLD def test_in_predicate_push_down(self, vector, unique_database): self.execute_query("SET RUNTIME_FILTER_MODE=OFF") self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector, use_db=unique_database) def test_is_null_predicate_push_down(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-is-null-predicate-push-down', vector, use_db=unique_database) def test_compound_predicate_push_down(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-compound-predicate-push-down', vector, use_db=unique_database) def test_plain_count_star_optimization(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-plain-count-star-optimization', vector, use_db=unique_database) def test_create_table_like_table(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-create-table-like-table', vector, use_db=unique_database) def test_table_owner(self, vector, unique_database): self.run_table_owner_test(vector, unique_database, "some_random_user") self.run_table_owner_test(vector, unique_database, "another_random_user") def run_table_owner_test(self, vector, db_name, user_name): # Create Iceberg table with a given user running the query. tbl_name = "iceberg_table_owner" sql_stmt = 'CREATE TABLE {0}.{1} (i int) STORED AS ICEBERG'.format( db_name, tbl_name) args = ['-u', user_name, '-q', sql_stmt] run_impala_shell_cmd(vector, args) # Run DESCRIBE FORMATTED to get the owner of the table. args = ['-q', 'DESCRIBE FORMATTED {0}.{1}'.format(db_name, tbl_name)] results = run_impala_shell_cmd(vector, args) result_rows = results.stdout.strip().split('\n') # Find the output row with the owner. owner_row = "" for row in result_rows: if "Owner:" in row: owner_row = row assert owner_row != "", "DESCRIBE output doesn't contain owner" + results.stdout # Verify that the user running the query is the owner of the table. assert user_name in owner_row, "Unexpected owner of Iceberg table. 
" + \ "Expected user name: {0}. Actual output row: {1}".format(user_name, owner_row) args = ['-q', 'DROP TABLE {0}.{1}'.format(db_name, tbl_name)] results = run_impala_shell_cmd(vector, args) def test_mixed_file_format(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-mixed-file-format', vector, unique_database) def test_load(self, vector, unique_database): """Test LOAD DATA INPATH for Iceberg tables, the first part of this method inits the target directory, copies existing test data to HDFS. The second part runs the test cases then cleans up the test directory. """ # Test 1-6 init: target orc/parquet file and directory SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/iceberg_mixed_file_format_test/data/{0}") DST_DIR = "/tmp/" + unique_database + "/parquet/" self.filesystem_client.make_dir(DST_DIR, permission=777) file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \ "fff150b6136a-job_16619542960420_0002-1-00001.parquet" file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \ "27ff880faff0-job_16619542960420_0004-1-00001.parquet" self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR) self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq2), DST_DIR) DST_DIR = "/tmp/" + unique_database + "/orc/" self.filesystem_client.make_dir(DST_DIR, permission=777) file_orc1 = "00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-" \ "ee500a19c1d1-job_16619542960420_0003-1-00001.orc" file_orc2 = "00000-0-data-gfurnstahl_20220906114900_9c1b7b46-5643-428f-a007-" \ "519c5500ed04-job_16619542960420_0004-1-00001.orc" self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc1), DST_DIR) self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc2), DST_DIR) # Test 7 init: overwrite DST_DIR = "/tmp/" + unique_database + "/overwrite/" self.filesystem_client.make_dir(DST_DIR, permission=777) self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR) # Test 8 init: mismatching parquet schema format SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/" "iceberg_partitioned/data/event_time_hour=2020-01-01-08/action=view/{0}") DST_DIR = "/tmp/" + unique_database + "/mismatching_schema/" self.filesystem_client.make_dir(DST_DIR, permission=777) file = "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet" self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR) # Test 9 init: partitioned DST_DIR = "/tmp/" + unique_database + "/partitioned/" self.filesystem_client.make_dir(DST_DIR, permission=777) self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR) # Test 10 init: hidden files DST_DIR = "/tmp/" + unique_database + "/hidden/" self.filesystem_client.make_dir(DST_DIR, permission=777) self.filesystem_client.create_file(DST_DIR + "_hidden.1", "Test data 123") self.filesystem_client.create_file(DST_DIR + "_hidden_2.1", "Test data 123") self.filesystem_client.create_file(DST_DIR + ".hidden_3", "Test data 123") self.filesystem_client.create_file(DST_DIR + ".hidden_4.1", "Test data 123") self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR) # Init test table create_iceberg_table_from_directory(self.client, unique_database, "iceberg_mixed_file_format_test", "parquet") # Execute tests self.run_test_case('QueryTest/iceberg-load', vector, use_db=unique_database) # Clean up temporary directory self.filesystem_client.delete_file_dir("/tmp/{0}".format(unique_database), True) def 
test_table_sampling(self, vector): self.run_test_case('QueryTest/iceberg-tablesample', vector, use_db="functional_parquet") def _create_table_like_parquet_helper(self, vector, unique_database, tbl_name, expect_success): create_table_from_parquet(self.client, unique_database, tbl_name) args = ['-q', "show files in {0}.{1}".format(unique_database, tbl_name)] results = run_impala_shell_cmd(vector, args) result_rows = results.stdout.strip().split('\n') hdfs_file = None for row in result_rows: if "://" in row: hdfs_file = row.split('|')[1].lstrip() break assert hdfs_file iceberg_tbl_name = "iceberg_{0}".format(tbl_name) sql_stmt = "create table {0}.{1} like parquet '{2}' stored as iceberg".format( unique_database, iceberg_tbl_name, hdfs_file ) args = ['-q', sql_stmt] return run_impala_shell_cmd(vector, args, expect_success=expect_success) def test_create_table_like_parquet(self, vector, unique_database): tbl_name = 'alltypes_tiny_pages' # Not all types are supported by iceberg self._create_table_like_parquet_helper(vector, unique_database, tbl_name, False) tbl_name = "create_table_like_parquet_test" results = self._create_table_like_parquet_helper(vector, unique_database, tbl_name, True) result_rows = results.stdout.strip().split('\n') assert result_rows[3].split('|')[1] == ' Table has been created. ' sql_stmt = "describe {0}.{1}".format(unique_database, tbl_name) args = ['-q', sql_stmt] parquet_results = run_impala_shell_cmd(vector, args) parquet_result_rows = parquet_results.stdout.strip().split('\n') parquet_column_name_type_list = [] for row in parquet_result_rows[1:-2]: parquet_column_name_type_list.append(row.split('|')[1:3]) sql_stmt = "describe {0}.iceberg_{1}".format(unique_database, tbl_name) args = ['-q', sql_stmt] iceberg_results = run_impala_shell_cmd(vector, args) iceberg_result_rows = iceberg_results.stdout.strip().split('\n') iceberg_column_name_type_list = [] for row in iceberg_result_rows[1:-2]: iceberg_column_name_type_list.append(row.split('|')[1:3]) assert parquet_column_name_type_list == iceberg_column_name_type_list @SkipIfFS.hive def test_hive_external_forbidden(self, unique_database): tbl_name = unique_database + ".hive_ext" error_msg = ("cannot be loaded because it is an EXTERNAL table in the HiveCatalog " "that points to another table. Query the original table instead.") self.execute_query("create table {0} (i int) stored by iceberg". format(tbl_name)) # 'iceberg.table_identifier' can refer to another table self.run_stmt_in_hive("""alter table {0} set tblproperties ('external.table.purge'='false', 'iceberg.table_identifier'='functional_iceberg.iceberg_partitioned')""". format(tbl_name)) ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name)) assert error_msg in str(ex) # 'iceberg.mr.table.identifier' can refer to another table self.run_stmt_in_hive(""" alter table {0} unset tblproperties('iceberg.table_identifier')""". format(tbl_name)) self.run_stmt_in_hive("""alter table {0} set tblproperties ('iceberg.mr.table.identifier'='functional_iceberg.iceberg_partitioned')""". format(tbl_name)) ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name)) assert error_msg in str(ex) # 'name' can also refer to another table but cannot be set by Hive/Impala. 
Also, # during table migration both Impala and Hive clear existing table properties. # See IMPALA-12410 @SkipIfFS.incorrent_reported_ec def test_compute_stats(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-compute-stats', vector, unique_database) def test_virtual_columns(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-virtual-columns', vector, unique_database) def test_avro_file_format(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-avro', vector, unique_database) def test_convert_table(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables', vector, unique_database) def test_table_exists(self, unique_database): """Test that iceberg AlreadyExistsException is correctly handled.""" tbl_name = unique_database + ".create_iceberg_exists" # Attempt to create an iceberg table, simulating AlreadyExistsException iceberg_created_options = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@' 'IcebergAlreadyExistsException@Table was created concurrently'} err = self.execute_query_expect_failure(self.client, "create table {0} (i int) stored as iceberg".format(tbl_name), query_options=iceberg_created_options) assert "AlreadyExistsException: Table already exists" in str(err) self.execute_query_expect_success(self.client, "create table if not exists {0} (i int) stored as iceberg".format(tbl_name), query_options=iceberg_created_options) def test_abort_transaction(self, unique_database): """Test that iceberg operations fail correctly when an Iceberg transaction commit fails, and that the effects of the failed operation are not visible.""" tbl_name = unique_database + ".abort_iceberg_transaction" # The query options that inject an iceberg transaction commit failure. abort_ice_transaction_options = {'debug_action': 'CATALOGD_ICEBERG_COMMIT:EXCEPTION@' 'CommitFailedException@' 'simulated commit failure'} # Create an iceberg table and insert a row. self.client.execute("""create table {0} (i int) stored as iceberg""".format(tbl_name)) self.execute_query_expect_success(self.client, "insert into {0} values (1);".format(tbl_name)) # Run a query that would insert a row, but pass the query options that # will cause the iceberg transaction to abort. err = self.execute_query_expect_failure(self.client, "insert into {0} values (2);".format(tbl_name), query_options=abort_ice_transaction_options) # Check that the error message looks reasonable. result = str(err) assert error_msg_startswith(result, "ImpalaRuntimeException: simulated commit failure\n" "CAUSED BY: CommitFailedException: simulated commit failure") # Check that no data was inserted. data = self.execute_query_expect_success(self.client, "select * from {0}".format(tbl_name)) assert data.column_labels == ['I'] assert len(data.data) == 1 assert data.data[0] == '1' # Run a query that would add a column to the table, but pass the query options that # will cause the iceberg transaction to abort. ddl_err = self.execute_query_expect_failure(self.client, "alter table {0} add column {1} bigint" .format(tbl_name, "j"), query_options=abort_ice_transaction_options) ddl_result = str(ddl_err) # Check that the error message looks reasonable. assert error_msg_startswith(ddl_result, "CommitFailedException: simulated commit failure") # Check that no column was added.
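    # (Illustrative alternative, not part of the original test: the absence of the new
    # column could also be verified with a DESCRIBE statement, e.g.
    #   desc = self.execute_query_expect_success(self.client,
    #       "describe {0}".format(tbl_name))
    #   assert all(row.split('\t')[0] != 'j' for row in desc.data)
    # The check below keeps the original approach and reuses the 'select *' result.)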
data = self.execute_query_expect_success(self.client, "select * from {0}".format(tbl_name)) assert data.column_labels == ['I'] assert len(data.data) == 1 assert data.data[0] == '1' def test_drop_partition(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-drop-partition', vector, use_db=unique_database) def test_rollback_after_drop_partition(self, unique_database): table_name = "iceberg_drop_partition_rollback" qualified_table_name = "{}.{}".format(unique_database, table_name) create_table_stmt = """CREATE TABLE {}(identity_int int, unpartitioned_int int) PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG""".format(qualified_table_name) insert_into_stmt = """INSERT INTO {} values(1, 2)""".format(qualified_table_name) drop_partition_stmt = """ALTER TABLE {} DROP PARTITION (identity_int = 1)""".format( qualified_table_name) self.execute_query(create_table_stmt) self.execute_query(insert_into_stmt) self.execute_query(drop_partition_stmt) snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=2) rollback = """ALTER TABLE {} EXECUTE ROLLBACK ({})""".format( qualified_table_name, snapshots[0].get_snapshot_id()) # Rollback before DROP PARTITION self.execute_query(rollback) snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=3) assert snapshots[0].get_snapshot_id() == snapshots[2].get_snapshot_id() assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id() assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time() def test_show_files_partition(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-show-files-partition', vector, use_db=unique_database) def test_scan_metrics_in_profile_basic(self, vector): self.run_test_case('QueryTest/iceberg-scan-metrics-basic', vector) class TestIcebergV2Table(IcebergTestSuite): """Tests related to Iceberg V2 tables.""" @classmethod def add_test_dimensions(cls): super(TestIcebergV2Table, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('table_format').file_format == 'parquet') add_exec_option_dimension(cls, 'disable_optimized_iceberg_v2_read', [0, 1]) def should_run_for_hive(self, vector): # Hive interop tests are very slow. Only run them for a subset of dimensions. if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 0: return True return False # The test uses pre-written Iceberg tables where the position delete files refer to # the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the # dockerised environment the namenode is accessible on a different hostname/port. @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_plain_count_star_optimization(self, vector): self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization', vector) def test_count_star_optimization_in_complex_query(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-v2-count-star-optimization-in-complex-query', vector, unique_database) @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_read_position_deletes(self, vector): # Remove 'batch_size' option so we can set it at .test file. # Revisit this if 'batch_size' dimension size increase. 
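    # (Clarifying note, not from the original test: removing 'batch_size' from the test
    # vector here lets the .test file manage that option itself, presumably via a
    # statement such as "set batch_size=32;" inside one of the ---- QUERY sections of
    # iceberg-v2-read-position-deletes.test.)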
vector.unset_exec_option('batch_size') self.run_test_case('QueryTest/iceberg-v2-read-position-deletes', vector) @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_read_position_deletes_orc(self, vector): self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc', vector) @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris @pytest.mark.execute_serially def test_read_position_deletes_compute_stats(self, vector): """Tests COMPUTE STATS on Iceberg V2 tables. Need to be executed serially because it modifies tables that are used by other tests (e.g. multiple instances of this test).""" self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-stats', vector) self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc-stats', vector) @SkipIfFS.hive def test_read_mixed_format_position_deletes(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-mixed-format-position-deletes', vector, unique_database) @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_read_null_delete_records(self, vector): expected_error = 'NULL found as file_path in delete file' query_options = vector.get_value('exec_option') v2_op_disabled = query_options['disable_optimized_iceberg_v2_read'] == 1 result = self.execute_query( 'select * from functional_parquet.iceberg_v2_null_delete_record', query_options) assert len(result.data) == 6 errors = result.log print(errors) assert expected_error in errors or v2_op_disabled result = self.execute_query( 'select count(*) from functional_parquet.iceberg_v2_null_delete_record', query_options) assert result.data == ['6'] errors = result.log assert expected_error in errors or v2_op_disabled result = self.execute_query( """select * from functional_parquet.iceberg_v2_null_delete_record where j < 3""", query_options) assert sorted(result.data) == ['1\t1', '2\t2'] errors = result.log assert expected_error in errors or v2_op_disabled @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_read_equality_deletes(self, vector): self.run_test_case('QueryTest/iceberg-v2-read-equality-deletes', vector) @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_table_sampling_v2(self, vector): self.run_test_case('QueryTest/iceberg-tablesample-v2', vector, use_db="functional_parquet") @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_scan_metrics_in_profile_with_deletes(self, vector): def get_latest_snapshot_id(fq_tbl_name): query = ("select snapshot_id from {}.snapshots order by committed_at desc" .format(fq_tbl_name)) res = self.execute_query(query) return res.data[0] ice_db = "functional_parquet" no_deletes = "{}.{}".format(ice_db, "iceberg_v2_no_deletes") no_deletes_snapshot_id = get_latest_snapshot_id(no_deletes) pos_delete_all_rows = "{}.{}".format(ice_db, "iceberg_v2_positional_delete_all_rows") pos_delete_all_rows_snapshot_id = get_latest_snapshot_id(pos_delete_all_rows) not_all_data_files_have_delete_files = "{}.{}".format( ice_db, "iceberg_v2_positional_not_all_data_files_have_delete_files") not_all_data_files_have_delete_files_snapshot_id = get_latest_snapshot_id( not_all_data_files_have_delete_files) self.run_test_case('QueryTest/iceberg-scan-metrics-with-deletes', vector, test_file_vars={ "NO_DELETES_SNAPTHOT_ID": no_deletes_snapshot_id, "POS_DELETE_ALL_ROWS_SNAPSHOT_ID": pos_delete_all_rows_snapshot_id, "NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID": not_all_data_files_have_delete_files_snapshot_id }) @SkipIf.hardcoded_uris def 
test_metadata_tables(self, vector, unique_database): # Remove 'batch_size' option so we can set it at .test file. # Revisit this if 'batch_size' dimension size increase. vector.unset_exec_option('batch_size') with self.create_impala_client() as impalad_client: overwrite_snapshot_id = impalad_client.execute("select snapshot_id from " "functional_parquet.iceberg_query_metadata.snapshots " "where operation = 'overwrite';") overwrite_snapshot_ts = impalad_client.execute("select committed_at from " "functional_parquet.iceberg_query_metadata.snapshots " "where operation = 'overwrite';") self.run_test_case('QueryTest/iceberg-metadata-tables', vector, unique_database, test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]), '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])}) @SkipIf.not_hdfs def test_missing_data_files(self, vector, unique_database): def list_files(tbl): query_result = self.execute_query("select file_path from {}.`files`".format(tbl)) return query_result.data def first_snapshot(tbl): query_result = self.execute_query( "select snapshot_id from {}.`snapshots` order by committed_at".format(tbl)) return query_result.data[0] def insert_values(tbl, values): self.execute_query("insert into {} values {}".format(tbl, values)) missing_files_nopart = unique_database + ".missing_files_nopart" missing_files_part = unique_database + ".missing_files_part" self.execute_query("""CREATE TABLE {} (i int, p int) STORED BY ICEBERG TBLPROPERTIES('format-version'='2')""".format(missing_files_nopart)) insert_values(missing_files_nopart, "(1, 1)") first_file = set(list_files(missing_files_nopart)) insert_values(missing_files_nopart, "(2, 2)") all_files = set(list_files(missing_files_nopart)) assert len(all_files) == 2 second_file = next(iter(all_files - first_file)) check_output(["hdfs", "dfs", "-rm", second_file]) self.execute_query("""CREATE TABLE {} (i int, p int) PARTITIONED BY SPEC (p) STORED BY ICEBERG TBLPROPERTIES('format-version'='2')""".format(missing_files_part)) insert_values(missing_files_part, "(1, 1)") insert_values(missing_files_part, "(2, 2)") files = list_files(missing_files_part) part_2_f = None for f in files: if "p=2" in f: part_2_f = f break assert part_2_f is not None check_output(["hdfs", "dfs", "-rm", part_2_f]) self.execute_query("invalidate metadata {}".format(missing_files_nopart)) self.execute_query("invalidate metadata {}".format(missing_files_part)) self.run_test_case('QueryTest/iceberg-missing-data-files', vector, unique_database, test_file_vars={ '$NOPART_FIRST_SNAPSHOT': first_snapshot(missing_files_nopart), '$PART_FIRST_SNAPSHOT': first_snapshot(missing_files_part)}) def test_delete(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-delete', vector, unique_database) def test_delete_partitioned(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-delete-partitioned', vector, unique_database) if IS_HDFS and self.should_run_for_hive(vector): self._delete_partitioned_hive_tests(unique_database) def _delete_partitioned_hive_tests(self, db): hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( db, "id_part")) assert hive_output == "id_part.i,id_part.s\n" hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( db, "trunc_part")) assert hive_output == "trunc_part.i,trunc_part.s\n" hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( db, "multi_part")) assert hive_output == "multi_part.i,multi_part.s,multi_part.f\n" hive_output = 
self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( db, "evolve_part")) assert hive_output == \ "evolve_part.i,evolve_part.s,evolve_part.f\n3,three,3.33\n" \ "30,thirty,30.3\n40,forty,40.4\n" hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}.{}".format( db, "ice_store_sales")) assert hive_output == "_c0\n2601498\n" hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{}".format( db, "ice_alltypes_part_v2")) # Cut off the long header line. hive_output = hive_output.split("\n", 1) hive_output = hive_output[1] assert hive_output == \ "2,true,1,11,1.1,2.222,123.321,2022-02-22,impala\n" def test_large_scale_deletes(self, vector, unique_database): if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 1: pytest.skip("Only test the optimized v2 operator") self.run_test_case('QueryTest/iceberg-large-scale-deletes', vector, unique_database) @SkipIfFS.hive def test_delete_hive_read(self, unique_database): ice_delete = unique_database + ".ice_delete" self.execute_query("""CREATE TABLE {} (i int, s string) STORED BY ICEBERG TBLPROPERTIES('format-version'='2')""".format(ice_delete)) self.execute_query("INSERT INTO {} VALUES (1, 'one')".format(ice_delete)) self.execute_query("INSERT INTO {} VALUES (2, 'two')".format(ice_delete)) self.execute_query("INSERT INTO {} VALUES (3, 'three')".format(ice_delete)) self.execute_query("DELETE FROM {} WHERE i = 2".format(ice_delete)) # Hive needs table property 'format-version' explicitly set self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( ice_delete)) hive_output = self.run_stmt_in_hive("SELECT * FROM {} ORDER BY i".format(ice_delete)) expected_output = "ice_delete.i,ice_delete.s\n1,one\n3,three\n" assert hive_output == expected_output ice_lineitem = unique_database + ".linteitem_ice" self.execute_query("""CREATE TABLE {} STORED BY ICEBERG TBLPROPERTIES('format-version'='2') AS SELECT * FROM tpch_parquet.lineitem""".format(ice_lineitem)) self.execute_query("DELETE FROM {} WHERE l_orderkey % 5 = 0".format(ice_lineitem)) impala_result = self.execute_query("SELECT count(*) FROM {}".format(ice_lineitem)) assert impala_result.data[0] == "4799964" # Hive needs table property 'format-version' explicitly set self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( ice_lineitem)) hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}".format(ice_lineitem)) assert hive_output == "_c0\n4799964\n" @SkipIfFS.hive def test_delete_complextypes_mixed_files(self, vector, unique_database): ice_t = unique_database + ".ice_complex_delete" self.run_stmt_in_hive("""create table {} stored by iceberg stored as orc as select * from functional_parquet.complextypestbl;""".format(ice_t)) # Hive needs table property 'format-version' explicitly set self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( ice_t)) self.run_stmt_in_hive("""alter table {} set tblproperties ('write.format.default'='parquet')""".format(ice_t)) self.run_stmt_in_hive("""insert into {} select * from functional_parquet.complextypestbl""".format(ice_t)) vector.get_value('exec_option')['expand_complex_types'] = True self.run_test_case('QueryTest/iceberg-delete-complex', vector, unique_database) hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t)) # Test that Hive sees the same rows deleted. 
assert hive_output == "id\n4\n5\n6\n7\n8\n" def test_update_basic(self, vector, unique_database): udf_location = get_fs_path('/test-warehouse/libTestUdfs.so') self.run_test_case('QueryTest/iceberg-update-basic', vector, unique_database, test_file_vars={'UDF_LOCATION': udf_location}) self._test_update_basic_snapshots(unique_database) if IS_HDFS and self.should_run_for_hive(vector): self._update_basic_hive_tests(unique_database) def _test_update_basic_snapshots(self, db): """Verifies that the tables have the expected number of snapshots, and the parent ids match the previous snapshot ids. See IMPALA-12708.""" self.validate_snapshots(db, "single_col", 3) self.validate_snapshots(db, "ice_alltypes", 21) self.validate_snapshots(db, "ice_id_partitioned", 7) def validate_snapshots(self, db, tbl, expected_snapshots): tbl_name = "{}.{}".format(db, tbl) snapshots = get_snapshots(self.client, tbl_name, expected_result_size=expected_snapshots) parent_id = None for s in snapshots: assert s.get_parent_id() == parent_id parent_id = s.get_snapshot_id() def _update_basic_hive_tests(self, db): def get_hive_results(tbl, order_by_col): stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col) return self.run_stmt_in_hive(stmt).split("\n", 1)[1] hive_results = get_hive_results("single_col", "i") assert hive_results == "1\n3\n4\n" hive_results = get_hive_results("ice_alltypes", "bool_col") assert hive_results == \ "false,0,111,0.0,0.0,234,123.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \ "true,3,222,1.0,1.0,NULL,NULL,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n" hive_results = get_hive_results("ice_id_partitioned", "i") assert hive_results == \ "1,0,APACHE IMPALA\n" \ "2,0,iceberg\n" \ "3,0,hive\n" \ "5,2,Kudu\n" \ "10,1,Apache Spark\n" hive_results = get_hive_results("ice_bucket_transform", "i") assert hive_results == \ "2,a fairly long string value,1000,1999-09-19 12:00:01.0\n" \ "4,bbb,2030,2001-01-01 00:00:00.0\n" \ "6,cccccccccccccccccccccccccccccccccccccccc,-123,2023-11-24 17:44:30.0\n" hive_results = get_hive_results("ice_time_transforms_timestamp", "id") assert hive_results == \ "1.5000,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0\n" \ "2.4690,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0\n" \ "1999.9998,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0\n" # noqa: E501 hive_results = get_hive_results("ice_time_transforms_date", "id") assert hive_results == \ "1.5000,2001-01-01,2001-01-01,2001-01-01\n" \ "2.4690,2023-11-24,2023-11-24,2023-11-24\n" \ "1999.9998,2199-12-31,2199-12-31,2199-12-31\n" hive_results = get_hive_results("ice_part_transforms", "i") assert hive_results == \ "1,2023-11-13 18:07:05.0,blue,1234\n" \ "3,2023-11-14 19:07:05.0,green,1700\n" \ "4,2023-11-13 18:07:23.0,gray,2500\n" \ "8,2023-11-01 00:11:11.0,black,722\n" def test_update_partitions(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-update-partitions', vector, unique_database) if IS_HDFS and self.should_run_for_hive(vector): self._update_partitions_hive_tests(unique_database) def _update_partitions_hive_tests(self, db): def get_hive_results(tbl, order_by_col): stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col) return self.run_stmt_in_hive(stmt).split("\n", 1)[1] hive_results = get_hive_results("id_part", "i, s") assert hive_results == \ "2,FIVE\n" \ "2,FOUR\n" \ "3,SIX\n" \ "5,ONE\n" \ "10,TWO\n" \ "15,THREE\n" hive_results = 
get_hive_results("trunc_part", "i") assert hive_results == \ "1,one\n" \ "5,five\n" \ "103,three\n" \ "1004,FOURfour\n" \ "1006,SIXsix\n" \ "1102,TWOtwo\n" hive_results = get_hive_results("multi_part", "i") assert hive_results == \ "0,void,3.14\n" \ "0,void,3.14\n" \ "0,void,3.14\n" \ "1,one,1.1\n" \ "3,three,3.33\n" \ "5,five,5.5\n" \ "111,fox,1.1\n" hive_results = get_hive_results("evolve_part", "i") assert hive_results == \ "1,one,1.1\n" \ "30,thirty,30.3\n" \ "40,forty,40.4\n" \ "50,fifty,50.5\n" \ "1003,three,3.33\n" \ "1010,ten,10.1\n" \ "1020,twenty,20.2\n" \ "1222,two,2.2\n" hive_results = get_hive_results("date_day_part", "i") assert hive_results == \ "11,1978-01-01\n" \ "12,1979-12-31\n" \ "13,1980-01-01\n" \ "14,2033-11-15\n" hive_results = get_hive_results("ts_hour_part", "i") assert hive_results == \ "101,1958-01-01 01:02:03.0\n" \ "102,1959-12-31 23:59:00.0\n" \ "103,1960-01-01 00:00:00.0\n" \ "104,2013-11-15 15:31:00.0\n" hive_results = get_hive_results("ts_evolve_part", "i") assert hive_results == \ "1001,1988-02-02 01:02:03.0\n" \ "1002,1990-02-01 23:59:00.0\n" \ "1003,1990-02-02 00:00:00.0\n" \ "1004,2043-12-16 15:31:00.0\n" \ "1111,NULL\n" hive_results = get_hive_results("numeric_truncate", "id") assert hive_results == \ "11,21,2111,531,75.20\n" # HIVE-28048: Hive cannot run ORDER BY queries for Iceberg tables partitioned by # decimal columns, so we order the results ourselves. hive_results = self.run_stmt_in_hive("SELECT * FROM {}.{}".format( db, "ice_alltypes_part_v2")) # Throw away the header line and sort the results. hive_results = hive_results.split("\n", 1)[1] hive_results = hive_results.strip().split("\n") hive_results.sort() assert hive_results == [ "2,true,2,11,1.1,2.222,123.321,2022-04-22,impala", "3,true,3,11,1.1,2.222,123.321,2022-05-22,impala"] def test_optimize(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database) expected_snapshots = 19 self.validate_snapshots(unique_database, "ice_optimize", expected_snapshots) # The last operation was an OPTIMIZE TABLE statement. # Check that time travel to the previous snapshot returns all results correctly. 
tbl_name = unique_database + ".ice_optimize" snapshots = get_snapshots( self.client, tbl_name, expected_result_size=expected_snapshots) snapshot_before_last = snapshots[-2] result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name)) result_time_travel = self.execute_query( "select * from {0} for system_version as of {1};".format( tbl_name, snapshot_before_last.get_snapshot_id())) assert sorted(result_after_opt.data) == sorted(result_time_travel.data) def _check_file_filtering(self, tbl_name, threshold_mb, mode, had_partition_evolution): threshold_bytes = threshold_mb * 1024 * 1024 DATA_FILE = "0" DELETE_FILE = "1" FileMetadata = namedtuple('FileMetadata', 'content, path, partition, size') metadata_query = """select content, file_path, `partition`, file_size_in_bytes from {0}.`files`;""".format(tbl_name) result = self.execute_query(metadata_query) files_before = set() files_per_partition = defaultdict(set) for line in result.data: file = FileMetadata._make(line.split("\t")) partition = file.partition files_per_partition[partition].add(file) files_before.add(file) selected_files = set() partitions_with_removed_files = set() for partition, files in files_per_partition.items(): if len(files) > 1: num_small_files = 0 # Count the small data files. for file in files: if file.content == DATA_FILE and int(file.size) < threshold_bytes: num_small_files += 1 for file in files: # We assume that a delete file in a partition references all data files in # that partition, because we cannot differentiate between data files # with/without deletes. if file.content == DELETE_FILE: selected_files.update(files) partitions_with_removed_files.add(partition) break # Only merge small files if there are at least 2 of them. elif num_small_files > 1 and int(file.size) < threshold_bytes: selected_files.add(file) partitions_with_removed_files.add(partition) self.execute_query( "OPTIMIZE TABLE {0} (file_size_threshold_mb={1});".format(tbl_name, threshold_mb)) optimized_result = self.execute_query(metadata_query) files_after = set() for line in optimized_result.data: file = FileMetadata._make(line.split("\t")) files_after.add(file) # Check the resulting files and the modified partitions after the OPTIMIZE operation. # files_after = files_before - selected_files + 1 new file per partition # The result should not contain the files that were selected and should contain 1 new # file per written partition. unchanged_files = files_before - selected_files # Check that files that were not selected are still present in the result. assert unchanged_files.issubset(files_after) # Check that selected files are rewritten and not present in the result. assert selected_files.isdisjoint(files_after) new_files = files_after - unchanged_files assert new_files == files_after - files_before if mode == "NOOP": assert selected_files == set([]) assert files_after == files_before elif mode == "REWRITE_ALL": assert selected_files == files_before assert files_after.isdisjoint(files_before) elif mode == "PARTIAL": assert selected_files < files_before and selected_files != set([]) assert unchanged_files < files_after and unchanged_files != set([]) assert unchanged_files == files_after.intersection(files_before) # Check that all delete files were merged. for file in files_after: assert file.content == DATA_FILE # Check that there's only one new file in every partition.
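    # (Equivalent sketch of the per-partition uniqueness check below, for illustration
    # only; 'Counter' is not imported by this module:
    #   from collections import Counter
    #   assert all(c == 1 for c in Counter(f.partition for f in new_files).values())
    # The original loop-based check is kept as-is.)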
partitions_with_new_files = set() for file in new_files: assert file.partition not in partitions_with_new_files partitions_with_new_files.add(file.partition) assert len(new_files) == len(partitions_with_new_files) # WITH PARTITION EVOLUTION # Only new partitions are written to. # WITHOUT PARTITION EVOLUTION if not had_partition_evolution: # Check that 1 new content file is written in every updated partition. assert len(new_files) == len(partitions_with_removed_files) assert partitions_with_new_files == partitions_with_removed_files def test_optimize_file_filtering(self, unique_database): tbl_name = unique_database + ".ice_optimize_filter" self.execute_query("""CREATE TABLE {0} partitioned by spec (l_linenumber) STORED BY ICEBERG TBLPROPERTIES ('format-version'='2') AS SELECT * FROM tpch_parquet.lineitem WHERE l_quantity < 10;""".format(tbl_name)) self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem WHERE l_quantity>=10 AND l_quantity<=12;""".format(tbl_name)) # There are no delete files in the table, so this should be a no-op. Check that no new # snapshot was created. self._check_file_filtering(tbl_name, 0, "NOOP", False) assert len(get_snapshots(self.client, tbl_name)) == 2 self._check_file_filtering(tbl_name, 5, "PARTIAL", False) self._check_file_filtering(tbl_name, 50, "PARTIAL", False) # Check that the following is a no-op, since the table is already in a compact form. self._check_file_filtering(tbl_name, 100, "NOOP", False) self.execute_query("""UPDATE {0} SET l_linenumber=7 WHERE l_linenumber>4 AND l_linestatus='F';""".format(tbl_name)) self._check_file_filtering(tbl_name, 6, "PARTIAL", False) self.execute_query("""ALTER TABLE {0} SET PARTITION SPEC(l_linestatus);""" .format(tbl_name)) self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL' AND l_linenumber<4;""".format(tbl_name)) self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem WHERE l_quantity=13 AND l_linenumber<3;""".format(tbl_name)) self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem WHERE l_quantity=14 AND l_linenumber<3;""".format(tbl_name)) # Merges the delete files and rewrites the small files. self._check_file_filtering(tbl_name, 2, "PARTIAL", True) # Rewrites the remaining small files (2MB <= file_size < 100MB). self._check_file_filtering(tbl_name, 100, "PARTIAL", True) self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL';""" .format(tbl_name)) # All partitions have delete files, therefore the entire table is rewritten. 
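    # (Reference note, not from the original test: the checks in this test always pass a
    # file_size_threshold_mb argument; a plain "OPTIMIZE TABLE <tbl>;" without the
    # argument is expected to rewrite the whole table regardless of file sizes.)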
self._check_file_filtering(tbl_name, 100, "REWRITE_ALL", True) def test_merge(self, vector, unique_database): udf_location = get_fs_path('/test-warehouse/libTestUdfs.so') self.run_test_case('QueryTest/iceberg-merge', vector, unique_database, test_file_vars={'UDF_LOCATION': udf_location}) def test_merge_partition(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-partition', vector, unique_database) def test_merge_partition_sort(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-partition-sort', vector, unique_database) def test_merge_long(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database) def test_merge_star(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database) def test_merge_equality_update(self, vector, unique_database): create_iceberg_table_from_directory(self.client, unique_database, "iceberg_v2_delete_equality_partitioned", "parquet", table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database) def test_merge_equality_insert(self, vector, unique_database): create_iceberg_table_from_directory(self.client, unique_database, "iceberg_v2_delete_equality_partitioned", "parquet", table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database) def test_merge_duplicate_check(self, vector, unique_database): """Regression test for IMPALA-13932""" # Remove 'num_nodes' option so we can set it in the .test file. vector.unset_exec_option('num_nodes') self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector, unique_database) def test_writing_multiple_deletes_per_partition(self, vector, unique_database): """Test writing multiple delete files per partition in a single DELETE operation.""" self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition', vector, use_db=unique_database) def test_cleanup(self, unique_database): """Test that all uncommitted files written by Impala are removed from the file system when a DML commit to an Iceberg table fails, and that the effects of the failed operation are not visible.""" table_name = "iceberg_cleanup_failure" fq_tbl_name = unique_database + "." + table_name # The query options that inject an iceberg validation check failure. fail_ice_commit_options = {'debug_action': 'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@' 'ValidationException@' 'simulated validation check failure'} # Create an iceberg table and insert a row. self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i int) STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')""".format(fq_tbl_name)) self.execute_query_expect_success(self.client, "insert into {0} values (1)".format(fq_tbl_name)) # Run a query that would update a row, but pass the query options that # will cause the iceberg validation check to fail. err = self.execute_query_expect_failure(self.client, "update {0} set i=2 where i=1".format(fq_tbl_name), query_options=fail_ice_commit_options) # Check that we get the error message. assert error_msg_startswith(str(err), "ImpalaRuntimeException: simulated validation check failure\n" "CAUSED BY: ValidationException: simulated validation check failure") # Check that the table content was not updated.
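    # (Clarifying note, not from the original test: the file count check further below
    # inspects the data directory directly with 'hdfs dfs -ls' rather than the Iceberg
    # 'files' metadata table, because uncommitted orphan files written by the failed DML
    # would never appear in the table metadata.)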
data = self.execute_query_expect_success(self.client, "select * from {0}".format(fq_tbl_name)) assert len(data.data) == 1 assert data.data[0] == '1' # Check that the uncommitted data and delete files are removed from the file system # and only the first data file remains. table_location = "{0}/test-warehouse/{1}.db/{2}/data".format( FILESYSTEM_PREFIX, unique_database, table_name) files_result = check_output(["hdfs", "dfs", "-ls", table_location]) assert "Found 1 items" in bytes_to_str(files_result) def test_predicate_push_down_hint(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector, use_db=unique_database) def test_partitions(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-partitions', vector, unique_database) tbl_name = unique_database + ".ice_num_partitions" snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4) second_snapshot = snapshots[1] time_travel_data = self.execute_query( "SELECT * FROM {0} for system_version as of {1};".format( tbl_name, second_snapshot.get_snapshot_id())) assert "partitions=4/unknown" in time_travel_data.runtime_profile selective_time_travel_data = self.execute_query( "SELECT * FROM {0} for system_version as of {1} WHERE id < 5;".format( tbl_name, second_snapshot.get_snapshot_id())) assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile def test_table_repair(self, unique_database): tbl_name = 'tbl_with_removed_files' db_tbl = unique_database + "." + tbl_name repair_query = "alter table {0} execute repair_metadata()" with self.create_impala_client() as impalad_client: impalad_client.execute( "create table {0} (i int) stored as iceberg tblproperties('format-version'='2')" .format(db_tbl)) insert_q = "insert into {0} values ({1})" self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1)) self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2)) self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3)) self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4)) self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5)) result = impalad_client.execute('select i from {0} order by i'.format(db_tbl)) assert result.data == ['1', '2', '3', '4', '5'] TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name) DATA_PATH = os.path.join(TABLE_PATH, "data") # Check that table remains intact if there are no missing files result = self.execute_query_expect_success( impalad_client, repair_query.format(db_tbl)) assert result.data[0] == "No missing data files detected." result = impalad_client.execute('select i from {0} order by i'.format(db_tbl)) assert result.data == ['1', '2', '3', '4', '5'] # Delete 2 data files from the file system directly to corrupt the table. data_files = self.filesystem_client.ls(DATA_PATH) self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[0]) self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[1]) self.execute_query_expect_success(impalad_client, "invalidate metadata") result = self.execute_query_expect_success( impalad_client, repair_query.format(db_tbl)) assert result.data[0] == \ "Iceberg table repaired by deleting 2 manifest entries of missing data files." result = impalad_client.execute('select * from {0} order by i'.format(db_tbl)) assert len(result.data) == 3 # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. 
Note, that most # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it # runs also with the V2 optimizations setting turned off, some tests were moved here. class TestIcebergDirectedMode(IcebergTestSuite): """Tests related to Iceberg DIRECTED distribution mode.""" @classmethod def add_test_dimensions(cls): super(TestIcebergDirectedMode, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('table_format').file_format == 'parquet') @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_directed_mode(self, vector): self.run_test_case('QueryTest/iceberg-v2-directed-mode', vector)