Files
impala/tests/query_test/test_iceberg.py
Noemi Pap-Takacs fdad9d3204 IMPALA-13725: Add Iceberg table repair functionalities
In some cases users delete files directly from storage without
going through the Iceberg API, e.g. they remove old partitions.

This corrupts the table, and makes queries that try to read the
missing files fail.
This change introduces a repair statement that deletes the
dangling references of missing files from the metadata.
Note that the table cannot be repaired if there are missing
delete files because Iceberg's DeleteFiles API which is used
to execute the operation allows removing only data files.

Testing:
 - E2E
   - HDFS
   - S3, Ozone
 - analysis

Change-Id: I514403acaa3b8c0a7b2581d676b82474d846d38e
Reviewed-on: http://gerrit.cloudera.org:8080/23512
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-11-25 13:03:52 +00:00

2313 lines
108 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from collections import defaultdict, namedtuple
import datetime
import json
import logging
import os
import random
import re
from subprocess import check_call, check_output
import time
from avro.datafile import DataFileReader
from avro.io import DatumReader
from builtins import range
import pytest
import pytz
# noinspection PyUnresolvedReferences
from impala_thrift_gen.parquet.ttypes import ConvertedType
from tests.common.file_utils import (
create_iceberg_table_from_directory,
create_table_from_parquet,
)
from tests.common.iceberg_test_suite import IcebergTestSuite
from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfFS
from tests.common.test_dimensions import add_exec_option_dimension
from tests.common.test_result_verifier import error_msg_startswith
from tests.shell.util import run_impala_shell_cmd
from tests.util.filesystem_utils import FILESYSTEM_PREFIX, get_fs_path, IS_HDFS, WAREHOUSE
from tests.util.get_parquet_metadata import get_parquet_metadata
from tests.util.iceberg_util import cast_ts, get_snapshots, IcebergCatalogs, quote
from tests.util.parse_util import bytes_to_str
LOG = logging.getLogger(__name__)
class TestIcebergTable(IcebergTestSuite):
"""Tests related to Iceberg tables."""
@classmethod
def add_test_dimensions(cls):
super(TestIcebergTable, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
def test_iceberg_negative(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-negative', vector, use_db=unique_database)
def test_create_iceberg_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database)
def test_alter_iceberg_tables_v1(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-alter-v1', vector, use_db=unique_database)
def test_alter_iceberg_tables_v2(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-alter-v2', vector, use_db=unique_database)
def test_alter_iceberg_tables_default(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-alter-default', vector, use_db=unique_database)
def test_iceberg_binary_type(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-binary-type', vector, use_db=unique_database)
def test_external_iceberg_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-external', vector, unique_database)
def test_expire_snapshots(self, unique_database):
tbl_name = unique_database + ".expire_snapshots"
iceberg_catalogs = IcebergCatalogs(unique_database)
for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
# Iceberg doesn't create a snapshot entry for the initial empty table
impalad_client.execute("""
create table {0} (i int) stored as iceberg
TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
ts_0 = datetime.datetime.now()
insert_q = "insert into {0} values (1)".format(tbl_name)
ts_1 = self.execute_query_ts(impalad_client, insert_q)
time.sleep(1)
impalad_client.execute(insert_q)
time.sleep(1)
ts_2 = self.execute_query_ts(impalad_client, insert_q)
impalad_client.execute(insert_q)
# There should be 4 snapshots initially
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
# Expire the oldest snapshot and test that the oldest one was expired
expire_q = "alter table {0} execute expire_snapshots({1})"
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
# Expire with a timestamp in which the interval does not touch existing snapshot
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
# Expire all, but retain 1
impalad_client.execute(expire_q.format(tbl_name,
cast_ts(datetime.datetime.now())))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
# Change number of retained snapshots, then expire all
impalad_client.execute("""alter table {0} set tblproperties
('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
impalad_client.execute(insert_q)
impalad_client.execute(insert_q)
impalad_client.execute(expire_q.format(tbl_name,
cast_ts(datetime.datetime.now())))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
# Check that timezone is interpreted in local timezone controlled by query option
# TIMEZONE.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
impalad_client.execute(insert_q)
ts_tokyo = self.impala_now(impalad_client)
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
impalad_client.execute(insert_q)
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
impalad_client.execute("DROP TABLE {0}".format(tbl_name))
def test_truncate_iceberg_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
@SkipIf.not_dfs
def test_drop_incomplete_table(self, unique_database):
"""Test DROP TABLE when the underlying directory is deleted. In that case table
loading fails, but we should be still able to drop the table from Impala."""
tbl_name = unique_database + ".synchronized_iceberg_tbl"
cat_location = get_fs_path("/test-warehouse/" + unique_database)
self.client.execute("""create table {0} (i int) stored as iceberg
tblproperties('iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='{1}')""".format(tbl_name, cat_location))
self.filesystem_client.delete_file_dir(cat_location, True)
self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name))
@SkipIf.not_dfs(reason="Dfs required as test to directly delete files.")
def test_drop_corrupt_table(self, unique_database):
"""Test that if the underlying iceberg metadata directory is deleted, then a query
fails with a reasonable error message, and the table can be dropped successfully."""
table = "corrupt_iceberg_tbl"
full_table_name = unique_database + "." + table
self.client.execute("""create table {0} (i int) stored as iceberg""".
format(full_table_name))
metadata_location = get_fs_path("""/test-warehouse/{0}.db/{1}/metadata""".format(
unique_database, table))
assert self.filesystem_client.exists(metadata_location)
status = self.filesystem_client.delete_file_dir(metadata_location, True)
assert status, "Delete failed with {0}".format(status)
assert not self.filesystem_client.exists(metadata_location)
# Invalidate so that table loading problems will happen in the catalog.
self.client.execute("invalidate metadata {0}".format(full_table_name))
# Query should now fail.
err = self.execute_query_expect_failure(self.client, """select * from {0}""".
format(full_table_name))
result = str(err)
assert "AnalysisException: Failed to load metadata for table" in result
assert ("Failed to load metadata for table" in result # local catalog
or "Error loading metadata for Iceberg table" in result) # default catalog
self.execute_query_expect_success(self.client, """drop table {0}""".
format(full_table_name))
def test_insert(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database)
def test_partitioned_insert_v1(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-partitioned-insert-v1', vector,
use_db=unique_database)
def test_partitioned_insert_v2(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-partitioned-insert-v2', vector,
use_db=unique_database)
def test_partitioned_insert_default(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-partitioned-insert-default', vector,
use_db=unique_database)
def test_insert_overwrite(self, vector, unique_database):
"""Run iceberg-overwrite tests, then test that INSERT INTO/OVERWRITE queries running
concurrently with a long running INSERT OVERWRITE are handled gracefully. query_a is
started before query_b/query_c, but query_b/query_c are supposed to finish before
query_a. query_a should fail because the overwrite should not erase query_b/query_c's
result."""
# Run iceberg-overwrite.test
self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database)
# Create test dataset for concurrency tests and warm-up the test table
tbl_name = unique_database + ".overwrite_tbl"
self.client.execute("""create table {0} (i int)
partitioned by spec (truncate(3, i))
stored as iceberg""".format(tbl_name))
self.client.execute("insert into {0} values (1), (2), (3);".format(tbl_name))
# Test queries: 'a' is the long running query while 'b' and 'c' are the short ones
query_a = """insert overwrite {0} select sleep(5000);""".format(tbl_name)
query_b = """insert overwrite {0} select * from {0};""".format(tbl_name)
query_c = """insert into {0} select * from {0};""".format(tbl_name)
# Test concurrent INSERT OVERWRITEs, the exception closes the query handle.
handle = self.client.execute_async(query_a)
time.sleep(1)
self.client.execute(query_b)
try:
self.client.wait_for_finished_timeout(handle, 30)
assert False
except IMPALA_CONNECTION_EXCEPTION as e:
assert "Found conflicting files" in str(e)
# Test INSERT INTO during INSERT OVERWRITE, the exception closes the query handle.
handle = self.client.execute_async(query_a)
time.sleep(1)
self.client.execute(query_c)
try:
self.client.wait_for_finished_timeout(handle, 30)
assert False
except IMPALA_CONNECTION_EXCEPTION as e:
assert "Found conflicting files" in str(e)
def test_ctas(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-ctas', vector, use_db=unique_database)
def test_partition_transform_insert(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-partition-transform-insert', vector,
use_db=unique_database)
def test_iceberg_orc_field_id(self, vector):
self.run_test_case('QueryTest/iceberg-orc-field-id', vector)
def test_catalogs(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-catalogs', vector, use_db=unique_database)
def test_missing_field_ids(self, vector):
self.run_test_case('QueryTest/iceberg-missing-field-ids', vector)
def test_migrated_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-migrated-tables', vector, unique_database)
def test_migrated_table_field_id_resolution(self, vector, unique_database):
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_migrated_alter_test", "parquet")
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_migrated_complex_test", "parquet")
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_migrated_alter_test_orc", "orc")
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_migrated_complex_test_orc", "orc")
self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution',
vector, unique_database)
if IS_HDFS:
self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-orc',
vector, unique_database)
def test_column_case_sensitivity(self, vector, unique_database):
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_column_case_sensitivity_issue", "parquet")
self.run_test_case('QueryTest/iceberg-column-case-sensitivity-issue',
vector, unique_database)
@SkipIfFS.hive
def test_migrated_table_field_id_resolution_complex(self, vector, unique_database):
def get_table_loc(tbl_name):
return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, tbl_name)
def create_table(tbl_name, file_format, partition_cols):
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
id INT,
name STRING,
teststeps array<struct<step_number:int,step_description:string>>)
PARTITIONED BY ({})
STORED AS {}
""".format(unique_database, tbl_name, partition_cols, file_format))
def add_file_to_table_partition(tbl_name, part_dir, local_filename):
tbl_loc = get_table_loc(tbl_name)
part_dir = os.path.join(tbl_loc, part_dir)
self.filesystem_client.make_dir(part_dir)
data_file_path = os.path.join(os.environ['IMPALA_HOME'], "testdata",
"migrated_iceberg", local_filename)
self.filesystem_client.copy_from_local(data_file_path, part_dir)
def finalize_table(tbl_name):
self.execute_query("ALTER TABLE {}.{} RECOVER PARTITIONS".format(
unique_database, tbl_name))
self.execute_query("ALTER TABLE {}.{} CONVERT TO ICEBERG".format(
unique_database, tbl_name))
def prepare_test_table(tbl_name, file_format, partition_cols, part_dir, datafile):
create_table(tbl_name, file_format, partition_cols)
add_file_to_table_partition(tbl_name, part_dir, datafile)
finalize_table(tbl_name)
prepare_test_table('all_part_cols_stored_parquet',
"PARQUET",
"result_date STRING",
"result_date=2024-08-26",
"complextypes_and_partition_columns_in_data_files.parquet")
prepare_test_table('not_all_part_cols_stored_parquet',
"PARQUET",
"result_date STRING, p INT",
"result_date=2024-08-26/p=3",
"complextypes_and_partition_columns_in_data_files.parquet")
prepare_test_table('all_part_cols_stored_orc',
"ORC",
"result_date STRING",
"result_date=2024-08-26",
"complextypes_and_partition_columns_in_data_files.orc")
prepare_test_table('not_all_part_cols_stored_orc',
"ORC",
"result_date STRING, p INT",
"result_date=2024-08-26/p=3",
"complextypes_and_partition_columns_in_data_files.orc")
self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-complex',
vector, unique_database)
def test_describe_history(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database)
# Create a table with multiple snapshots and verify the table history.
tbl_name = unique_database + ".iceberg_multi_snapshots"
self.client.execute("""create table {0} (i int) stored as iceberg
tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
first_snapshot = snapshots[0]
second_snapshot = snapshots[1]
# Check that first snapshot is older than the second snapshot.
assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
# Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
# The first snapshot has no parent snapshot ID.
assert(first_snapshot.get_parent_id() is None)
# Check "is_current_ancestor" column.
assert(first_snapshot.is_current_ancestor())
assert(second_snapshot.is_current_ancestor())
def test_execute_rollback_negative(self, vector):
"""Negative test for EXECUTE ROLLBACK."""
self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
def test_execute_rollback(self, unique_database):
"""Test for EXECUTE ROLLBACK."""
iceberg_catalogs = IcebergCatalogs(unique_database)
for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
# Create a table with multiple snapshots.
tbl_name = unique_database + ".iceberg_execute_rollback"
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
orig_timezone = 'America/Los_Angeles'
impalad_client.execute("SET TIMEZONE='" + orig_timezone + "'")
impalad_client.execute("""
create table {0} (i int) stored as iceberg
TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
initial_snapshots = 3
for i in range(initial_snapshots):
impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
snapshots = get_snapshots(impalad_client, tbl_name,
expected_result_size=initial_snapshots)
output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
LOG.info("success output={0}".format(output))
# We rolled back, but that creates a new snapshot, so now there are 4.
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
# The new snapshot has the same id (and parent id) as the snapshot we rolled back
# to, but it has a different creation time.
assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
# The "orphaned" snapshot is now not a current ancestor.
assert not snapshots[2].is_current_ancestor()
# We cannot roll back to a snapshot that is not a current ancestor.
output = self.rollback_to_id_expect_failure(tbl_name,
snapshots[2].get_snapshot_id(),
expected_text="Cannot roll back to snapshot, not an ancestor of the current "
"state")
# Create another snapshot.
before_insert = datetime.datetime.now(pytz.timezone(orig_timezone))
impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
# Rollback to before the last insert.
self.rollback_to_ts(impalad_client, tbl_name, before_insert)
# This creates another snapshot.
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
# The snapshot id is the same, the dates differ
assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
assert not snapshots[4].is_current_ancestor()
# Show that the EXECUTE ROLLBACK is respecting the current timezone.
# To do this we try to roll back to a time for which there is no
# snapshot, this will fail with an error message that includes the specified
# time. We parse out that time. By doing this in two timezones we can see
# that the parameter being used was affected by the current timezone.
one_hour_ago = before_insert - datetime.timedelta(hours=1)
# We use Timezones from Japan and Iceland to avoid any DST complexities.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
japan_ts = self.get_snapshot_ts_from_failed_rollback(
impalad_client, tbl_name, one_hour_ago)
impalad_client.execute("SET TIMEZONE='Iceland'")
iceland_ts = self.get_snapshot_ts_from_failed_rollback(
impalad_client, tbl_name, one_hour_ago)
diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
assert diff_hours == 9
impalad_client.execute("DROP TABLE {0}".format(tbl_name))
def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
"""Run an EXECUTE ROLLBACK which is expected to fail.
Parse the error message to extract the timestamp for which there
was no snapshot, and convert the string to an integer"""
try:
self.rollback_to_ts(client, tbl_name, ts)
assert False, "Query should have failed"
except IMPALA_CONNECTION_EXCEPTION as e:
result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
time_str = result.group(1)
snapshot_ts = int(time_str)
assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
return snapshot_ts
def rollback_to_ts(self, client, tbl_name, ts):
"""Rollback a table to a snapshot timestamp."""
query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
return self.execute_query_expect_success(client, query)
def rollback_to_id(self, tbl_name, id):
"""Rollback a table to a snapshot id."""
query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
return self.execute_query_expect_success(self.client, query)
def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
"""Attempt to roll back a table to a snapshot id, expecting a failure."""
query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
output = self.execute_query_expect_failure(self.client, query)
if expected_text:
assert expected_text in str(output)
return output
def test_execute_remove_orphan_files(self, unique_database):
tbl_name = 'tbl_with_orphan_files'
db_tbl = unique_database + "." + tbl_name
with self.create_impala_client() as impalad_client:
impalad_client.execute("create table {0} (i int) stored as iceberg"
.format(db_tbl))
insert_q = "insert into {0} values ({1})"
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
result = impalad_client.execute('select i from {} order by i'.format(db_tbl))
assert result.data == ['1', '2', '3']
# Add some junk files to data and metadata dir.
TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
DATA_PATH = os.path.join(TABLE_PATH, "data")
METADATA_PATH = os.path.join(TABLE_PATH, "metadata")
SRC_DIR = os.path.join(
os.environ['IMPALA_HOME'],
"testdata/data/iceberg_test/iceberg_mixed_file_format_test/{0}/{1}")
# Copy first set of junk files.
file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
"fff150b6136a-job_16619542960420_0002-1-00001.parquet"
file_avro1 = "055baf62-de6d-4583-bf21-f187f9482343-m0.avro"
self.filesystem_client.copy_from_local(
SRC_DIR.format('data', file_parq1), DATA_PATH)
self.filesystem_client.copy_from_local(
SRC_DIR.format('metadata', file_avro1), METADATA_PATH)
assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1))
assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro1))
# Keep current time.
result = impalad_client.execute('select cast(now() as string)')
cp1_time = result.data[0]
time.sleep(1)
# Copy second set of junk files.
file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
"27ff880faff0-job_16619542960420_0004-1-00001.parquet"
file_avro2 = "871d1473-8566-46c0-a530-a2256b3f396f-m0.avro"
self.filesystem_client.copy_from_local(
SRC_DIR.format('data', file_parq2), DATA_PATH)
self.filesystem_client.copy_from_local(
SRC_DIR.format('metadata', file_avro2), METADATA_PATH)
assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2))
assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2))
# Execute REMOVE_ORPHAN_FILES at specific timestamp.
result = impalad_client.execute(
"ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES('{1}')".format(db_tbl, cp1_time))
assert result.data[0] == 'Remove orphan files executed.'
assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1))
assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_parq1))
assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2))
assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2))
# Execute REMOVE_ORPHAN_FILES at now().
result = impalad_client.execute(
"ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES(now())".format(db_tbl))
assert result.data[0] == 'Remove orphan files executed.'
assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2))
assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_parq2))
# Assert table still queryable.
result = impalad_client.execute('select i from {} order by i'.format(db_tbl))
assert result.data == ['1', '2', '3']
def test_describe_history_params(self, unique_database):
tbl_name = unique_database + ".describe_history"
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
# Iceberg doesn't create a snapshot entry for the initial empty table
impalad_client.execute("create table {0} (i int) stored as iceberg"
.format(tbl_name))
insert_q = "insert into {0} values (1)".format(tbl_name)
ts_1 = self.execute_query_ts(impalad_client, insert_q)
time.sleep(1)
ts_2 = self.execute_query_ts(impalad_client, insert_q)
time.sleep(1)
ts_3 = self.execute_query_ts(impalad_client, insert_q)
# Describe history without predicate
data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
assert len(data.data) == 3
# Describe history with FROM predicate
self.expect_num_snapshots_from(impalad_client, tbl_name,
ts_1 - datetime.timedelta(hours=1), 3)
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 2)
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
# Describe history with BETWEEN <ts> AND <ts> predicate
self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
self.expect_results_between(impalad_client, tbl_name,
ts_1 - datetime.timedelta(hours=1), ts_2, 2)
self.expect_results_between(impalad_client, tbl_name,
ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
# Check that timezone is interpreted in local timezone controlled by query option
# TIMEZONE. Persist the local times first and create a new snapshot.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
now_tokyo = self.impala_now(impalad_client)
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
now_budapest = self.impala_now(impalad_client)
self.execute_query_ts(impalad_client, "insert into {0} values (4)".format(tbl_name))
self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 1)
# Let's switch to Tokyo time. Tokyo time is always greater than Budapest time.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
self.expect_num_snapshots_from(impalad_client, tbl_name, now_tokyo, 1)
# Interpreting Budapest time in Tokyo time points to the past.
self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 4)
def test_time_travel(self, unique_database):
tbl_name = unique_database + ".time_travel"
def expect_results(query, expected_results, expected_cols):
data = impalad_client.execute(query)
assert len(data.data) == len(expected_results)
for r in expected_results:
assert r in data.data
expected_col_labels = expected_cols['labels']
expected_col_types = expected_cols['types']
assert data.column_labels == expected_col_labels
assert data.column_types == expected_col_types
def expect_for_count_star(query, expected):
data = impalad_client.execute(query)
assert len(data.data) == 1
assert expected in data.data
assert "NumRowGroups" not in data.runtime_profile
assert "NumFileMetadataRead" not in data.runtime_profile
def expect_results_t(ts, expected_results, expected_cols):
expect_results(
"select * from {0} for system_time as of {1}".format(tbl_name, ts),
expected_results, expected_cols)
def expect_for_count_star_t(ts, expected):
expect_for_count_star(
"select count(*) from {0} for system_time as of {1}".format(tbl_name, ts),
expected)
def expect_snapshot_id_in_plan_t(ts, snapshot_id):
data = impalad_client.execute(
"explain select * from {0} for system_time as of {1}".format(
tbl_name, ts))
assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data
def expect_results_v(snapshot_id, expected_results, expected_cols):
expect_results(
"select * from {0} for system_version as of {1}".format(tbl_name, snapshot_id),
expected_results, expected_cols)
def expect_for_count_star_v(snapshot_id, expected):
expect_for_count_star(
"select count(*) from {0} for system_version as of {1}".format(
tbl_name, snapshot_id),
expected)
def expect_snapshot_id_in_plan_v(snapshot_id):
data = impalad_client.execute(
"explain select * from {0} for system_version as of {1}".format(
tbl_name, snapshot_id))
assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data
def impala_now():
now_data = impalad_client.execute("select now()")
return now_data.data[0]
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
# Iceberg doesn't create a snapshot entry for the initial empty table
impalad_client.execute("create table {0} (i int) stored as iceberg"
.format(tbl_name))
ts_1 = self.execute_query_ts(impalad_client, "insert into {0} values (1)"
.format(tbl_name))
ts_2 = self.execute_query_ts(impalad_client, "insert into {0} values (2)"
.format(tbl_name))
ts_3 = self.execute_query_ts(impalad_client, "truncate table {0}".format(tbl_name))
time.sleep(5)
ts_4 = self.execute_query_ts(impalad_client, "insert into {0} values (100)"
.format(tbl_name))
ts_no_ss = self.execute_query_ts(impalad_client,
"alter table {0} add column {1} bigint"
.format(tbl_name, "j"))
ts_5 = self.execute_query_ts(impalad_client, "insert into {0} (i,j) values (3, 103)"
.format(tbl_name))
# Descriptions of the different schemas we expect to see as Time Travel queries
# use the schema from the specified time or snapshot.
#
# When the schema is just the 'J' column.
j_cols = {
'labels': ['J'],
'types': ['BIGINT']
}
# When the schema is just the 'I' column.
i_cols = {
'labels': ['I'],
'types': ['INT']
}
# When the schema is the 'I' and 'J' columns.
ij_cols = {
'labels': ['I', 'J'],
'types': ['INT', 'BIGINT']
}
# Query table as of timestamps.
expect_results_t("now()", ['100\tNULL', '3\t103'], ij_cols)
expect_results_t(quote(ts_1), ['1'], i_cols)
expect_results_t(quote(ts_2), ['1', '2'], i_cols)
expect_results_t(quote(ts_3), [], i_cols)
expect_results_t(cast_ts(ts_3) + " + interval 1 seconds", [], i_cols)
expect_results_t(quote(ts_4), ['100'], i_cols)
expect_results_t(cast_ts(ts_4) + " - interval 5 seconds", [], i_cols)
# There is no new snapshot created by the schema change between ts_4 and ts_no_ss.
# So at ts_no_ss we see the schema as of ts_4
expect_results_t(quote(ts_no_ss), ['100'], i_cols)
expect_results_t(quote(ts_5), ['100\tNULL', '3\t103'], ij_cols)
# Future queries return the current snapshot.
expect_results_t(cast_ts(ts_5) + " + interval 1 hours", ['100\tNULL', '3\t103'],
ij_cols)
# Query table as of snapshot IDs.
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
expect_snapshot_id_in_plan_v(snapshots[0].get_snapshot_id())
expect_snapshot_id_in_plan_v(snapshots[1].get_snapshot_id())
expect_snapshot_id_in_plan_v(snapshots[2].get_snapshot_id())
expect_snapshot_id_in_plan_v(snapshots[3].get_snapshot_id())
expect_snapshot_id_in_plan_v(snapshots[4].get_snapshot_id())
expect_snapshot_id_in_plan_t(quote(ts_1), snapshots[0].get_snapshot_id())
expect_snapshot_id_in_plan_t(quote(ts_2), snapshots[1].get_snapshot_id())
expect_snapshot_id_in_plan_t(quote(ts_3), snapshots[2].get_snapshot_id())
expect_snapshot_id_in_plan_t(quote(ts_4), snapshots[3].get_snapshot_id())
expect_snapshot_id_in_plan_t(quote(ts_5), snapshots[4].get_snapshot_id())
# Test of plain count star optimization
# 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
expect_for_count_star_t("now()", '2')
expect_for_count_star_t(quote(ts_1), '1')
expect_for_count_star_t(quote(ts_2), '2')
expect_for_count_star_t(quote(ts_3), '0')
expect_for_count_star_t(cast_ts(ts_3) + " + interval 1 seconds", '0')
expect_for_count_star_t(quote(ts_4), '1')
expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
expect_for_count_star_t(cast_ts(ts_5), '2')
expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
# SELECT diff
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS
SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
tbl=tbl_name, ts_new=ts_2, ts_old=ts_1),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
MINUS
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
v_old=snapshots[0].get_snapshot_id()),
['2'], i_cols)
# Mix SYSTEM_TIME and SYSTEM_VERSION
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
MINUS
SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS
SELECT *, NULL FROM {tbl} FOR SYSTEM_TIME
AS OF '{ts_old}'""".format(
tbl=tbl_name, ts_new=ts_5, ts_old=ts_4),
['3\t103'], ij_cols)
# Query old snapshot
try:
impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_TIME AS OF {1}".format(
tbl_name, "now() - interval 2 years"))
assert False # Exception must be thrown
except Exception as e:
assert "Cannot find a snapshot older than" in str(e)
# Query invalid snapshot
try:
impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_VERSION AS OF 42".format(
tbl_name))
assert False # Exception must be thrown
except Exception as e:
assert "Cannot find snapshot with ID 42" in str(e)
# Go back to one column
impalad_client.execute("alter table {0} drop column i".format(tbl_name))
# Test that deleted column is not selectable.
try:
impalad_client.execute("SELECT i FROM {0}".format(tbl_name))
assert False # Exception must be thrown
except Exception as e:
assert "Could not resolve column/field reference: 'i'" in str(e)
# Back at ts_2 the deleted 'I' column is there
expect_results("SELECT * FROM {0} FOR SYSTEM_TIME AS OF '{1}'".
format(tbl_name, ts_2), ['1', '2'], i_cols)
expect_results("SELECT i FROM {0} FOR SYSTEM_TIME AS OF '{1}'".
format(tbl_name, ts_2), ['1', '2'], i_cols)
# Check that timezone is interpreted in local timezone controlled by query option
# TIMEZONE
impalad_client.execute("truncate table {0}".format(tbl_name))
impalad_client.execute("insert into {0} values (1111)".format(tbl_name))
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
now_budapest = impala_now()
expect_results_t(quote(now_budapest), ['1111'], j_cols)
# Let's switch to Tokyo time. Tokyo time is always greater than Budapest time.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
now_tokyo = impala_now()
expect_results_t(quote(now_tokyo), ['1111'], j_cols)
try:
# Interpreting Budapest time in Tokyo time points to the past when the table
# didn't exist.
expect_results_t(quote(now_budapest), [], j_cols)
assert False
except Exception as e:
assert "Cannot find a snapshot older than" in str(e)
def test_time_travel_queries(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-time-travel', vector, use_db=unique_database)
@SkipIf.not_dfs
def test_strings_utf8(self, unique_database):
# Create table
table_name = "ice_str_utf8"
qualified_table_name = "%s.%s" % (unique_database, table_name)
query = 'create table %s (a string) stored as iceberg' % qualified_table_name
self.client.execute(query)
# Inserted string data should have UTF8 annotation regardless of query options.
query = 'insert into %s values ("impala")' % qualified_table_name
self.execute_query(query, {'parquet_annotate_strings_utf8': False})
# Copy the created file to the local filesystem and parse metadata
local_file = '/tmp/iceberg_utf8_test_%s.parq' % random.randint(0, 10000)
LOG.info("test_strings_utf8 local file name: " + local_file)
hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/data/*.parq'
% (unique_database, table_name))
check_call(['hadoop', 'fs', '-copyToLocal', hdfs_file, local_file])
metadata = get_parquet_metadata(local_file)
# Extract SchemaElements corresponding to the table column
a_schema_element = metadata.schema[1]
assert a_schema_element.name == 'a'
# Check that the schema uses the UTF8 annotation
assert a_schema_element.converted_type == ConvertedType.UTF8
os.remove(local_file)
# Get hdfs path to manifest list that belongs to the snapshot identified by
# 'snapshot_counter'.
def get_manifest_list_hdfs_path(self, tmp_path_prefix, db_name, table_name,
snapshot_counter):
local_path = '%s_%s.metadata.json' % (tmp_path_prefix, random.randint(0, 10000))
hdfs_path = get_fs_path('/test-warehouse/%s.db/%s/metadata/%s*.metadata.json'
% (db_name, table_name, snapshot_counter))
check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
manifest_list_hdfs_path = None
try:
with open(local_path, 'r') as fp:
metadata = json.load(fp)
current_snapshot_id = metadata['current-snapshot-id']
for snapshot in metadata['snapshots']:
if snapshot['snapshot-id'] == current_snapshot_id:
manifest_list_hdfs_path = snapshot['manifest-list']
break
finally:
os.remove(local_path)
return manifest_list_hdfs_path
# Get list of hdfs paths to manifest files from the manifest list avro file.
def get_manifest_hdfs_path_list(self, tmp_path_prefix, manifest_list_hdfs_path):
local_path = '%s_%s.manifest_list.avro' % (tmp_path_prefix, random.randint(0, 10000))
check_call(['hadoop', 'fs', '-copyToLocal', manifest_list_hdfs_path, local_path])
manifest_hdfs_path_list = []
reader = None
try:
with open(local_path, 'rb') as fp:
reader = DataFileReader(fp, DatumReader())
for manifest in reader:
manifest_hdfs_path_list.append(manifest['manifest_path'])
finally:
if reader:
reader.close()
os.remove(local_path)
return manifest_hdfs_path_list
# Get 'data_file' structs from avro manifest files.
def get_data_file_list(self, tmp_path_prefix, manifest_hdfs_path_list):
datafiles = []
for hdfs_path in manifest_hdfs_path_list:
local_path = '%s_%s.manifest.avro' % (tmp_path_prefix, random.randint(0, 10000))
check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
reader = None
try:
with open(local_path, 'rb') as fp:
reader = DataFileReader(fp, DatumReader())
datafiles.extend([rec['data_file'] for rec in reader])
finally:
if reader:
reader.close()
os.remove(local_path)
return datafiles
def get_latest_metadata_path(self, database_name, table_name):
describe = 'describe extended %s.%s ' % (database_name, table_name)
output = self.client.execute(describe)
metadata_location = [s for s in output.data if s.startswith('\tmetadata_location')]
assert len(metadata_location) == 1
metadata_location_split = metadata_location[0].split('\t')
assert len(metadata_location_split) == 3
metadata_path = metadata_location_split[2]
return metadata_path
# Get the current partition spec as JSON from the latest metadata file
def get_current_partition_spec(self, database_name, table_name):
hdfs_path = self.get_latest_metadata_path(database_name, table_name)
output = check_output(['hadoop', 'fs', '-cat', hdfs_path])
current_partition_spec = None
metadata = json.loads(output)
current_spec_id = metadata['default-spec-id']
for spec in metadata['partition-specs']:
if spec['spec-id'] == current_spec_id:
current_partition_spec = spec
break
return current_partition_spec
@SkipIf.not_dfs
def test_partition_spec_update_v1(self, unique_database):
# Create table
table_name = "ice_part"
qualified_table_name = "%s.%s" % (unique_database, table_name)
create_table = """create table {}
(s string, i int) partitioned by spec(truncate(5, s), identity(i))
stored as iceberg
tblproperties ('format-version'='1')""".format(qualified_table_name)
self.client.execute(create_table)
partition_spec = self.get_current_partition_spec(unique_database, table_name)
assert len(partition_spec['fields']) == 2
truncate_s = partition_spec['fields'][0]
identity_i = partition_spec['fields'][1]
# At table creation, partition names does not contain the parameter value.
assert truncate_s['name'] == 's_trunc'
assert identity_i['name'] == 'i'
assert truncate_s['field-id'] == 1000
assert identity_i['field-id'] == 1001
# Partition evolution
partition_evolution = 'alter table %s set partition ' \
'spec(identity(i), truncate(6,s))' % qualified_table_name
self.client.execute(partition_evolution)
# V1 partition evolution keeps the old, modified fields, but changes their
# transform to VOID.
evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name)
assert len(evolved_partition_spec['fields']) == 3
old_truncate_s = evolved_partition_spec['fields'][0]
identity_i = evolved_partition_spec['fields'][1]
truncate_s = evolved_partition_spec['fields'][2]
assert old_truncate_s['name'] == 's_trunc'
assert identity_i['name'] == 'i'
# Modified field name contains the parameter value.
assert truncate_s['name'] == 's_trunc_6'
assert old_truncate_s['field-id'] == 1000
assert identity_i['field-id'] == 1001
# field-id increases for the modified field.
assert truncate_s['field-id'] == 1002
assert old_truncate_s['transform'] == 'void'
@SkipIf.not_dfs
def test_partition_spec_update_v2(self, unique_database):
# Create table
table_name = "ice_part"
qualified_table_name = "%s.%s" % (unique_database, table_name)
create_table = 'create table %s ' \
'(s string, i int) partitioned by spec(truncate(5, s), identity(i)) ' \
'stored as iceberg tblproperties ("format-version" = "2")' \
% qualified_table_name
self.client.execute(create_table)
partition_spec = self.get_current_partition_spec(unique_database, table_name)
assert len(partition_spec['fields']) == 2
truncate_s = partition_spec['fields'][0]
identity_i = partition_spec['fields'][1]
# At table creation, partition names does not contain the parameter value.
assert truncate_s['name'] == 's_trunc'
assert identity_i['name'] == 'i'
assert truncate_s['field-id'] == 1000
assert identity_i['field-id'] == 1001
# Partition evolution for s
partition_evolution = 'alter table %s set partition ' \
'spec(identity(i), truncate(6,s))' % qualified_table_name
self.client.execute(partition_evolution)
# V2 partition evolution updates the previous partitioning for
# the modified field.
evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name)
assert len(evolved_partition_spec['fields']) == 2
identity_i = evolved_partition_spec['fields'][0]
truncate_s = evolved_partition_spec['fields'][1]
assert identity_i['name'] == 'i'
# Modified field name contains the parameter value.
assert truncate_s['name'] == 's_trunc_6'
assert identity_i['field-id'] == 1001
# field-id increased for the modified field.
assert truncate_s['field-id'] == 1002
# Partition evolution for i and s
partition_evolution = 'alter table %s set partition ' \
'spec(bucket(4, i), truncate(3,s))' \
% qualified_table_name
self.client.execute(partition_evolution)
evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name)
assert len(evolved_partition_spec['fields']) == 2
bucket_i = evolved_partition_spec['fields'][0]
truncate_s = evolved_partition_spec['fields'][1]
# The field naming follows the transform changes.
assert bucket_i['name'] == 'i_bucket_4'
assert truncate_s['name'] == 's_trunc_3'
# field-id's are increasing with each modification
assert bucket_i['field-id'] == 1003
assert truncate_s['field-id'] == 1004
@SkipIf.not_dfs
def test_writing_metrics_to_metadata_v1(self, unique_database):
self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v1', '1')
@SkipIf.not_dfs
def test_writing_metrics_to_metadata_v2(self, unique_database):
self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v2', '2')
def _test_writing_metrics_to_metadata_impl(self, unique_database, table_name, version):
# Create table
qualified_table_name = "%s.%s" % (unique_database, table_name)
query = """create table {}
(s string, i int, b boolean, bi bigint, ts timestamp, dt date,
dc decimal(10, 3))
stored as iceberg
tblproperties ('format-version'='{}')""".format(qualified_table_name, version)
self.client.execute(query)
# Insert data
# 1st data file:
query = 'insert into %s values ' \
'("abc", 3, true, NULL, "1970-01-03 09:11:22", NULL, 56.34), ' \
'("def", NULL, false, NULL, "1969-12-29 14:45:59", DATE"1969-01-01", -10.0), ' \
'("ghij", 1, NULL, 123456789000000, "1970-01-01", DATE"1970-12-31", NULL), ' \
'(NULL, 0, NULL, 234567890000001, NULL, DATE"1971-01-01", NULL)' \
% qualified_table_name
self.execute_query(query)
# 2nd data file:
query = 'insert into %s values ' \
'(NULL, NULL, NULL, NULL, NULL, NULL, NULL), ' \
'(NULL, NULL, NULL, NULL, NULL, NULL, NULL)' \
% qualified_table_name
self.execute_query(query)
# Get hdfs path to manifest list file
manifest_list_hdfs_path = self.get_manifest_list_hdfs_path(
'/tmp/iceberg_metrics_test', unique_database, table_name, '00002')
# Get the list of hdfs paths to manifest files
assert manifest_list_hdfs_path is not None
manifest_hdfs_path_list = self.get_manifest_hdfs_path_list(
'/tmp/iceberg_metrics_test', manifest_list_hdfs_path)
# Get 'data_file' records from manifest files.
assert manifest_hdfs_path_list is not None and len(manifest_hdfs_path_list) > 0
datafiles = self.get_data_file_list('/tmp/iceberg_metrics_test',
manifest_hdfs_path_list)
# Check column stats in datafiles
assert datafiles is not None and len(datafiles) == 2
# The 1st datafile contains the 2 NULL rows
assert datafiles[0]['record_count'] == 2
assert datafiles[0]['column_sizes'] == \
[{'key': 1, 'value': 39},
{'key': 2, 'value': 39},
{'key': 3, 'value': 25},
{'key': 4, 'value': 39},
{'key': 5, 'value': 39},
{'key': 6, 'value': 39},
{'key': 7, 'value': 39}]
assert datafiles[0]['value_counts'] == \
[{'key': 1, 'value': 2},
{'key': 2, 'value': 2},
{'key': 3, 'value': 2},
{'key': 4, 'value': 2},
{'key': 5, 'value': 2},
{'key': 6, 'value': 2},
{'key': 7, 'value': 2}]
assert datafiles[0]['null_value_counts'] == \
[{'key': 1, 'value': 2},
{'key': 2, 'value': 2},
{'key': 3, 'value': 2},
{'key': 4, 'value': 2},
{'key': 5, 'value': 2},
{'key': 6, 'value': 2},
{'key': 7, 'value': 2}]
# Upper/lower bounds should be empty lists
assert datafiles[0]['lower_bounds'] == []
assert datafiles[0]['upper_bounds'] == []
# 2nd datafile
assert datafiles[1]['record_count'] == 4
assert datafiles[1]['column_sizes'] == \
[{'key': 1, 'value': 66},
{'key': 2, 'value': 56},
{'key': 3, 'value': 26},
{'key': 4, 'value': 59},
{'key': 5, 'value': 68},
{'key': 6, 'value': 56},
{'key': 7, 'value': 53}]
assert datafiles[1]['value_counts'] == \
[{'key': 1, 'value': 4},
{'key': 2, 'value': 4},
{'key': 3, 'value': 4},
{'key': 4, 'value': 4},
{'key': 5, 'value': 4},
{'key': 6, 'value': 4},
{'key': 7, 'value': 4}]
assert datafiles[1]['null_value_counts'] == \
[{'key': 1, 'value': 1},
{'key': 2, 'value': 1},
{'key': 3, 'value': 2},
{'key': 4, 'value': 2},
{'key': 5, 'value': 1},
{'key': 6, 'value': 1},
{'key': 7, 'value': 2}]
assert datafiles[1]['lower_bounds'] == \
[{'key': 1, 'value': b'abc'},
# INT is serialized as 4-byte little endian
{'key': 2, 'value': b'\x00\x00\x00\x00'},
# BOOLEAN is serialized as 0x00 for FALSE
{'key': 3, 'value': b'\x00'},
# BIGINT is serialized as 8-byte little endian
{'key': 4, 'value': b'\x40\xaf\x0d\x86\x48\x70\x00\x00'},
# TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
# 1970-01-01 00:00:00)
{'key': 5, 'value': b'\xc0\xd7\xff\x06\xd0\xff\xff\xff'},
# DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
{'key': 6, 'value': b'\x93\xfe\xff\xff'},
# Unlike other numerical values, DECIMAL is serialized as big-endian.
{'key': 7, 'value': b'\xd8\xf0'}]
assert datafiles[1]['upper_bounds'] == \
[{'key': 1, 'value': b'ghij'},
# INT is serialized as 4-byte little endian
{'key': 2, 'value': b'\x03\x00\x00\x00'},
# BOOLEAN is serialized as 0x01 for TRUE
{'key': 3, 'value': b'\x01'},
# BIGINT is serialized as 8-byte little endian
{'key': 4, 'value': b'\x81\x58\xc2\x97\x56\xd5\x00\x00'},
# TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
# 1970-01-01 00:00:00)
{'key': 5, 'value': b'\x80\x02\x86\xef\x2f\x00\x00\x00'},
# DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
{'key': 6, 'value': b'\x6d\x01\x00\x00'},
# Unlike other numerical values, DECIMAL is serialized as big-endian.
{'key': 7, 'value': b'\x00\xdc\x14'}]
def test_using_upper_lower_bound_metrics(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector,
use_db=unique_database)
def test_writing_many_files(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-write-many-files', vector,
use_db=unique_database)
@pytest.mark.execute_serially
def test_writing_many_files_stress(self, vector, unique_database):
if self.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
self.run_test_case('QueryTest/iceberg-write-many-files-stress', vector,
use_db=unique_database)
@pytest.mark.execute_serially
def test_table_load_time_for_many_files(self, unique_database):
if self.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
tbl_name = unique_database + ".iceberg_many_files"
self.execute_query("""CREATE TABLE {}
PARTITIONED BY SPEC (bucket(2039, l_orderkey))
STORED AS ICEBERG
AS SELECT * FROM tpch_parquet.lineitem""".format(tbl_name))
self.execute_query("invalidate metadata")
start_time = time.time()
self.execute_query("describe formatted {}".format(tbl_name))
elapsed_time = time.time() - start_time
if IS_HDFS:
time_limit = 10
else:
time_limit = 20
assert elapsed_time < time_limit
def test_consistent_scheduling(self, unique_database):
"""IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for
Iceberg tables."""
def collect_split_stats(profile):
splits = [s.strip() for s in profile.splitlines() if "Hdfs split stats" in s]
splits.sort()
return splits
with self.create_impala_client() as impalad_client:
impalad_client.execute("use " + unique_database)
impalad_client.execute("""create table line_ice stored as iceberg
as select * from tpch_parquet.lineitem""")
first_result = impalad_client.execute("""select count(*) from line_ice""")
ref_profile = first_result.runtime_profile
ref_split_stats = collect_split_stats(ref_profile)
for i in range(0, 10):
# Subsequent executions of the same query should schedule scan ranges similarly.
result = impalad_client.execute("""select count(*) from line_ice""")
profile = result.runtime_profile
split_stats = collect_split_stats(profile)
assert ref_split_stats == split_stats
def test_scheduling_partitioned_tables(self, unique_database):
"""IMPALA-12765: Balance consecutive partitions better for Iceberg tables"""
# We are setting the replica_preference query option in this test, so let's create a
# local impala client.
inventory_tbl = "inventory_ice"
item_tbl = "item_ice"
date_dim_tbl = "date_dim_ice"
with self.create_impala_client() as impalad_client:
impalad_client.execute("use " + unique_database)
impalad_client.execute("set replica_preference=remote")
impalad_client.execute("""
CREATE TABLE {}
PARTITIONED BY SPEC (inv_date_sk)
STORED BY ICEBERG
AS SELECT * from tpcds_partitioned_parquet_snap.inventory;
""".format(inventory_tbl))
impalad_client.execute("""
CREATE TABLE {}
STORED BY ICEBERG
AS SELECT * from tpcds_partitioned_parquet_snap.item;
""".format(item_tbl))
impalad_client.execute("""
CREATE TABLE {}
STORED BY ICEBERG
AS SELECT * from tpcds_partitioned_parquet_snap.date_dim;
""".format(date_dim_tbl))
q22_result = impalad_client.execute("""
select i_product_name, i_brand, i_class, i_category,
avg(inv_quantity_on_hand) qoh
from inventory_ice, date_dim_ice, item_ice
where inv_date_sk=d_date_sk and
inv_item_sk=i_item_sk and
d_month_seq between 1199 and 1199 + 11
group by rollup(i_product_name, i_brand, i_class, i_category)
order by qoh, i_product_name, i_brand, i_class, i_category
limit 100
""")
profile = q22_result.runtime_profile
# "Files rejected:" contains the number of files being rejected by runtime
# filters. With IMPALA-12765 we should see similar numbers for each executor.
files_rejected_array = re.findall(r"Files rejected: \d+ \((\d+)\)", profile)
avg_files_rejected = int(files_rejected_array[0])
THRESHOLD = 3
for files_rejected_str in files_rejected_array:
files_rejected = int(files_rejected_str)
if files_rejected != 0:
assert abs(avg_files_rejected - files_rejected) < THRESHOLD
def test_in_predicate_push_down(self, vector, unique_database):
self.execute_query("SET RUNTIME_FILTER_MODE=OFF")
self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector,
use_db=unique_database)
def test_is_null_predicate_push_down(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-is-null-predicate-push-down', vector,
use_db=unique_database)
def test_compound_predicate_push_down(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-compound-predicate-push-down', vector,
use_db=unique_database)
def test_plain_count_star_optimization(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-plain-count-star-optimization', vector,
use_db=unique_database)
def test_create_table_like_table(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-create-table-like-table', vector,
use_db=unique_database)
def test_table_owner(self, vector, unique_database):
self.run_table_owner_test(vector, unique_database, "some_random_user")
self.run_table_owner_test(vector, unique_database, "another_random_user")
def run_table_owner_test(self, vector, db_name, user_name):
# Create Iceberg table with a given user running the query.
tbl_name = "iceberg_table_owner"
sql_stmt = 'CREATE TABLE {0}.{1} (i int) STORED AS ICEBERG'.format(
db_name, tbl_name)
args = ['-u', user_name, '-q', sql_stmt]
run_impala_shell_cmd(vector, args)
# Run DESCRIBE FORMATTED to get the owner of the table.
args = ['-q', 'DESCRIBE FORMATTED {0}.{1}'.format(db_name, tbl_name)]
results = run_impala_shell_cmd(vector, args)
result_rows = results.stdout.strip().split('\n')
# Find the output row with the owner.
owner_row = ""
for row in result_rows:
if "Owner:" in row:
owner_row = row
assert owner_row != "", "DESCRIBE output doesn't contain owner" + results.stdout
# Verify that the user running the query is the owner of the table.
assert user_name in owner_row, "Unexpected owner of Iceberg table. " + \
"Expected user name: {0}. Actual output row: {1}".format(user_name, owner_row)
args = ['-q', 'DROP TABLE {0}.{1}'.format(db_name, tbl_name)]
results = run_impala_shell_cmd(vector, args)
def test_mixed_file_format(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-mixed-file-format', vector,
unique_database)
def test_load(self, vector, unique_database):
"""Test LOAD DATA INPATH for Iceberg tables, the first part of this method inits the
target directory, copies existing test data to HDFS. The second part runs the test
cases then cleans up the test directory.
"""
# Test 1-6 init: target orc/parquet file and directory
SRC_DIR = os.path.join(os.environ['IMPALA_HOME'],
"testdata/data/iceberg_test/iceberg_mixed_file_format_test/data/{0}")
DST_DIR = "/tmp/" + unique_database + "/parquet/"
self.filesystem_client.make_dir(DST_DIR, permission=777)
file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
"fff150b6136a-job_16619542960420_0002-1-00001.parquet"
file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
"27ff880faff0-job_16619542960420_0004-1-00001.parquet"
self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq2), DST_DIR)
DST_DIR = "/tmp/" + unique_database + "/orc/"
self.filesystem_client.make_dir(DST_DIR, permission=777)
file_orc1 = "00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-" \
"ee500a19c1d1-job_16619542960420_0003-1-00001.orc"
file_orc2 = "00000-0-data-gfurnstahl_20220906114900_9c1b7b46-5643-428f-a007-" \
"519c5500ed04-job_16619542960420_0004-1-00001.orc"
self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc1), DST_DIR)
self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc2), DST_DIR)
# Test 7 init: overwrite
DST_DIR = "/tmp/" + unique_database + "/overwrite/"
self.filesystem_client.make_dir(DST_DIR, permission=777)
self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
# Test 8 init: mismatching parquet schema format
SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/"
"iceberg_partitioned/data/event_time_hour=2020-01-01-08/action=view/{0}")
DST_DIR = "/tmp/" + unique_database + "/mismatching_schema/"
self.filesystem_client.make_dir(DST_DIR, permission=777)
file = "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet"
self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
# Test 9 init: partitioned
DST_DIR = "/tmp/" + unique_database + "/partitioned/"
self.filesystem_client.make_dir(DST_DIR, permission=777)
self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
# Test 10 init: hidden files
DST_DIR = "/tmp/" + unique_database + "/hidden/"
self.filesystem_client.make_dir(DST_DIR, permission=777)
self.filesystem_client.create_file(DST_DIR + "_hidden.1", "Test data 123")
self.filesystem_client.create_file(DST_DIR + "_hidden_2.1", "Test data 123")
self.filesystem_client.create_file(DST_DIR + ".hidden_3", "Test data 123")
self.filesystem_client.create_file(DST_DIR + ".hidden_4.1", "Test data 123")
self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
# Init test table
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_mixed_file_format_test", "parquet")
# Execute tests
self.run_test_case('QueryTest/iceberg-load', vector, use_db=unique_database)
# Clean up temporary directory
self.filesystem_client.delete_file_dir("/tmp/{0}".format(unique_database), True)
def test_table_sampling(self, vector):
self.run_test_case('QueryTest/iceberg-tablesample', vector,
use_db="functional_parquet")
def _create_table_like_parquet_helper(self, vector, unique_database, tbl_name,
expect_success):
create_table_from_parquet(self.client, unique_database, tbl_name)
args = ['-q', "show files in {0}.{1}".format(unique_database, tbl_name)]
results = run_impala_shell_cmd(vector, args)
result_rows = results.stdout.strip().split('\n')
hdfs_file = None
for row in result_rows:
if "://" in row:
hdfs_file = row.split('|')[1].lstrip()
break
assert hdfs_file
iceberg_tbl_name = "iceberg_{0}".format(tbl_name)
sql_stmt = "create table {0}.{1} like parquet '{2}' stored as iceberg".format(
unique_database, iceberg_tbl_name, hdfs_file
)
args = ['-q', sql_stmt]
return run_impala_shell_cmd(vector, args, expect_success=expect_success)
def test_create_table_like_parquet(self, vector, unique_database):
tbl_name = 'alltypes_tiny_pages'
# Not all types are supported by iceberg
self._create_table_like_parquet_helper(vector, unique_database, tbl_name, False)
tbl_name = "create_table_like_parquet_test"
results = self._create_table_like_parquet_helper(vector, unique_database, tbl_name,
True)
result_rows = results.stdout.strip().split('\n')
assert result_rows[3].split('|')[1] == ' Table has been created. '
sql_stmt = "describe {0}.{1}".format(unique_database, tbl_name)
args = ['-q', sql_stmt]
parquet_results = run_impala_shell_cmd(vector, args)
parquet_result_rows = parquet_results.stdout.strip().split('\n')
parquet_column_name_type_list = []
for row in parquet_result_rows[1:-2]:
parquet_column_name_type_list.append(row.split('|')[1:3])
sql_stmt = "describe {0}.iceberg_{1}".format(unique_database, tbl_name)
args = ['-q', sql_stmt]
iceberg_results = run_impala_shell_cmd(vector, args)
iceberg_result_rows = iceberg_results.stdout.strip().split('\n')
iceberg_column_name_type_list = []
for row in iceberg_result_rows[1:-2]:
iceberg_column_name_type_list.append(row.split('|')[1:3])
assert parquet_column_name_type_list == iceberg_column_name_type_list
@SkipIfFS.hive
def test_hive_external_forbidden(self, unique_database):
tbl_name = unique_database + ".hive_ext"
error_msg = ("cannot be loaded because it is an EXTERNAL table in the HiveCatalog "
"that points to another table. Query the original table instead.")
self.execute_query("create table {0} (i int) stored by iceberg".
format(tbl_name))
# 'iceberg.table_identifier' can refer to another table
self.run_stmt_in_hive("""alter table {0} set tblproperties
('external.table.purge'='false',
'iceberg.table_identifier'='functional_iceberg.iceberg_partitioned')""".
format(tbl_name))
ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name))
assert error_msg in str(ex)
# 'iceberg.mr.table.identifier' can refer to another table
self.run_stmt_in_hive("""
alter table {0} unset tblproperties('iceberg.table_identifier')""".
format(tbl_name))
self.run_stmt_in_hive("""alter table {0} set tblproperties
('iceberg.mr.table.identifier'='functional_iceberg.iceberg_partitioned')""".
format(tbl_name))
ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name))
assert error_msg in str(ex)
# 'name' can also refer to another table but cannot be set by Hive/Impala. Also,
# during table migration both Impala and Hive clears existing table properties
# See IMPALA-12410
@SkipIfFS.incorrent_reported_ec
def test_compute_stats(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-compute-stats', vector, unique_database)
def test_virtual_columns(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-virtual-columns', vector, unique_database)
def test_avro_file_format(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-avro', vector, unique_database)
def test_convert_table(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables',
vector, unique_database)
def test_table_exists(self, unique_database):
"""Test that iceberg AlreadyExistsException are correctly handled."""
tbl_name = unique_database + ".create_iceberg_exists"
# Attempt to create an iceberg table, simulating AlreadyExistsException
iceberg_created_options = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@'
'IcebergAlreadyExistsException@Table was created concurrently'}
err = self.execute_query_expect_failure(self.client,
"create table {0} (i int) stored as iceberg".format(tbl_name),
query_options=iceberg_created_options)
assert "AlreadyExistsException: Table already exists" in str(err)
self.execute_query_expect_success(self.client,
"create table if not exists {0} (i int) stored as iceberg".format(tbl_name),
query_options=iceberg_created_options)
def test_abort_transaction(self, unique_database):
"""Test that iceberg operations fail correctly when an Iceberg transaction commit
fails, and that the effects of the failed operation are not visible."""
tbl_name = unique_database + ".abort_iceberg_transaction"
# The query options that inject an iceberg transaction commit failure.
abort_ice_transaction_options = {'debug_action':
'CATALOGD_ICEBERG_COMMIT:EXCEPTION@'
'CommitFailedException@'
'simulated commit failure'}
# Create an iceberg table and insert a row.
self.client.execute("""create table {0} (i int)
stored as iceberg""".format(tbl_name))
self.execute_query_expect_success(self.client,
"insert into {0} values (1);".format(tbl_name))
# Run a query that would insert a row, but pass the query options that
# will cause the iceberg transaction to abort.
err = self.execute_query_expect_failure(self.client,
"insert into {0} values (2);".format(tbl_name),
query_options=abort_ice_transaction_options)
# Check that the error message looks reasonable.
result = str(err)
assert error_msg_startswith(result,
"ImpalaRuntimeException: simulated commit failure\n"
"CAUSED BY: CommitFailedException: simulated commit failure")
# Check that no data was inserted.
data = self.execute_query_expect_success(self.client,
"select * from {0}".format(tbl_name))
assert data.column_labels == ['I']
assert len(data.data) == 1
assert data.data[0] == '1'
# Run a query that would add a column to the table, but pass the query options that
# will cause the iceberg transaction to abort.
ddl_err = self.execute_query_expect_failure(self.client,
"alter table {0} add column {1} bigint"
.format(tbl_name, "j"), query_options=abort_ice_transaction_options)
ddl_result = str(ddl_err)
# Check that the error message looks reasonable.
assert error_msg_startswith(ddl_result,
"CommitFailedException: simulated commit failure")
# Check that no column was added.
data = self.execute_query_expect_success(self.client,
"select * from {0}".format(tbl_name))
assert data.column_labels == ['I']
assert len(data.data) == 1
assert data.data[0] == '1'
def test_drop_partition(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-drop-partition', vector,
use_db=unique_database)
def test_rollback_after_drop_partition(self, unique_database):
table_name = "iceberg_drop_partition_rollback"
qualified_table_name = "{}.{}".format(unique_database, table_name)
create_table_stmt = """CREATE TABLE {}(identity_int int, unpartitioned_int int)
PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG""".format(qualified_table_name)
insert_into_stmt = """INSERT INTO {} values(1, 2)""".format(qualified_table_name)
drop_partition_stmt = """ALTER TABLE {} DROP PARTITION (identity_int = 1)""".format(
qualified_table_name)
self.execute_query(create_table_stmt)
self.execute_query(insert_into_stmt)
self.execute_query(drop_partition_stmt)
snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=2)
rollback = """ALTER TABLE {} EXECUTE ROLLBACK ({})""".format(
qualified_table_name, snapshots[0].get_snapshot_id())
# Rollback before DROP PARTITION
self.execute_query(rollback)
snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=3)
assert snapshots[0].get_snapshot_id() == snapshots[2].get_snapshot_id()
assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id()
assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time()
def test_show_files_partition(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-show-files-partition', vector,
use_db=unique_database)
def test_scan_metrics_in_profile_basic(self, vector):
self.run_test_case('QueryTest/iceberg-scan-metrics-basic', vector)
class TestIcebergV2Table(IcebergTestSuite):
"""Tests related to Iceberg V2 tables."""
@classmethod
def add_test_dimensions(cls):
super(TestIcebergV2Table, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
add_exec_option_dimension(cls, 'disable_optimized_iceberg_v2_read', [0, 1])
def should_run_for_hive(self, vector):
# Hive interop tests are very slow. Only run them for a subset of dimensions.
if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 0:
return True
return False
# The test uses pre-written Iceberg tables where the position delete files refer to
# the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the
# dockerised environment the namenode is accessible on a different hostname/port.
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_plain_count_star_optimization(self, vector):
self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization',
vector)
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_read_position_deletes(self, vector):
# Remove 'batch_size' option so we can set it at .test file.
# Revisit this if 'batch_size' dimension size increase.
vector.unset_exec_option('batch_size')
self.run_test_case('QueryTest/iceberg-v2-read-position-deletes', vector)
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_read_position_deletes_orc(self, vector):
self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc', vector)
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
@pytest.mark.execute_serially
def test_read_position_deletes_compute_stats(self, vector):
"""Tests COMPUTE STATS on Iceberg V2 tables. Need to be executed serially
because it modifies tables that are used by other tests (e.g. multiple
instances of this test)."""
self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-stats', vector)
self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc-stats', vector)
@SkipIfFS.hive
def test_read_mixed_format_position_deletes(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-mixed-format-position-deletes',
vector, unique_database)
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_read_null_delete_records(self, vector):
expected_error = 'NULL found as file_path in delete file'
query_options = vector.get_value('exec_option')
v2_op_disabled = query_options['disable_optimized_iceberg_v2_read'] == 1
result = self.execute_query(
'select * from functional_parquet.iceberg_v2_null_delete_record', query_options)
assert len(result.data) == 6
errors = result.log
print(errors)
assert expected_error in errors or v2_op_disabled
result = self.execute_query(
'select count(*) from functional_parquet.iceberg_v2_null_delete_record',
query_options)
assert result.data == ['6']
errors = result.log
assert expected_error in errors or v2_op_disabled
result = self.execute_query(
"""select * from functional_parquet.iceberg_v2_null_delete_record
where j < 3""", query_options)
assert sorted(result.data) == ['1\t1', '2\t2']
errors = result.log
assert expected_error in errors or v2_op_disabled
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_read_equality_deletes(self, vector):
self.run_test_case('QueryTest/iceberg-v2-read-equality-deletes', vector)
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_table_sampling_v2(self, vector):
self.run_test_case('QueryTest/iceberg-tablesample-v2', vector,
use_db="functional_parquet")
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_scan_metrics_in_profile_with_deletes(self, vector):
def get_latest_snapshot_id(fq_tbl_name):
query = ("select snapshot_id from {}.snapshots order by committed_at desc"
.format(fq_tbl_name))
res = self.execute_query(query)
return res.data[0]
ice_db = "functional_parquet"
no_deletes = "{}.{}".format(ice_db, "iceberg_v2_no_deletes")
no_deletes_snapshot_id = get_latest_snapshot_id(no_deletes)
pos_delete_all_rows = "{}.{}".format(ice_db, "iceberg_v2_positional_delete_all_rows")
pos_delete_all_rows_snapshot_id = get_latest_snapshot_id(pos_delete_all_rows)
not_all_data_files_have_delete_files = "{}.{}".format(
ice_db, "iceberg_v2_positional_not_all_data_files_have_delete_files")
not_all_data_files_have_delete_files_snapshot_id = get_latest_snapshot_id(
not_all_data_files_have_delete_files)
self.run_test_case('QueryTest/iceberg-scan-metrics-with-deletes', vector,
test_file_vars={
"NO_DELETES_SNAPTHOT_ID": no_deletes_snapshot_id,
"POS_DELETE_ALL_ROWS_SNAPSHOT_ID": pos_delete_all_rows_snapshot_id,
"NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID":
not_all_data_files_have_delete_files_snapshot_id
})
@SkipIf.hardcoded_uris
def test_metadata_tables(self, vector, unique_database):
# Remove 'batch_size' option so we can set it at .test file.
# Revisit this if 'batch_size' dimension size increase.
vector.unset_exec_option('batch_size')
with self.create_impala_client() as impalad_client:
overwrite_snapshot_id = impalad_client.execute("select snapshot_id from "
"functional_parquet.iceberg_query_metadata.snapshots "
"where operation = 'overwrite';")
overwrite_snapshot_ts = impalad_client.execute("select committed_at from "
"functional_parquet.iceberg_query_metadata.snapshots "
"where operation = 'overwrite';")
self.run_test_case('QueryTest/iceberg-metadata-tables', vector,
unique_database,
test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]),
'$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])})
@SkipIf.not_hdfs
def test_missing_data_files(self, vector, unique_database):
def list_files(tbl):
query_result = self.execute_query("select file_path from {}.`files`".format(tbl))
return query_result.data
def first_snapshot(tbl):
query_result = self.execute_query(
"select snapshot_id from {}.`snapshots` order by committed_at".format(tbl))
return query_result.data[0]
def insert_values(tbl, values):
self.execute_query("insert into {} values {}".format(tbl, values))
missing_files_nopart = unique_database + ".missing_files_nopart"
missing_files_part = unique_database + ".missing_files_part"
self.execute_query("""CREATE TABLE {} (i int, p int)
STORED BY ICEBERG
TBLPROPERTIES('format-version'='2')""".format(missing_files_nopart))
insert_values(missing_files_nopart, "(1, 1)")
first_file = set(list_files(missing_files_nopart))
insert_values(missing_files_nopart, "(2, 2)")
all_files = set(list_files(missing_files_nopart))
assert len(all_files) == 2
second_file = next(iter(all_files - first_file))
check_output(["hdfs", "dfs", "-rm", second_file])
self.execute_query("""CREATE TABLE {} (i int, p int)
PARTITIONED BY SPEC (p)
STORED BY ICEBERG
TBLPROPERTIES('format-version'='2')""".format(missing_files_part))
insert_values(missing_files_part, "(1, 1)")
insert_values(missing_files_part, "(2, 2)")
files = list_files(missing_files_part)
part_2_f = None
for f in files:
if "p=2" in f:
part_2_f = f
break
assert part_2_f is not None
check_output(["hdfs", "dfs", "-rm", part_2_f])
self.execute_query("invalidate metadata {}".format(missing_files_nopart))
self.execute_query("invalidate metadata {}".format(missing_files_part))
self.run_test_case('QueryTest/iceberg-missing-data-files', vector,
unique_database,
test_file_vars={
'$NOPART_FIRST_SNAPSHOT': first_snapshot(missing_files_nopart),
'$PART_FIRST_SNAPSHOT': first_snapshot(missing_files_part)})
def test_delete(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-delete', vector,
unique_database)
def test_delete_partitioned(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-delete-partitioned', vector,
unique_database)
if IS_HDFS and self.should_run_for_hive(vector):
self._delete_partitioned_hive_tests(unique_database)
def _delete_partitioned_hive_tests(self, db):
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "id_part"))
assert hive_output == "id_part.i,id_part.s\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "trunc_part"))
assert hive_output == "trunc_part.i,trunc_part.s\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "multi_part"))
assert hive_output == "multi_part.i,multi_part.s,multi_part.f\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
db, "evolve_part"))
assert hive_output == \
"evolve_part.i,evolve_part.s,evolve_part.f\n3,three,3.33\n" \
"30,thirty,30.3\n40,forty,40.4\n"
hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}.{}".format(
db, "ice_store_sales"))
assert hive_output == "_c0\n2601498\n"
hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{}".format(
db, "ice_alltypes_part_v2"))
# Cut off the long header line.
hive_output = hive_output.split("\n", 1)
hive_output = hive_output[1]
assert hive_output == \
"2,true,1,11,1.1,2.222,123.321,2022-02-22,impala\n"
def test_large_scale_deletes(self, vector, unique_database):
if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 1:
pytest.skip("Only test the optimized v2 operator")
self.run_test_case('QueryTest/iceberg-large-scale-deletes', vector,
unique_database)
@SkipIfFS.hive
def test_delete_hive_read(self, unique_database):
ice_delete = unique_database + ".ice_delete"
self.execute_query("""CREATE TABLE {} (i int, s string)
STORED BY ICEBERG
TBLPROPERTIES('format-version'='2')""".format(ice_delete))
self.execute_query("INSERT INTO {} VALUES (1, 'one')".format(ice_delete))
self.execute_query("INSERT INTO {} VALUES (2, 'two')".format(ice_delete))
self.execute_query("INSERT INTO {} VALUES (3, 'three')".format(ice_delete))
self.execute_query("DELETE FROM {} WHERE i = 2".format(ice_delete))
# Hive needs table property 'format-version' explicitly set
self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format(
ice_delete))
hive_output = self.run_stmt_in_hive("SELECT * FROM {} ORDER BY i".format(ice_delete))
expected_output = "ice_delete.i,ice_delete.s\n1,one\n3,three\n"
assert hive_output == expected_output
ice_lineitem = unique_database + ".linteitem_ice"
self.execute_query("""CREATE TABLE {}
STORED BY ICEBERG
TBLPROPERTIES('format-version'='2')
AS SELECT * FROM tpch_parquet.lineitem""".format(ice_lineitem))
self.execute_query("DELETE FROM {} WHERE l_orderkey % 5 = 0".format(ice_lineitem))
impala_result = self.execute_query("SELECT count(*) FROM {}".format(ice_lineitem))
assert impala_result.data[0] == "4799964"
# Hive needs table property 'format-version' explicitly set
self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format(
ice_lineitem))
hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}".format(ice_lineitem))
assert hive_output == "_c0\n4799964\n"
@SkipIfFS.hive
def test_delete_complextypes_mixed_files(self, vector, unique_database):
ice_t = unique_database + ".ice_complex_delete"
self.run_stmt_in_hive("""create table {}
stored by iceberg stored as orc as
select * from functional_parquet.complextypestbl;""".format(ice_t))
# Hive needs table property 'format-version' explicitly set
self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format(
ice_t))
self.run_stmt_in_hive("""alter table {}
set tblproperties ('write.format.default'='parquet')""".format(ice_t))
self.run_stmt_in_hive("""insert into {}
select * from functional_parquet.complextypestbl""".format(ice_t))
vector.get_value('exec_option')['expand_complex_types'] = True
self.run_test_case('QueryTest/iceberg-delete-complex', vector,
unique_database)
hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t))
# Test that Hive sees the same rows deleted.
assert hive_output == "id\n4\n5\n6\n7\n8\n"
def test_update_basic(self, vector, unique_database):
udf_location = get_fs_path('/test-warehouse/libTestUdfs.so')
self.run_test_case('QueryTest/iceberg-update-basic', vector,
unique_database, test_file_vars={'UDF_LOCATION': udf_location})
self._test_update_basic_snapshots(unique_database)
if IS_HDFS and self.should_run_for_hive(vector):
self._update_basic_hive_tests(unique_database)
def _test_update_basic_snapshots(self, db):
"""Verifies that the tables have the expected number of snapshots, and
the parent ids match the previous snapshot ids. See IMPALA-12708."""
self.validate_snapshots(db, "single_col", 3)
self.validate_snapshots(db, "ice_alltypes", 21)
self.validate_snapshots(db, "ice_id_partitioned", 7)
def validate_snapshots(self, db, tbl, expected_snapshots):
tbl_name = "{}.{}".format(db, tbl)
snapshots = get_snapshots(self.client, tbl_name,
expected_result_size=expected_snapshots)
parent_id = None
for s in snapshots:
assert s.get_parent_id() == parent_id
parent_id = s.get_snapshot_id()
def _update_basic_hive_tests(self, db):
def get_hive_results(tbl, order_by_col):
stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)
return self.run_stmt_in_hive(stmt).split("\n", 1)[1]
hive_results = get_hive_results("single_col", "i")
assert hive_results == "1\n3\n4\n"
hive_results = get_hive_results("ice_alltypes", "bool_col")
assert hive_results == \
"false,0,111,0.0,0.0,234,123.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \
"true,3,222,1.0,1.0,NULL,NULL,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n"
hive_results = get_hive_results("ice_id_partitioned", "i")
assert hive_results == \
"1,0,APACHE IMPALA\n" \
"2,0,iceberg\n" \
"3,0,hive\n" \
"5,2,Kudu\n" \
"10,1,Apache Spark\n"
hive_results = get_hive_results("ice_bucket_transform", "i")
assert hive_results == \
"2,a fairly long string value,1000,1999-09-19 12:00:01.0\n" \
"4,bbb,2030,2001-01-01 00:00:00.0\n" \
"6,cccccccccccccccccccccccccccccccccccccccc,-123,2023-11-24 17:44:30.0\n"
hive_results = get_hive_results("ice_time_transforms_timestamp", "id")
assert hive_results == \
"1.5000,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0\n" \
"2.4690,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0\n" \
"1999.9998,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0\n" # noqa: E501
hive_results = get_hive_results("ice_time_transforms_date", "id")
assert hive_results == \
"1.5000,2001-01-01,2001-01-01,2001-01-01\n" \
"2.4690,2023-11-24,2023-11-24,2023-11-24\n" \
"1999.9998,2199-12-31,2199-12-31,2199-12-31\n"
hive_results = get_hive_results("ice_part_transforms", "i")
assert hive_results == \
"1,2023-11-13 18:07:05.0,blue,1234\n" \
"3,2023-11-14 19:07:05.0,green,1700\n" \
"4,2023-11-13 18:07:23.0,gray,2500\n" \
"8,2023-11-01 00:11:11.0,black,722\n"
def test_update_partitions(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-update-partitions', vector,
unique_database)
if IS_HDFS and self.should_run_for_hive(vector):
self._update_partitions_hive_tests(unique_database)
def _update_partitions_hive_tests(self, db):
def get_hive_results(tbl, order_by_col):
stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)
return self.run_stmt_in_hive(stmt).split("\n", 1)[1]
hive_results = get_hive_results("id_part", "i, s")
assert hive_results == \
"2,FIVE\n" \
"2,FOUR\n" \
"3,SIX\n" \
"5,ONE\n" \
"10,TWO\n" \
"15,THREE\n"
hive_results = get_hive_results("trunc_part", "i")
assert hive_results == \
"1,one\n" \
"5,five\n" \
"103,three\n" \
"1004,FOURfour\n" \
"1006,SIXsix\n" \
"1102,TWOtwo\n"
hive_results = get_hive_results("multi_part", "i")
assert hive_results == \
"0,void,3.14\n" \
"0,void,3.14\n" \
"0,void,3.14\n" \
"1,one,1.1\n" \
"3,three,3.33\n" \
"5,five,5.5\n" \
"111,fox,1.1\n"
hive_results = get_hive_results("evolve_part", "i")
assert hive_results == \
"1,one,1.1\n" \
"30,thirty,30.3\n" \
"40,forty,40.4\n" \
"50,fifty,50.5\n" \
"1003,three,3.33\n" \
"1010,ten,10.1\n" \
"1020,twenty,20.2\n" \
"1222,two,2.2\n"
hive_results = get_hive_results("date_day_part", "i")
assert hive_results == \
"11,1978-01-01\n" \
"12,1979-12-31\n" \
"13,1980-01-01\n" \
"14,2033-11-15\n"
hive_results = get_hive_results("ts_hour_part", "i")
assert hive_results == \
"101,1958-01-01 01:02:03.0\n" \
"102,1959-12-31 23:59:00.0\n" \
"103,1960-01-01 00:00:00.0\n" \
"104,2013-11-15 15:31:00.0\n"
hive_results = get_hive_results("ts_evolve_part", "i")
assert hive_results == \
"1001,1988-02-02 01:02:03.0\n" \
"1002,1990-02-01 23:59:00.0\n" \
"1003,1990-02-02 00:00:00.0\n" \
"1004,2043-12-16 15:31:00.0\n" \
"1111,NULL\n"
hive_results = get_hive_results("numeric_truncate", "id")
assert hive_results == \
"11,21,2111,531,75.20\n"
# HIVE-28048: Hive cannot run ORDER BY queries for Iceberg tables partitioned by
# decimal columns, so we order the results ourselves.
hive_results = self.run_stmt_in_hive("SELECT * FROM {}.{}".format(
db, "ice_alltypes_part_v2"))
# Throw away the header line and sort the results.
hive_results = hive_results.split("\n", 1)[1]
hive_results = hive_results.strip().split("\n")
hive_results.sort()
assert hive_results == [
"2,true,2,11,1.1,2.222,123.321,2022-04-22,impala",
"3,true,3,11,1.1,2.222,123.321,2022-05-22,impala"]
def test_optimize(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database)
expected_snapshots = 19
self.validate_snapshots(unique_database, "ice_optimize", expected_snapshots)
# The last operation was an OPTIMIZE TABLE statement.
# Check that time travel to the previous snapshot returns all results correctly.
tbl_name = unique_database + ".ice_optimize"
snapshots = get_snapshots(
self.client, tbl_name, expected_result_size=expected_snapshots)
snapshot_before_last = snapshots[-2]
result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name))
result_time_travel = self.execute_query(
"select * from {0} for system_version as of {1};".format(
tbl_name, snapshot_before_last.get_snapshot_id()))
assert result_after_opt.data.sort() == result_time_travel.data.sort()
def _check_file_filtering(self, tbl_name, threshold_mb, mode, had_partition_evolution):
threshold_bytes = threshold_mb * 1024 * 1024
DATA_FILE = "0"
DELETE_FILE = "1"
FileMetadata = namedtuple('FileMetadata', 'content, path, partition, size')
metadata_query = """select content, file_path, `partition`, file_size_in_bytes
from {0}.`files`;""".format(tbl_name)
result = self.execute_query(metadata_query)
files_before = set()
files_per_partition = defaultdict(set)
for line in result.data:
file = FileMetadata._make(line.split("\t"))
partition = file.partition
files_per_partition[partition].add(file)
files_before.add(file)
selected_files = set()
partitions_with_removed_files = set()
for partition, files in files_per_partition.items():
if len(files) > 1:
num_small_files = 0
# count small datafiles
for file in files:
if file.content == DATA_FILE and int(file.size) < threshold_bytes:
num_small_files += 1
for file in files:
# We assume that a delete file in a partition references all data files in
# that partition, because we cannot differentiate between data files
# with/without deletes.
if file.content == DELETE_FILE:
selected_files.update(files)
partitions_with_removed_files.add(partition)
break
# Only merge small files if there are at least 2 of them.
elif num_small_files > 1 and int(file.size) < threshold_bytes:
selected_files.add(file)
partitions_with_removed_files.add(partition)
self.execute_query(
"OPTIMIZE TABLE {0} (file_size_threshold_mb={1});".format(tbl_name, threshold_mb))
optimized_result = self.execute_query(metadata_query)
files_after = set()
for line in optimized_result.data:
file = FileMetadata._make(line.split("\t"))
files_after.add(file)
# Check the resulting files and the modified partitions after the OPTIMIZE operation.
# files_after = files_before - selected_files + 1 new file per partition
# The result should not contain the files that were selected and should contain 1 new
# file per written partition.
unchanged_files = files_before - selected_files
# Check that files that were not selected are still present in the result.
assert unchanged_files.issubset(files_after)
# Check that selected files are rewritten and not present in the result.
assert selected_files.isdisjoint(files_after)
new_files = files_after - unchanged_files
assert new_files == files_after - files_before
if mode == "NOOP":
assert selected_files == set([])
assert files_after == files_before
elif mode == "REWRITE_ALL":
assert selected_files == files_before
assert files_after.isdisjoint(files_before)
elif mode == "PARTIAL":
assert selected_files < files_before and selected_files != set([])
assert unchanged_files < files_after and unchanged_files != set([])
assert unchanged_files == files_after.intersection(files_before)
# Check that all delete files were merged.
for file in files_after:
assert file.content == DATA_FILE
# Check that there's only one new file in every partition.
partitions_with_new_files = set()
for file in new_files:
assert file.partition not in partitions_with_new_files
partitions_with_new_files.add(file.partition)
assert len(new_files) == len(partitions_with_new_files)
# WITH PARTITION EVOLUTION
# Only new partitions are written to.
# WITHOUT PARTITION EVOLUTION
if not had_partition_evolution:
# Check that 1 new content file is written in every updated partition.
assert len(new_files) == len(partitions_with_removed_files)
assert partitions_with_new_files == partitions_with_removed_files
def test_optimize_file_filtering(self, unique_database):
tbl_name = unique_database + ".ice_optimize_filter"
self.execute_query("""CREATE TABLE {0} partitioned by spec (l_linenumber)
STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')
AS SELECT * FROM tpch_parquet.lineitem
WHERE l_quantity < 10;""".format(tbl_name))
self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
WHERE l_quantity>=10 AND l_quantity<=12;""".format(tbl_name))
# There are no delete files in the table, so this should be a no-op. Check that no new
# snapshot was created.
self._check_file_filtering(tbl_name, 0, "NOOP", False)
assert len(get_snapshots(self.client, tbl_name)) == 2
self._check_file_filtering(tbl_name, 5, "PARTIAL", False)
self._check_file_filtering(tbl_name, 50, "PARTIAL", False)
# Check that the following is a no-op, since the table is already in a compact form.
self._check_file_filtering(tbl_name, 100, "NOOP", False)
self.execute_query("""UPDATE {0} SET l_linenumber=7 WHERE l_linenumber>4 AND
l_linestatus='F';""".format(tbl_name))
self._check_file_filtering(tbl_name, 6, "PARTIAL", False)
self.execute_query("""ALTER TABLE {0} SET PARTITION SPEC(l_linestatus);"""
.format(tbl_name))
self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL'
AND l_linenumber<4;""".format(tbl_name))
self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
WHERE l_quantity=13 AND l_linenumber<3;""".format(tbl_name))
self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
WHERE l_quantity=14 AND l_linenumber<3;""".format(tbl_name))
# Merges the delete files and rewrites the small files.
self._check_file_filtering(tbl_name, 2, "PARTIAL", True)
# Rewrites the remaining small files (2MB <= file_size < 100MB).
self._check_file_filtering(tbl_name, 100, "PARTIAL", True)
self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL';"""
.format(tbl_name))
# All partitions have delete files, therefore the entire table is rewritten.
self._check_file_filtering(tbl_name, 100, "REWRITE_ALL", True)
def test_merge(self, vector, unique_database):
udf_location = get_fs_path('/test-warehouse/libTestUdfs.so')
self.run_test_case('QueryTest/iceberg-merge', vector, unique_database,
test_file_vars={'UDF_LOCATION': udf_location})
def test_merge_partition(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-partition', vector, unique_database)
def test_merge_partition_sort(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-partition-sort', vector, unique_database)
def test_merge_long(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database)
def test_merge_star(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)
def test_merge_equality_update(self, vector, unique_database):
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_v2_delete_equality_partitioned", "parquet",
table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database)
def test_merge_equality_insert(self, vector, unique_database):
create_iceberg_table_from_directory(self.client, unique_database,
"iceberg_v2_delete_equality_partitioned", "parquet",
table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database)
def test_merge_duplicate_check(self, vector, unique_database):
"""Regression test for IMPALA-13932"""
# Remove 'num_nodes' option so we can set it at .test file.
vector.unset_exec_option('num_nodes')
self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector, unique_database)
def test_writing_multiple_deletes_per_partition(self, vector, unique_database):
"""Test writing multiple delete files partition in a single DELETE operation."""
self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition', vector,
use_db=unique_database)
def test_cleanup(self, unique_database):
"""Test that all uncommitted files written by Impala are removed from the file
system when a DML commit to an Iceberg table fails, and that the effects of the
failed operation are not visible."""
table_name = "iceberg_cleanup_failure"
fq_tbl_name = unique_database + "." + table_name
# The query options that inject an iceberg validation check failure.
fail_ice_commit_options = {'debug_action':
'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@'
'ValidationException@'
'simulated validation check failure'}
# Create an iceberg table and insert a row.
self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i int)
STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')""".format(fq_tbl_name))
self.execute_query_expect_success(self.client,
"insert into {0} values (1)".format(fq_tbl_name))
# Run a query that would update a row, but pass the query options that
# will cause the iceberg validation check to fail.
err = self.execute_query_expect_failure(self.client,
"update {0} set i=2 where i=1".format(fq_tbl_name),
query_options=fail_ice_commit_options)
# Check that we get the error message.
assert error_msg_startswith(str(err),
"ImpalaRuntimeException: simulated validation check failure\n"
"CAUSED BY: ValidationException: simulated validation check failure")
# Check that the table content was not updated.
data = self.execute_query_expect_success(self.client,
"select * from {0}".format(fq_tbl_name))
assert len(data.data) == 1
assert data.data[0] == '1'
# Check that the uncommitted data and delete files are removed from the file system
# and only the first data file remains.
table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
FILESYSTEM_PREFIX, unique_database, table_name)
files_result = check_output(["hdfs", "dfs", "-ls", table_location])
assert "Found 1 items" in bytes_to_str(files_result)
def test_predicate_push_down_hint(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector,
use_db=unique_database)
def test_partitions(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-partitions', vector, unique_database)
tbl_name = unique_database + ".ice_num_partitions"
snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4)
second_snapshot = snapshots[1]
time_travel_data = self.execute_query(
"SELECT * FROM {0} for system_version as of {1};".format(
tbl_name, second_snapshot.get_snapshot_id()))
assert "partitions=4/unknown" in time_travel_data.runtime_profile
selective_time_travel_data = self.execute_query(
"SELECT * FROM {0} for system_version as of {1} WHERE id < 5;".format(
tbl_name, second_snapshot.get_snapshot_id()))
assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile
def test_table_repair(self, unique_database):
tbl_name = 'tbl_with_removed_files'
db_tbl = unique_database + "." + tbl_name
repair_query = "alter table {0} execute repair_metadata()"
with self.create_impala_client() as impalad_client:
impalad_client.execute(
"create table {0} (i int) stored as iceberg tblproperties('format-version'='2')"
.format(db_tbl))
insert_q = "insert into {0} values ({1})"
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4))
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5))
result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
assert result.data == ['1', '2', '3', '4', '5']
TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
DATA_PATH = os.path.join(TABLE_PATH, "data")
# Check that table remains intact if there are no missing files
result = self.execute_query_expect_success(
impalad_client, repair_query.format(db_tbl))
assert result.data[0] == "No missing data files detected."
result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
assert result.data == ['1', '2', '3', '4', '5']
# Delete 2 data files from the file system directly to corrupt the table.
data_files = self.filesystem_client.ls(DATA_PATH)
self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[0])
self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[1])
self.execute_query_expect_success(impalad_client, "invalidate metadata")
result = self.execute_query_expect_success(
impalad_client, repair_query.format(db_tbl))
assert result.data[0] == \
"Iceberg table repaired by deleting 2 manifest entries of missing data files."
result = impalad_client.execute('select * from {0} order by i'.format(db_tbl))
assert len(result.data) == 3
# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most
# of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it
# runs also with the V2 optimizations setting turned off, some tests were moved here.
class TestIcebergDirectedMode(IcebergTestSuite):
"""Tests related to Iceberg DIRECTED distribution mode."""
@classmethod
def add_test_dimensions(cls):
super(TestIcebergDirectedMode, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_directed_mode(self, vector):
self.run_test_case('QueryTest/iceberg-v2-directed-mode', vector)