We optimize plain count(*) queries on Iceberg tables in the following way:

              AGGREGATE
              COUNT(*)
                  |
              UNION ALL
               /     \
              /       \
             /         \
     SCAN all        ANTI JOIN
     datafiles        /     \
     without         /       \
     deletes      SCAN       SCAN
                datafiles   deletes

                   ||
                 rewrite
                   ||
                   \/

     ArithmeticExpr: LHS + RHS
          /              \
         /                \
        /                  \
   record_count         AGGREGATE
   of all                COUNT(*)
   datafiles                |
   without              ANTI JOIN
   deletes               /     \
                        /       \
                     SCAN       SCAN
                   datafiles   deletes
This optimization consists of two parts:
 1. Rewriting the count(*) expression to count(*) + "record_count" (of data
    files without deletes), as sketched below.
 2. In IcebergScanPlanner we only need to construct the right side of
    the original UNION ALL operator, i.e.:

          ANTI JOIN
           /     \
          /       \
       SCAN       SCAN
     datafiles   deletes
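
A minimal sketch of the idea behind the rewrite (Python over a simplified
file-metadata model; the names and the count_rows_with_deletes_applied
callback are illustrative, not the planner's actual Java code):

    from collections import namedtuple

    DataFile = namedtuple("DataFile", ["path", "record_count", "has_deletes"])

    def optimized_count_star(data_files, count_rows_with_deletes_applied):
        # LHS: folded from Iceberg file metadata, no data file is read.
        lhs = sum(f.record_count for f in data_files if not f.has_deletes)
        # RHS: only the files that have delete files attached are scanned
        # (conceptually the ANTI JOIN branch of the plan above).
        rhs = count_rows_with_deletes_applied(
            [f for f in data_files if f.has_deletes])
        return lhs + rhs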
SelectStmt decides whether we can do the count(*) optimization, and if
so, does the following:
 1. SelectStmt sets 'TotalRecordsNumV2' in the analyzer; then, during the
    expression rewrite phase, CountStarToConstRule rewrites the count(*)
    to count(*) + record_count.
 2. SelectStmt sets 'OptimizeCountStarForIcebergV2' in the query context,
    and IcebergScanPlanner creates the plan accordingly.
This mechanism works for simple queries, but in complex queries it can
turn on the count(*) optimization in IcebergScanPlanner for all Iceberg V2
tables, even if only one subquery enabled the count(*) optimization during
analysis.
This patch changes the following:
 1. We introduce IcebergV2CountStarAccumulator and use it instead of the
    ArithmeticExpr, so after the rewrite we still know whether the count(*)
    optimization should be enabled for the planner.
 2. Instead of using the query context, we pass the information to
    IcebergScanPlanner via the TableRef object.
Testing
* e2e tests
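
  A hedged sketch of the kind of e2e check involved (the helper and table
  names are illustrative; the profile assertions mirror the existing
  count(*) fast-path checks in test_iceberg.py):

    def check_count_star_uses_metadata(client, tbl):
        result = client.execute("select count(*) from {0}".format(tbl))
        assert len(result.data) == 1
        # When the optimization is active, no row groups or file metadata
        # reads should show up in the runtime profile.
        assert "NumRowGroups" not in result.runtime_profile
        assert "NumFileMetadataRead" not in result.runtime_profile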
Change-Id: I1940031298eb634aa82c3d32bbbf16bce8eaf874
Reviewed-on: http://gerrit.cloudera.org:8080/23705
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from collections import defaultdict, namedtuple
import datetime
import json
import logging
import os
import random
import re
from subprocess import check_call, check_output
import time

from avro.datafile import DataFileReader
from avro.io import DatumReader
from builtins import range
import pytest
import pytz

# noinspection PyUnresolvedReferences
from impala_thrift_gen.parquet.ttypes import ConvertedType
from tests.common.file_utils import (
    create_iceberg_table_from_directory,
    create_table_from_parquet,
)
from tests.common.iceberg_test_suite import IcebergTestSuite
from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfFS
from tests.common.test_dimensions import add_exec_option_dimension
from tests.common.test_result_verifier import error_msg_startswith
from tests.shell.util import run_impala_shell_cmd
from tests.util.filesystem_utils import FILESYSTEM_PREFIX, get_fs_path, IS_HDFS, WAREHOUSE
from tests.util.get_parquet_metadata import get_parquet_metadata
from tests.util.iceberg_util import cast_ts, get_snapshots, IcebergCatalogs, quote
from tests.util.parse_util import bytes_to_str

LOG = logging.getLogger(__name__)

class TestIcebergTable(IcebergTestSuite):
|
|
"""Tests related to Iceberg tables."""
|
|
|
|
@classmethod
|
|
def add_test_dimensions(cls):
|
|
super(TestIcebergTable, cls).add_test_dimensions()
|
|
cls.ImpalaTestMatrix.add_constraint(
|
|
lambda v: v.get_value('table_format').file_format == 'parquet')
|
|
|
|
def test_iceberg_negative(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-negative', vector, use_db=unique_database)
|
|
|
|
def test_create_iceberg_tables(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database)
|
|
|
|
def test_alter_iceberg_tables_v1(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-alter-v1', vector, use_db=unique_database)
|
|
|
|
def test_alter_iceberg_tables_v2(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-alter-v2', vector, use_db=unique_database)
|
|
|
|
def test_alter_iceberg_tables_default(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-alter-default', vector, use_db=unique_database)
|
|
|
|
def test_iceberg_binary_type(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-binary-type', vector, use_db=unique_database)
|
|
|
|
def test_external_iceberg_tables(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-external', vector, unique_database)
|
|
|
|
def test_expire_snapshots(self, unique_database):
|
|
tbl_name = unique_database + ".expire_snapshots"
|
|
iceberg_catalogs = IcebergCatalogs(unique_database)
|
|
for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
|
|
# We are setting the TIMEZONE query option in this test, so let's create a local
|
|
# impala client.
|
|
with self.create_impala_client() as impalad_client:
|
|
# Iceberg doesn't create a snapshot entry for the initial empty table
|
|
impalad_client.execute("""
|
|
create table {0} (i int) stored as iceberg
|
|
TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
|
|
ts_0 = datetime.datetime.now()
|
|
insert_q = "insert into {0} values (1)".format(tbl_name)
|
|
ts_1 = self.execute_query_ts(impalad_client, insert_q)
|
|
time.sleep(1)
|
|
impalad_client.execute(insert_q)
|
|
time.sleep(1)
|
|
ts_2 = self.execute_query_ts(impalad_client, insert_q)
|
|
impalad_client.execute(insert_q)
|
|
|
|
# There should be 4 snapshots initially
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
|
|
# Expire the oldest snapshot and test that the oldest one was expired
|
|
expire_q = "alter table {0} execute expire_snapshots({1})"
|
|
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
|
|
|
|
# Expire with a timestamp in which the interval does not touch existing snapshot
|
|
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
|
|
|
|
# Expire all, but retain 1
|
|
impalad_client.execute(expire_q.format(tbl_name,
|
|
cast_ts(datetime.datetime.now())))
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
|
|
|
|
# Change number of retained snapshots, then expire all
|
|
impalad_client.execute("""alter table {0} set tblproperties
|
|
('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
|
|
impalad_client.execute(insert_q)
|
|
impalad_client.execute(insert_q)
|
|
impalad_client.execute(expire_q.format(tbl_name,
|
|
cast_ts(datetime.datetime.now())))
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
|
|
|
|
# Check that timezone is interpreted in local timezone controlled by query option
|
|
# TIMEZONE.
|
|
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
|
|
impalad_client.execute(insert_q)
|
|
ts_tokyo = self.impala_now(impalad_client)
|
|
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
|
|
impalad_client.execute(insert_q)
|
|
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
|
|
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
|
|
impalad_client.execute("DROP TABLE {0}".format(tbl_name))
|
|
|
|
def test_truncate_iceberg_tables(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
|
|
|
|
@SkipIf.not_dfs
|
|
def test_drop_incomplete_table(self, unique_database):
|
|
"""Test DROP TABLE when the underlying directory is deleted. In that case table
|
|
loading fails, but we should be still able to drop the table from Impala."""
|
|
tbl_name = unique_database + ".synchronized_iceberg_tbl"
|
|
cat_location = get_fs_path("/test-warehouse/" + unique_database)
|
|
self.client.execute("""create table {0} (i int) stored as iceberg
|
|
tblproperties('iceberg.catalog'='hadoop.catalog',
|
|
'iceberg.catalog_location'='{1}')""".format(tbl_name, cat_location))
|
|
self.filesystem_client.delete_file_dir(cat_location, True)
|
|
self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name))
|
|
|
|
@SkipIf.not_dfs(reason="Dfs required as test to directly delete files.")
|
|
def test_drop_corrupt_table(self, unique_database):
|
|
"""Test that if the underlying iceberg metadata directory is deleted, then a query
|
|
fails with a reasonable error message, and the table can be dropped successfully."""
|
|
table = "corrupt_iceberg_tbl"
|
|
full_table_name = unique_database + "." + table
|
|
self.client.execute("""create table {0} (i int) stored as iceberg""".
|
|
format(full_table_name))
|
|
metadata_location = get_fs_path("""/test-warehouse/{0}.db/{1}/metadata""".format(
|
|
unique_database, table))
|
|
assert self.filesystem_client.exists(metadata_location)
|
|
status = self.filesystem_client.delete_file_dir(metadata_location, True)
|
|
assert status, "Delete failed with {0}".format(status)
|
|
assert not self.filesystem_client.exists(metadata_location)
|
|
|
|
# Invalidate so that table loading problems will happen in the catalog.
|
|
self.client.execute("invalidate metadata {0}".format(full_table_name))
|
|
|
|
# Query should now fail.
|
|
err = self.execute_query_expect_failure(self.client, """select * from {0}""".
|
|
format(full_table_name))
|
|
result = str(err)
|
|
assert "AnalysisException: Failed to load metadata for table" in result
|
|
assert ("Failed to load metadata for table" in result # local catalog
|
|
or "Error loading metadata for Iceberg table" in result) # default catalog
|
|
self.execute_query_expect_success(self.client, """drop table {0}""".
|
|
format(full_table_name))
|
|
|
|
def test_insert(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database)
|
|
|
|
def test_partitioned_insert_v1(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-partitioned-insert-v1', vector,
|
|
use_db=unique_database)
|
|
|
|
def test_partitioned_insert_v2(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-partitioned-insert-v2', vector,
|
|
use_db=unique_database)
|
|
|
|
def test_partitioned_insert_default(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-partitioned-insert-default', vector,
|
|
use_db=unique_database)
|
|
|
|
def test_insert_overwrite(self, vector, unique_database):
|
|
"""Run iceberg-overwrite tests, then test that INSERT INTO/OVERWRITE queries running
|
|
concurrently with a long running INSERT OVERWRITE are handled gracefully. query_a is
|
|
started before query_b/query_c, but query_b/query_c are supposed to finish before
|
|
query_a. query_a should fail because the overwrite should not erase query_b/query_c's
|
|
result."""
|
|
# Run iceberg-overwrite.test
|
|
self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database)
|
|
|
|
# Create test dataset for concurrency tests and warm-up the test table
|
|
tbl_name = unique_database + ".overwrite_tbl"
|
|
self.client.execute("""create table {0} (i int)
|
|
partitioned by spec (truncate(3, i))
|
|
stored as iceberg""".format(tbl_name))
|
|
self.client.execute("insert into {0} values (1), (2), (3);".format(tbl_name))
|
|
|
|
# Test queries: 'a' is the long running query while 'b' and 'c' are the short ones
|
|
query_a = """insert overwrite {0} select sleep(5000);""".format(tbl_name)
|
|
query_b = """insert overwrite {0} select * from {0};""".format(tbl_name)
|
|
query_c = """insert into {0} select * from {0};""".format(tbl_name)
|
|
|
|
# Test concurrent INSERT OVERWRITEs, the exception closes the query handle.
|
|
handle = self.client.execute_async(query_a)
|
|
time.sleep(1)
|
|
self.client.execute(query_b)
|
|
try:
|
|
self.client.wait_for_finished_timeout(handle, 30)
|
|
assert False
|
|
except IMPALA_CONNECTION_EXCEPTION as e:
|
|
assert "Found conflicting files" in str(e)
|
|
|
|
# Test INSERT INTO during INSERT OVERWRITE, the exception closes the query handle.
|
|
handle = self.client.execute_async(query_a)
|
|
time.sleep(1)
|
|
self.client.execute(query_c)
|
|
try:
|
|
self.client.wait_for_finished_timeout(handle, 30)
|
|
assert False
|
|
except IMPALA_CONNECTION_EXCEPTION as e:
|
|
assert "Found conflicting files" in str(e)
|
|
|
|
def test_ctas(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-ctas', vector, use_db=unique_database)
|
|
|
|
def test_partition_transform_insert(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-partition-transform-insert', vector,
|
|
use_db=unique_database)
|
|
|
|
def test_iceberg_orc_field_id(self, vector):
|
|
self.run_test_case('QueryTest/iceberg-orc-field-id', vector)
|
|
|
|
def test_catalogs(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-catalogs', vector, use_db=unique_database)
|
|
|
|
def test_missing_field_ids(self, vector):
|
|
self.run_test_case('QueryTest/iceberg-missing-field-ids', vector)
|
|
|
|
def test_migrated_tables(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-migrated-tables', vector, unique_database)
|
|
|
|
def test_migrated_table_field_id_resolution(self, vector, unique_database):
|
|
create_iceberg_table_from_directory(self.client, unique_database,
|
|
"iceberg_migrated_alter_test", "parquet")
|
|
create_iceberg_table_from_directory(self.client, unique_database,
|
|
"iceberg_migrated_complex_test", "parquet")
|
|
create_iceberg_table_from_directory(self.client, unique_database,
|
|
"iceberg_migrated_alter_test_orc", "orc")
|
|
create_iceberg_table_from_directory(self.client, unique_database,
|
|
"iceberg_migrated_complex_test_orc", "orc")
|
|
self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution',
|
|
vector, unique_database)
|
|
if IS_HDFS:
|
|
self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-orc',
|
|
vector, unique_database)
|
|
|
|
def test_column_case_sensitivity(self, vector, unique_database):
|
|
create_iceberg_table_from_directory(self.client, unique_database,
|
|
"iceberg_column_case_sensitivity_issue", "parquet")
|
|
self.run_test_case('QueryTest/iceberg-column-case-sensitivity-issue',
|
|
vector, unique_database)
|
|
|
|
@SkipIfFS.hive
|
|
def test_migrated_table_field_id_resolution_complex(self, vector, unique_database):
|
|
def get_table_loc(tbl_name):
|
|
return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, tbl_name)
|
|
|
|
def create_table(tbl_name, file_format, partition_cols):
|
|
self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} (
|
|
id INT,
|
|
name STRING,
|
|
teststeps array<struct<step_number:int,step_description:string>>)
|
|
PARTITIONED BY ({})
|
|
STORED AS {}
|
|
""".format(unique_database, tbl_name, partition_cols, file_format))
|
|
|
|
def add_file_to_table_partition(tbl_name, part_dir, local_filename):
|
|
tbl_loc = get_table_loc(tbl_name)
|
|
part_dir = os.path.join(tbl_loc, part_dir)
|
|
self.filesystem_client.make_dir(part_dir)
|
|
data_file_path = os.path.join(os.environ['IMPALA_HOME'], "testdata",
|
|
"migrated_iceberg", local_filename)
|
|
self.filesystem_client.copy_from_local(data_file_path, part_dir)
|
|
|
|
def finalize_table(tbl_name):
|
|
self.execute_query("ALTER TABLE {}.{} RECOVER PARTITIONS".format(
|
|
unique_database, tbl_name))
|
|
self.execute_query("ALTER TABLE {}.{} CONVERT TO ICEBERG".format(
|
|
unique_database, tbl_name))
|
|
|
|
def prepare_test_table(tbl_name, file_format, partition_cols, part_dir, datafile):
|
|
create_table(tbl_name, file_format, partition_cols)
|
|
add_file_to_table_partition(tbl_name, part_dir, datafile)
|
|
finalize_table(tbl_name)
|
|
|
|
prepare_test_table('all_part_cols_stored_parquet',
|
|
"PARQUET",
|
|
"result_date STRING",
|
|
"result_date=2024-08-26",
|
|
"complextypes_and_partition_columns_in_data_files.parquet")
|
|
prepare_test_table('not_all_part_cols_stored_parquet',
|
|
"PARQUET",
|
|
"result_date STRING, p INT",
|
|
"result_date=2024-08-26/p=3",
|
|
"complextypes_and_partition_columns_in_data_files.parquet")
|
|
prepare_test_table('all_part_cols_stored_orc',
|
|
"ORC",
|
|
"result_date STRING",
|
|
"result_date=2024-08-26",
|
|
"complextypes_and_partition_columns_in_data_files.orc")
|
|
prepare_test_table('not_all_part_cols_stored_orc',
|
|
"ORC",
|
|
"result_date STRING, p INT",
|
|
"result_date=2024-08-26/p=3",
|
|
"complextypes_and_partition_columns_in_data_files.orc")
|
|
|
|
self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-complex',
|
|
vector, unique_database)
|
|
|
|
def test_describe_history(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database)
|
|
|
|
# Create a table with multiple snapshots and verify the table history.
|
|
tbl_name = unique_database + ".iceberg_multi_snapshots"
|
|
self.client.execute("""create table {0} (i int) stored as iceberg
|
|
tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
|
|
self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
|
|
self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
|
|
snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
|
|
first_snapshot = snapshots[0]
|
|
second_snapshot = snapshots[1]
|
|
# Check that first snapshot is older than the second snapshot.
|
|
assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
|
|
# Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
|
|
assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
|
|
# The first snapshot has no parent snapshot ID.
|
|
assert(first_snapshot.get_parent_id() is None)
|
|
# Check "is_current_ancestor" column.
|
|
assert(first_snapshot.is_current_ancestor())
|
|
assert(second_snapshot.is_current_ancestor())
|
|
|
|
def test_execute_rollback_negative(self, vector):
|
|
"""Negative test for EXECUTE ROLLBACK."""
|
|
self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
|
|
|
|
def test_execute_rollback(self, unique_database):
|
|
"""Test for EXECUTE ROLLBACK."""
|
|
iceberg_catalogs = IcebergCatalogs(unique_database)
|
|
for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
|
|
# Create a table with multiple snapshots.
|
|
tbl_name = unique_database + ".iceberg_execute_rollback"
|
|
# We are setting the TIMEZONE query option in this test, so let's create a local
|
|
# impala client.
|
|
with self.create_impala_client() as impalad_client:
|
|
orig_timezone = 'America/Los_Angeles'
|
|
impalad_client.execute("SET TIMEZONE='" + orig_timezone + "'")
|
|
impalad_client.execute("""
|
|
create table {0} (i int) stored as iceberg
|
|
TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
|
|
initial_snapshots = 3
|
|
for i in range(initial_snapshots):
|
|
impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
|
|
snapshots = get_snapshots(impalad_client, tbl_name,
|
|
expected_result_size=initial_snapshots)
|
|
|
|
output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
|
|
LOG.info("success output={0}".format(output))
|
|
|
|
# We rolled back, but that creates a new snapshot, so now there are 4.
|
|
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
|
|
# The new snapshot has the same id (and parent id) as the snapshot we rolled back
|
|
# to, but it has a different creation time.
|
|
assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
|
|
assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
|
|
assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
|
|
# The "orphaned" snapshot is now not a current ancestor.
|
|
assert not snapshots[2].is_current_ancestor()
|
|
|
|
# We cannot roll back to a snapshot that is not a current ancestor.
|
|
output = self.rollback_to_id_expect_failure(tbl_name,
|
|
snapshots[2].get_snapshot_id(),
|
|
expected_text="Cannot roll back to snapshot, not an ancestor of the current "
|
|
"state")
|
|
|
|
# Create another snapshot.
|
|
before_insert = datetime.datetime.now(pytz.timezone(orig_timezone))
|
|
impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
|
|
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
|
|
|
|
# Rollback to before the last insert.
|
|
self.rollback_to_ts(impalad_client, tbl_name, before_insert)
|
|
# This creates another snapshot.
|
|
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
|
|
# The snapshot id is the same, the dates differ
|
|
assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
|
|
assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
|
|
assert not snapshots[4].is_current_ancestor()
|
|
|
|
# Show that the EXECUTE ROLLBACK is respecting the current timezone.
|
|
# To do this we try to roll back to a time for which there is no
|
|
# snapshot, this will fail with an error message that includes the specified
|
|
# time. We parse out that time. By doing this in two timezones we can see
|
|
# that the parameter being used was affected by the current timezone.
|
|
one_hour_ago = before_insert - datetime.timedelta(hours=1)
|
|
# We use Timezones from Japan and Iceland to avoid any DST complexities.
|
|
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
|
|
japan_ts = self.get_snapshot_ts_from_failed_rollback(
|
|
impalad_client, tbl_name, one_hour_ago)
|
|
impalad_client.execute("SET TIMEZONE='Iceland'")
|
|
iceland_ts = self.get_snapshot_ts_from_failed_rollback(
|
|
impalad_client, tbl_name, one_hour_ago)
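# Tokyo is UTC+9 and Iceland is UTC+0 (neither observes DST), so the same
# wall-clock timestamp maps to epoch values exactly 9 hours apart; the value
# parsed from the error message is in milliseconds, hence the division below.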
|
|
diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
|
|
assert diff_hours == 9
|
|
|
|
impalad_client.execute("DROP TABLE {0}".format(tbl_name))
|
|
|
|
def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
|
|
"""Run an EXECUTE ROLLBACK which is expected to fail.
|
|
Parse the error message to extract the timestamp for which there
|
|
was no snapshot, and convert the string to an integer"""
|
|
try:
|
|
self.rollback_to_ts(client, tbl_name, ts)
|
|
assert False, "Query should have failed"
|
|
except IMPALA_CONNECTION_EXCEPTION as e:
|
|
result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
|
|
time_str = result.group(1)
|
|
snapshot_ts = int(time_str)
|
|
assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
|
|
return snapshot_ts
|
|
|
|
def rollback_to_ts(self, client, tbl_name, ts):
|
|
"""Rollback a table to a snapshot timestamp."""
|
|
query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
|
|
return self.execute_query_expect_success(client, query)
|
|
|
|
def rollback_to_id(self, tbl_name, id):
|
|
"""Rollback a table to a snapshot id."""
|
|
query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
|
|
return self.execute_query_expect_success(self.client, query)
|
|
|
|
def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
|
|
"""Attempt to roll back a table to a snapshot id, expecting a failure."""
|
|
query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
|
|
output = self.execute_query_expect_failure(self.client, query)
|
|
if expected_text:
|
|
assert expected_text in str(output)
|
|
return output
|
|
|
|
def test_execute_remove_orphan_files(self, unique_database):
|
|
tbl_name = 'tbl_with_orphan_files'
|
|
db_tbl = unique_database + "." + tbl_name
|
|
with self.create_impala_client() as impalad_client:
|
|
impalad_client.execute("create table {0} (i int) stored as iceberg"
|
|
.format(db_tbl))
|
|
insert_q = "insert into {0} values ({1})"
|
|
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
|
|
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
|
|
self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
|
|
result = impalad_client.execute('select i from {} order by i'.format(db_tbl))
|
|
assert result.data == ['1', '2', '3']
|
|
|
|
# Add some junk files to data and metadata dir.
|
|
TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
|
|
DATA_PATH = os.path.join(TABLE_PATH, "data")
|
|
METADATA_PATH = os.path.join(TABLE_PATH, "metadata")
|
|
SRC_DIR = os.path.join(
|
|
os.environ['IMPALA_HOME'],
|
|
"testdata/data/iceberg_test/iceberg_mixed_file_format_test/{0}/{1}")
|
|
|
|
# Copy first set of junk files.
|
|
file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
|
|
"fff150b6136a-job_16619542960420_0002-1-00001.parquet"
|
|
file_avro1 = "055baf62-de6d-4583-bf21-f187f9482343-m0.avro"
|
|
self.filesystem_client.copy_from_local(
|
|
SRC_DIR.format('data', file_parq1), DATA_PATH)
|
|
self.filesystem_client.copy_from_local(
|
|
SRC_DIR.format('metadata', file_avro1), METADATA_PATH)
|
|
assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1))
|
|
assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro1))
|
|
# Keep current time.
|
|
result = impalad_client.execute('select cast(now() as string)')
|
|
cp1_time = result.data[0]
|
|
time.sleep(1)
|
|
|
|
# Copy second set of junk files.
|
|
file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
|
|
"27ff880faff0-job_16619542960420_0004-1-00001.parquet"
|
|
file_avro2 = "871d1473-8566-46c0-a530-a2256b3f396f-m0.avro"
|
|
self.filesystem_client.copy_from_local(
|
|
SRC_DIR.format('data', file_parq2), DATA_PATH)
|
|
self.filesystem_client.copy_from_local(
|
|
SRC_DIR.format('metadata', file_avro2), METADATA_PATH)
|
|
assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2))
|
|
assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2))
|
|
|
|
# Execute REMOVE_ORPHAN_FILES at specific timestamp.
|
|
result = impalad_client.execute(
|
|
"ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES('{1}')".format(db_tbl, cp1_time))
|
|
assert result.data[0] == 'Remove orphan files executed.'
|
|
assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1))
|
|
assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_parq1))
|
|
assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2))
|
|
assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2))
|
|
|
|
# Execute REMOVE_ORPHAN_FILES at now().
|
|
result = impalad_client.execute(
|
|
"ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES(now())".format(db_tbl))
|
|
assert result.data[0] == 'Remove orphan files executed.'
|
|
assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2))
|
|
assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_parq2))
|
|
|
|
# Assert table still queryable.
|
|
result = impalad_client.execute('select i from {} order by i'.format(db_tbl))
|
|
assert result.data == ['1', '2', '3']
|
|
|
|
def test_describe_history_params(self, unique_database):
|
|
tbl_name = unique_database + ".describe_history"
|
|
|
|
# We are setting the TIMEZONE query option in this test, so let's create a local
|
|
# impala client.
|
|
with self.create_impala_client() as impalad_client:
|
|
# Iceberg doesn't create a snapshot entry for the initial empty table
|
|
impalad_client.execute("create table {0} (i int) stored as iceberg"
|
|
.format(tbl_name))
|
|
insert_q = "insert into {0} values (1)".format(tbl_name)
|
|
ts_1 = self.execute_query_ts(impalad_client, insert_q)
|
|
time.sleep(1)
|
|
ts_2 = self.execute_query_ts(impalad_client, insert_q)
|
|
time.sleep(1)
|
|
ts_3 = self.execute_query_ts(impalad_client, insert_q)
|
|
# Describe history without predicate
|
|
data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
|
|
assert len(data.data) == 3
|
|
|
|
# Describe history with FROM predicate
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name,
|
|
ts_1 - datetime.timedelta(hours=1), 3)
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 2)
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
|
|
|
|
# Describe history with BETWEEN <ts> AND <ts> predicate
|
|
self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
|
|
self.expect_results_between(impalad_client, tbl_name,
|
|
ts_1 - datetime.timedelta(hours=1), ts_2, 2)
|
|
self.expect_results_between(impalad_client, tbl_name,
|
|
ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
|
|
|
|
# Check that timezone is interpreted in local timezone controlled by query option
|
|
# TIMEZONE. Persist the local times first and create a new snapshot.
|
|
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
|
|
now_tokyo = self.impala_now(impalad_client)
|
|
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
|
|
now_budapest = self.impala_now(impalad_client)
|
|
self.execute_query_ts(impalad_client, "insert into {0} values (4)".format(tbl_name))
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 1)
|
|
|
|
# Let's switch to Tokyo time. Tokyo time is always greater than Budapest time.
|
|
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, now_tokyo, 1)
|
|
|
|
# Interpreting Budapest time in Tokyo time points to the past.
|
|
self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 4)
|
|
|
|
def test_time_travel(self, unique_database):
|
|
tbl_name = unique_database + ".time_travel"
|
|
|
|
def expect_results(query, expected_results, expected_cols):
|
|
data = impalad_client.execute(query)
|
|
assert len(data.data) == len(expected_results)
|
|
for r in expected_results:
|
|
assert r in data.data
|
|
expected_col_labels = expected_cols['labels']
|
|
expected_col_types = expected_cols['types']
|
|
assert data.column_labels == expected_col_labels
|
|
assert data.column_types == expected_col_types
|
|
|
|
def expect_for_count_star(query, expected):
|
|
data = impalad_client.execute(query)
|
|
assert len(data.data) == 1
|
|
assert expected in data.data
|
|
assert "NumRowGroups" not in data.runtime_profile
|
|
assert "NumFileMetadataRead" not in data.runtime_profile
|
|
|
|
def expect_results_t(ts, expected_results, expected_cols):
|
|
expect_results(
|
|
"select * from {0} for system_time as of {1}".format(tbl_name, ts),
|
|
expected_results, expected_cols)
|
|
|
|
def expect_for_count_star_t(ts, expected):
|
|
expect_for_count_star(
|
|
"select count(*) from {0} for system_time as of {1}".format(tbl_name, ts),
|
|
expected)
|
|
|
|
def expect_snapshot_id_in_plan_t(ts, snapshot_id):
|
|
data = impalad_client.execute(
|
|
"explain select * from {0} for system_time as of {1}".format(
|
|
tbl_name, ts))
|
|
assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data
|
|
|
|
def expect_results_v(snapshot_id, expected_results, expected_cols):
|
|
expect_results(
|
|
"select * from {0} for system_version as of {1}".format(tbl_name, snapshot_id),
|
|
expected_results, expected_cols)
|
|
|
|
def expect_for_count_star_v(snapshot_id, expected):
|
|
expect_for_count_star(
|
|
"select count(*) from {0} for system_version as of {1}".format(
|
|
tbl_name, snapshot_id),
|
|
expected)
|
|
|
|
def expect_snapshot_id_in_plan_v(snapshot_id):
|
|
data = impalad_client.execute(
|
|
"explain select * from {0} for system_version as of {1}".format(
|
|
tbl_name, snapshot_id))
|
|
assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data
|
|
|
|
def impala_now():
|
|
now_data = impalad_client.execute("select now()")
|
|
return now_data.data[0]
|
|
|
|
# We are setting the TIMEZONE query option in this test, so let's create a local
|
|
# impala client.
|
|
with self.create_impala_client() as impalad_client:
|
|
# Iceberg doesn't create a snapshot entry for the initial empty table
|
|
impalad_client.execute("create table {0} (i int) stored as iceberg"
|
|
.format(tbl_name))
|
|
ts_1 = self.execute_query_ts(impalad_client, "insert into {0} values (1)"
|
|
.format(tbl_name))
|
|
ts_2 = self.execute_query_ts(impalad_client, "insert into {0} values (2)"
|
|
.format(tbl_name))
|
|
ts_3 = self.execute_query_ts(impalad_client, "truncate table {0}".format(tbl_name))
|
|
time.sleep(5)
|
|
ts_4 = self.execute_query_ts(impalad_client, "insert into {0} values (100)"
|
|
.format(tbl_name))
|
|
ts_no_ss = self.execute_query_ts(impalad_client,
|
|
"alter table {0} add column {1} bigint"
|
|
.format(tbl_name, "j"))
|
|
ts_5 = self.execute_query_ts(impalad_client, "insert into {0} (i,j) values (3, 103)"
|
|
.format(tbl_name))
|
|
|
|
# Descriptions of the different schemas we expect to see as Time Travel queries
|
|
# use the schema from the specified time or snapshot.
|
|
#
|
|
# When the schema is just the 'J' column.
|
|
j_cols = {
|
|
'labels': ['J'],
|
|
'types': ['BIGINT']
|
|
}
|
|
# When the schema is just the 'I' column.
|
|
i_cols = {
|
|
'labels': ['I'],
|
|
'types': ['INT']
|
|
}
|
|
# When the schema is the 'I' and 'J' columns.
|
|
ij_cols = {
|
|
'labels': ['I', 'J'],
|
|
'types': ['INT', 'BIGINT']
|
|
}
|
|
|
|
# Query table as of timestamps.
|
|
expect_results_t("now()", ['100\tNULL', '3\t103'], ij_cols)
|
|
expect_results_t(quote(ts_1), ['1'], i_cols)
|
|
expect_results_t(quote(ts_2), ['1', '2'], i_cols)
|
|
expect_results_t(quote(ts_3), [], i_cols)
|
|
expect_results_t(cast_ts(ts_3) + " + interval 1 seconds", [], i_cols)
|
|
expect_results_t(quote(ts_4), ['100'], i_cols)
|
|
expect_results_t(cast_ts(ts_4) + " - interval 5 seconds", [], i_cols)
|
|
# There is no new snapshot created by the schema change between ts_4 and ts_no_ss.
|
|
# So at ts_no_ss we see the schema as of ts_4
|
|
expect_results_t(quote(ts_no_ss), ['100'], i_cols)
|
|
expect_results_t(quote(ts_5), ['100\tNULL', '3\t103'], ij_cols)
|
|
# Future queries return the current snapshot.
|
|
expect_results_t(cast_ts(ts_5) + " + interval 1 hours", ['100\tNULL', '3\t103'],
|
|
ij_cols)
|
|
|
|
# Query table as of snapshot IDs.
|
|
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
|
|
expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
|
|
expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
|
|
expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
|
|
expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
|
|
expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
|
|
|
|
expect_snapshot_id_in_plan_v(snapshots[0].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_v(snapshots[1].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_v(snapshots[2].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_v(snapshots[3].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_v(snapshots[4].get_snapshot_id())
|
|
|
|
expect_snapshot_id_in_plan_t(quote(ts_1), snapshots[0].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_t(quote(ts_2), snapshots[1].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_t(quote(ts_3), snapshots[2].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_t(quote(ts_4), snapshots[3].get_snapshot_id())
|
|
expect_snapshot_id_in_plan_t(quote(ts_5), snapshots[4].get_snapshot_id())
|
|
|
|
# Test of plain count star optimization
|
|
# 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
|
|
expect_for_count_star_t("now()", '2')
|
|
expect_for_count_star_t(quote(ts_1), '1')
|
|
expect_for_count_star_t(quote(ts_2), '2')
|
|
expect_for_count_star_t(quote(ts_3), '0')
|
|
expect_for_count_star_t(cast_ts(ts_3) + " + interval 1 seconds", '0')
|
|
expect_for_count_star_t(quote(ts_4), '1')
|
|
expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
|
|
expect_for_count_star_t(cast_ts(ts_5), '2')
|
|
expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
|
|
expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
|
|
expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
|
|
expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
|
|
expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
|
|
expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
|
|
|
|
# SELECT diff
|
|
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
|
|
MINUS
|
|
SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
|
|
tbl=tbl_name, ts_new=ts_2, ts_old=ts_1),
|
|
['2'], i_cols)
|
|
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
|
|
MINUS
|
|
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
|
|
tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
|
|
v_old=snapshots[0].get_snapshot_id()),
|
|
['2'], i_cols)
|
|
# Mix SYSTEM_TIME and SYSTEM_VERSION
|
|
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
|
|
MINUS
|
|
SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
|
|
tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
|
|
['2'], i_cols)
|
|
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
|
|
MINUS
|
|
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
|
|
tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
|
|
['2'], i_cols)
|
|
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
|
|
MINUS
|
|
SELECT *, NULL FROM {tbl} FOR SYSTEM_TIME
|
|
AS OF '{ts_old}'""".format(
|
|
tbl=tbl_name, ts_new=ts_5, ts_old=ts_4),
|
|
['3\t103'], ij_cols)
|
|
|
|
# Query old snapshot
|
|
try:
|
|
impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_TIME AS OF {1}".format(
|
|
tbl_name, "now() - interval 2 years"))
|
|
assert False # Exception must be thrown
|
|
except Exception as e:
|
|
assert "Cannot find a snapshot older than" in str(e)
|
|
# Query invalid snapshot
|
|
try:
|
|
impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_VERSION AS OF 42".format(
|
|
tbl_name))
|
|
assert False # Exception must be thrown
|
|
except Exception as e:
|
|
assert "Cannot find snapshot with ID 42" in str(e)
|
|
|
|
# Go back to one column
|
|
impalad_client.execute("alter table {0} drop column i".format(tbl_name))
|
|
|
|
# Test that deleted column is not selectable.
|
|
try:
|
|
impalad_client.execute("SELECT i FROM {0}".format(tbl_name))
|
|
assert False # Exception must be thrown
|
|
except Exception as e:
|
|
assert "Could not resolve column/field reference: 'i'" in str(e)
|
|
|
|
# Back at ts_2 the deleted 'I' column is there
|
|
expect_results("SELECT * FROM {0} FOR SYSTEM_TIME AS OF '{1}'".
|
|
format(tbl_name, ts_2), ['1', '2'], i_cols)
|
|
expect_results("SELECT i FROM {0} FOR SYSTEM_TIME AS OF '{1}'".
|
|
format(tbl_name, ts_2), ['1', '2'], i_cols)
|
|
|
|
# Check that timezone is interpreted in local timezone controlled by query option
|
|
# TIMEZONE
|
|
impalad_client.execute("truncate table {0}".format(tbl_name))
|
|
impalad_client.execute("insert into {0} values (1111)".format(tbl_name))
|
|
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
|
|
now_budapest = impala_now()
|
|
expect_results_t(quote(now_budapest), ['1111'], j_cols)
|
|
|
|
# Let's switch to Tokyo time. Tokyo time is always greater than Budapest time.
|
|
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
|
|
now_tokyo = impala_now()
|
|
expect_results_t(quote(now_tokyo), ['1111'], j_cols)
|
|
try:
|
|
# Interpreting Budapest time in Tokyo time points to the past when the table
|
|
# didn't exist.
|
|
expect_results_t(quote(now_budapest), [], j_cols)
|
|
assert False
|
|
except Exception as e:
|
|
assert "Cannot find a snapshot older than" in str(e)
|
|
|
|
def test_time_travel_queries(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-time-travel', vector, use_db=unique_database)
|
|
|
|
@SkipIf.not_dfs
|
|
def test_strings_utf8(self, unique_database):
|
|
# Create table
|
|
table_name = "ice_str_utf8"
|
|
qualified_table_name = "%s.%s" % (unique_database, table_name)
|
|
query = 'create table %s (a string) stored as iceberg' % qualified_table_name
|
|
self.client.execute(query)
|
|
|
|
# Inserted string data should have UTF8 annotation regardless of query options.
|
|
query = 'insert into %s values ("impala")' % qualified_table_name
|
|
self.execute_query(query, {'parquet_annotate_strings_utf8': False})
|
|
|
|
# Copy the created file to the local filesystem and parse metadata
|
|
local_file = '/tmp/iceberg_utf8_test_%s.parq' % random.randint(0, 10000)
|
|
LOG.info("test_strings_utf8 local file name: " + local_file)
|
|
hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/data/*.parq'
|
|
% (unique_database, table_name))
|
|
check_call(['hadoop', 'fs', '-copyToLocal', hdfs_file, local_file])
|
|
metadata = get_parquet_metadata(local_file)
|
|
|
|
# Extract SchemaElements corresponding to the table column
|
|
a_schema_element = metadata.schema[1]
|
|
assert a_schema_element.name == 'a'
|
|
|
|
# Check that the schema uses the UTF8 annotation
|
|
assert a_schema_element.converted_type == ConvertedType.UTF8
|
|
|
|
os.remove(local_file)
|
|
|
|
# Get hdfs path to manifest list that belongs to the snapshot identified by
|
|
# 'snapshot_counter'.
|
|
def get_manifest_list_hdfs_path(self, tmp_path_prefix, db_name, table_name,
|
|
snapshot_counter):
|
|
local_path = '%s_%s.metadata.json' % (tmp_path_prefix, random.randint(0, 10000))
|
|
hdfs_path = get_fs_path('/test-warehouse/%s.db/%s/metadata/%s*.metadata.json'
|
|
% (db_name, table_name, snapshot_counter))
|
|
check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
|
|
|
|
manifest_list_hdfs_path = None
|
|
try:
|
|
with open(local_path, 'r') as fp:
|
|
metadata = json.load(fp)
|
|
current_snapshot_id = metadata['current-snapshot-id']
|
|
for snapshot in metadata['snapshots']:
|
|
if snapshot['snapshot-id'] == current_snapshot_id:
|
|
manifest_list_hdfs_path = snapshot['manifest-list']
|
|
break
|
|
finally:
|
|
os.remove(local_path)
|
|
return manifest_list_hdfs_path
|
|
|
|
# Get list of hdfs paths to manifest files from the manifest list avro file.
|
|
def get_manifest_hdfs_path_list(self, tmp_path_prefix, manifest_list_hdfs_path):
|
|
local_path = '%s_%s.manifest_list.avro' % (tmp_path_prefix, random.randint(0, 10000))
|
|
check_call(['hadoop', 'fs', '-copyToLocal', manifest_list_hdfs_path, local_path])
|
|
|
|
manifest_hdfs_path_list = []
|
|
reader = None
|
|
try:
|
|
with open(local_path, 'rb') as fp:
|
|
reader = DataFileReader(fp, DatumReader())
|
|
for manifest in reader:
|
|
manifest_hdfs_path_list.append(manifest['manifest_path'])
|
|
finally:
|
|
if reader:
|
|
reader.close()
|
|
os.remove(local_path)
|
|
return manifest_hdfs_path_list
|
|
|
|
# Get 'data_file' structs from avro manifest files.
|
|
def get_data_file_list(self, tmp_path_prefix, manifest_hdfs_path_list):
|
|
datafiles = []
|
|
for hdfs_path in manifest_hdfs_path_list:
|
|
local_path = '%s_%s.manifest.avro' % (tmp_path_prefix, random.randint(0, 10000))
|
|
check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
|
|
|
|
reader = None
|
|
try:
|
|
with open(local_path, 'rb') as fp:
|
|
reader = DataFileReader(fp, DatumReader())
|
|
datafiles.extend([rec['data_file'] for rec in reader])
|
|
finally:
|
|
if reader:
|
|
reader.close()
|
|
os.remove(local_path)
|
|
return datafiles
|
|
|
|
def get_latest_metadata_path(self, database_name, table_name):
|
|
describe = 'describe extended %s.%s ' % (database_name, table_name)
|
|
output = self.client.execute(describe)
|
|
metadata_location = [s for s in output.data if s.startswith('\tmetadata_location')]
|
|
assert len(metadata_location) == 1
|
|
metadata_location_split = metadata_location[0].split('\t')
|
|
assert len(metadata_location_split) == 3
|
|
metadata_path = metadata_location_split[2]
|
|
return metadata_path
|
|
|
|
# Get the current partition spec as JSON from the latest metadata file
|
|
def get_current_partition_spec(self, database_name, table_name):
|
|
hdfs_path = self.get_latest_metadata_path(database_name, table_name)
|
|
output = check_output(['hadoop', 'fs', '-cat', hdfs_path])
|
|
|
|
current_partition_spec = None
|
|
metadata = json.loads(output)
|
|
|
|
current_spec_id = metadata['default-spec-id']
|
|
for spec in metadata['partition-specs']:
|
|
if spec['spec-id'] == current_spec_id:
|
|
current_partition_spec = spec
|
|
break
|
|
return current_partition_spec
|
|
|
|
@SkipIf.not_dfs
|
|
def test_partition_spec_update_v1(self, unique_database):
|
|
# Create table
|
|
table_name = "ice_part"
|
|
qualified_table_name = "%s.%s" % (unique_database, table_name)
|
|
create_table = """create table {}
|
|
(s string, i int) partitioned by spec(truncate(5, s), identity(i))
|
|
stored as iceberg
|
|
tblproperties ('format-version'='1')""".format(qualified_table_name)
|
|
self.client.execute(create_table)
|
|
|
|
partition_spec = self.get_current_partition_spec(unique_database, table_name)
|
|
assert len(partition_spec['fields']) == 2
|
|
truncate_s = partition_spec['fields'][0]
|
|
identity_i = partition_spec['fields'][1]
|
|
# At table creation, partition names do not contain the parameter value.
|
|
assert truncate_s['name'] == 's_trunc'
|
|
assert identity_i['name'] == 'i'
|
|
assert truncate_s['field-id'] == 1000
|
|
assert identity_i['field-id'] == 1001
|
|
|
|
# Partition evolution
|
|
partition_evolution = 'alter table %s set partition ' \
|
|
'spec(identity(i), truncate(6,s))' % qualified_table_name
|
|
self.client.execute(partition_evolution)
|
|
|
|
# V1 partition evolution keeps the old, modified fields, but changes their
|
|
# transform to VOID.
|
|
evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name)
|
|
assert len(evolved_partition_spec['fields']) == 3
|
|
old_truncate_s = evolved_partition_spec['fields'][0]
|
|
identity_i = evolved_partition_spec['fields'][1]
|
|
truncate_s = evolved_partition_spec['fields'][2]
|
|
assert old_truncate_s['name'] == 's_trunc'
|
|
assert identity_i['name'] == 'i'
|
|
# Modified field name contains the parameter value.
|
|
assert truncate_s['name'] == 's_trunc_6'
|
|
assert old_truncate_s['field-id'] == 1000
|
|
assert identity_i['field-id'] == 1001
|
|
# field-id increases for the modified field.
|
|
assert truncate_s['field-id'] == 1002
|
|
assert old_truncate_s['transform'] == 'void'
|
|
|
|
@SkipIf.not_dfs
|
|
def test_partition_spec_update_v2(self, unique_database):
|
|
# Create table
|
|
table_name = "ice_part"
|
|
qualified_table_name = "%s.%s" % (unique_database, table_name)
|
|
create_table = 'create table %s ' \
|
|
'(s string, i int) partitioned by spec(truncate(5, s), identity(i)) ' \
|
|
'stored as iceberg tblproperties ("format-version" = "2")' \
|
|
% qualified_table_name
|
|
self.client.execute(create_table)
|
|
|
|
partition_spec = self.get_current_partition_spec(unique_database, table_name)
|
|
assert len(partition_spec['fields']) == 2
|
|
truncate_s = partition_spec['fields'][0]
|
|
identity_i = partition_spec['fields'][1]
|
|
# At table creation, partition names do not contain the parameter value.
|
|
assert truncate_s['name'] == 's_trunc'
|
|
assert identity_i['name'] == 'i'
|
|
assert truncate_s['field-id'] == 1000
|
|
assert identity_i['field-id'] == 1001
|
|
|
|
# Partition evolution for s
|
|
partition_evolution = 'alter table %s set partition ' \
|
|
'spec(identity(i), truncate(6,s))' % qualified_table_name
|
|
self.client.execute(partition_evolution)
|
|
|
|
# V2 partition evolution updates the previous partitioning for
|
|
# the modified field.
|
|
evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name)
|
|
assert len(evolved_partition_spec['fields']) == 2
|
|
identity_i = evolved_partition_spec['fields'][0]
|
|
truncate_s = evolved_partition_spec['fields'][1]
|
|
assert identity_i['name'] == 'i'
|
|
# Modified field name contains the parameter value.
|
|
assert truncate_s['name'] == 's_trunc_6'
|
|
assert identity_i['field-id'] == 1001
|
|
# field-id increased for the modified field.
|
|
assert truncate_s['field-id'] == 1002
|
|
|
|
# Partition evolution for i and s
|
|
partition_evolution = 'alter table %s set partition ' \
|
|
'spec(bucket(4, i), truncate(3,s))' \
|
|
% qualified_table_name
|
|
self.client.execute(partition_evolution)
|
|
|
|
evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name)
|
|
assert len(evolved_partition_spec['fields']) == 2
|
|
bucket_i = evolved_partition_spec['fields'][0]
|
|
truncate_s = evolved_partition_spec['fields'][1]
|
|
# The field naming follows the transform changes.
|
|
assert bucket_i['name'] == 'i_bucket_4'
|
|
assert truncate_s['name'] == 's_trunc_3'
|
|
# field-id's are increasing with each modification
|
|
assert bucket_i['field-id'] == 1003
|
|
assert truncate_s['field-id'] == 1004
|
|
|
|
@SkipIf.not_dfs
|
|
def test_writing_metrics_to_metadata_v1(self, unique_database):
|
|
self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v1', '1')
|
|
|
|
@SkipIf.not_dfs
|
|
def test_writing_metrics_to_metadata_v2(self, unique_database):
|
|
self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v2', '2')
|
|
|
|
def _test_writing_metrics_to_metadata_impl(self, unique_database, table_name, version):
|
|
# Create table
|
|
qualified_table_name = "%s.%s" % (unique_database, table_name)
|
|
query = """create table {}
|
|
(s string, i int, b boolean, bi bigint, ts timestamp, dt date,
|
|
dc decimal(10, 3))
|
|
stored as iceberg
|
|
tblproperties ('format-version'='{}')""".format(qualified_table_name, version)
|
|
self.client.execute(query)
|
|
# Insert data
|
|
# 1st data file:
|
|
query = 'insert into %s values ' \
|
|
'("abc", 3, true, NULL, "1970-01-03 09:11:22", NULL, 56.34), ' \
|
|
'("def", NULL, false, NULL, "1969-12-29 14:45:59", DATE"1969-01-01", -10.0), ' \
|
|
'("ghij", 1, NULL, 123456789000000, "1970-01-01", DATE"1970-12-31", NULL), ' \
|
|
'(NULL, 0, NULL, 234567890000001, NULL, DATE"1971-01-01", NULL)' \
|
|
% qualified_table_name
|
|
self.execute_query(query)
|
|
# 2nd data file:
|
|
query = 'insert into %s values ' \
|
|
'(NULL, NULL, NULL, NULL, NULL, NULL, NULL), ' \
|
|
'(NULL, NULL, NULL, NULL, NULL, NULL, NULL)' \
|
|
% qualified_table_name
|
|
self.execute_query(query)
|
|
|
|
# Get hdfs path to manifest list file
|
|
manifest_list_hdfs_path = self.get_manifest_list_hdfs_path(
|
|
'/tmp/iceberg_metrics_test', unique_database, table_name, '00002')
|
|
|
|
# Get the list of hdfs paths to manifest files
|
|
assert manifest_list_hdfs_path is not None
|
|
manifest_hdfs_path_list = self.get_manifest_hdfs_path_list(
|
|
'/tmp/iceberg_metrics_test', manifest_list_hdfs_path)
|
|
|
|
# Get 'data_file' records from manifest files.
|
|
assert manifest_hdfs_path_list is not None and len(manifest_hdfs_path_list) > 0
|
|
datafiles = self.get_data_file_list('/tmp/iceberg_metrics_test',
|
|
manifest_hdfs_path_list)
|
|
|
|
# Check column stats in datafiles
|
|
assert datafiles is not None and len(datafiles) == 2
|
|
|
|
# The 1st datafile contains the 2 NULL rows
|
|
assert datafiles[0]['record_count'] == 2
|
|
assert datafiles[0]['column_sizes'] == \
|
|
[{'key': 1, 'value': 39},
|
|
{'key': 2, 'value': 39},
|
|
{'key': 3, 'value': 25},
|
|
{'key': 4, 'value': 39},
|
|
{'key': 5, 'value': 39},
|
|
{'key': 6, 'value': 39},
|
|
{'key': 7, 'value': 39}]
|
|
assert datafiles[0]['value_counts'] == \
|
|
[{'key': 1, 'value': 2},
|
|
{'key': 2, 'value': 2},
|
|
{'key': 3, 'value': 2},
|
|
{'key': 4, 'value': 2},
|
|
{'key': 5, 'value': 2},
|
|
{'key': 6, 'value': 2},
|
|
{'key': 7, 'value': 2}]
|
|
assert datafiles[0]['null_value_counts'] == \
|
|
[{'key': 1, 'value': 2},
|
|
{'key': 2, 'value': 2},
|
|
{'key': 3, 'value': 2},
|
|
{'key': 4, 'value': 2},
|
|
{'key': 5, 'value': 2},
|
|
{'key': 6, 'value': 2},
|
|
{'key': 7, 'value': 2}]
|
|
# Upper/lower bounds should be empty lists
|
|
assert datafiles[0]['lower_bounds'] == []
|
|
assert datafiles[0]['upper_bounds'] == []
|
|
|
|
# 2nd datafile
|
|
assert datafiles[1]['record_count'] == 4
|
|
assert datafiles[1]['column_sizes'] == \
|
|
[{'key': 1, 'value': 66},
|
|
{'key': 2, 'value': 56},
|
|
{'key': 3, 'value': 26},
|
|
{'key': 4, 'value': 59},
|
|
{'key': 5, 'value': 68},
|
|
{'key': 6, 'value': 56},
|
|
{'key': 7, 'value': 53}]
|
|
assert datafiles[1]['value_counts'] == \
|
|
[{'key': 1, 'value': 4},
|
|
{'key': 2, 'value': 4},
|
|
{'key': 3, 'value': 4},
|
|
{'key': 4, 'value': 4},
|
|
{'key': 5, 'value': 4},
|
|
{'key': 6, 'value': 4},
|
|
{'key': 7, 'value': 4}]
|
|
assert datafiles[1]['null_value_counts'] == \
|
|
[{'key': 1, 'value': 1},
|
|
{'key': 2, 'value': 1},
|
|
{'key': 3, 'value': 2},
|
|
{'key': 4, 'value': 2},
|
|
{'key': 5, 'value': 1},
|
|
{'key': 6, 'value': 1},
|
|
{'key': 7, 'value': 2}]
|
|
assert datafiles[1]['lower_bounds'] == \
|
|
[{'key': 1, 'value': b'abc'},
|
|
# INT is serialized as 4-byte little endian
|
|
{'key': 2, 'value': b'\x00\x00\x00\x00'},
|
|
# BOOLEAN is serialized as 0x00 for FALSE
|
|
{'key': 3, 'value': b'\x00'},
|
|
# BIGINT is serialized as 8-byte little endian
|
|
{'key': 4, 'value': b'\x40\xaf\x0d\x86\x48\x70\x00\x00'},
|
|
# TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
|
|
# 1970-01-01 00:00:00)
|
|
{'key': 5, 'value': b'\xc0\xd7\xff\x06\xd0\xff\xff\xff'},
|
|
# DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
|
|
{'key': 6, 'value': b'\x93\xfe\xff\xff'},
|
|
# Unlike other numerical values, DECIMAL is serialized as big-endian.
|
|
{'key': 7, 'value': b'\xd8\xf0'}]
|
|
assert datafiles[1]['upper_bounds'] == \
|
|
[{'key': 1, 'value': b'ghij'},
|
|
# INT is serialized as 4-byte little endian
|
|
{'key': 2, 'value': b'\x03\x00\x00\x00'},
|
|
# BOOLEAN is serialized as 0x01 for TRUE
|
|
{'key': 3, 'value': b'\x01'},
|
|
# BIGINT is serialized as 8-byte little endian
|
|
{'key': 4, 'value': b'\x81\x58\xc2\x97\x56\xd5\x00\x00'},
|
|
# TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
|
|
# 1970-01-01 00:00:00)
|
|
{'key': 5, 'value': b'\x80\x02\x86\xef\x2f\x00\x00\x00'},
|
|
# DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
|
|
{'key': 6, 'value': b'\x6d\x01\x00\x00'},
|
|
# Unlike other numerical values, DECIMAL is serialized as big-endian.
|
|
{'key': 7, 'value': b'\x00\xdc\x14'}]
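# Note (illustrative sketch, not executed by the test): the bound encodings
# above can be reproduced with plain Python, since Iceberg serializes
# INT/BIGINT/DATE single-value bounds as little-endian two's complement and
# DECIMAL as the big-endian unscaled value, e.g.:
#   import struct
#   struct.pack('<i', 3)               == b'\x03\x00\x00\x00'  # INT upper bound
#   struct.pack('<q', 123456789000000) == b'\x40\xaf\x0d\x86\x48\x70\x00\x00'
#   struct.pack('<i', -365)            == b'\x93\xfe\xff\xff'  # DATE 1969-01-01
#   (-10000).to_bytes(2, 'big', signed=True) == b'\xd8\xf0'    # DECIMAL -10.000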
|
|
|
|
def test_using_upper_lower_bound_metrics(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector,
|
|
use_db=unique_database)
|
|
|
|
def test_writing_many_files(self, vector, unique_database):
|
|
self.run_test_case('QueryTest/iceberg-write-many-files', vector,
|
|
use_db=unique_database)
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_writing_many_files_stress(self, vector, unique_database):
|
|
if self.exploration_strategy() != 'exhaustive':
|
|
pytest.skip('runs only in exhaustive')
|
|
self.run_test_case('QueryTest/iceberg-write-many-files-stress', vector,
|
|
use_db=unique_database)
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_table_load_time_for_many_files(self, unique_database):
|
|
if self.exploration_strategy() != 'exhaustive':
|
|
pytest.skip('runs only in exhaustive')
|
|
tbl_name = unique_database + ".iceberg_many_files"
|
|
self.execute_query("""CREATE TABLE {}
|
|
PARTITIONED BY SPEC (bucket(2039, l_orderkey))
|
|
STORED AS ICEBERG
|
|
AS SELECT * FROM tpch_parquet.lineitem""".format(tbl_name))
|
|
self.execute_query("invalidate metadata")
|
|
start_time = time.time()
|
|
self.execute_query("describe formatted {}".format(tbl_name))
|
|
elapsed_time = time.time() - start_time
|
|
if IS_HDFS:
|
|
time_limit = 10
|
|
else:
|
|
time_limit = 20
|
|
assert elapsed_time < time_limit

  def test_consistent_scheduling(self, unique_database):
    """IMPALA-10914: This test verifies that Impala schedules scan ranges consistently
    for Iceberg tables."""
    def collect_split_stats(profile):
      splits = [s.strip() for s in profile.splitlines() if "Hdfs split stats" in s]
      splits.sort()
      return splits

    with self.create_impala_client() as impalad_client:
      impalad_client.execute("use " + unique_database)
      impalad_client.execute("""create table line_ice stored as iceberg
          as select * from tpch_parquet.lineitem""")
      first_result = impalad_client.execute("""select count(*) from line_ice""")
      ref_profile = first_result.runtime_profile
      ref_split_stats = collect_split_stats(ref_profile)

      for i in range(0, 10):
        # Subsequent executions of the same query should schedule scan ranges similarly.
        result = impalad_client.execute("""select count(*) from line_ice""")
        profile = result.runtime_profile
        split_stats = collect_split_stats(profile)
        assert ref_split_stats == split_stats

  def test_scheduling_partitioned_tables(self, unique_database):
    """IMPALA-12765: Balance consecutive partitions better for Iceberg tables"""
    # We are setting the replica_preference query option in this test, so let's create a
    # local impala client.
    inventory_tbl = "inventory_ice"
    item_tbl = "item_ice"
    date_dim_tbl = "date_dim_ice"
    with self.create_impala_client() as impalad_client:
      impalad_client.execute("use " + unique_database)
      impalad_client.execute("set replica_preference=remote")
      impalad_client.execute("""
          CREATE TABLE {}
          PARTITIONED BY SPEC (inv_date_sk)
          STORED BY ICEBERG
          AS SELECT * from tpcds_partitioned_parquet_snap.inventory;
          """.format(inventory_tbl))
      impalad_client.execute("""
          CREATE TABLE {}
          STORED BY ICEBERG
          AS SELECT * from tpcds_partitioned_parquet_snap.item;
          """.format(item_tbl))
      impalad_client.execute("""
          CREATE TABLE {}
          STORED BY ICEBERG
          AS SELECT * from tpcds_partitioned_parquet_snap.date_dim;
          """.format(date_dim_tbl))
      q22_result = impalad_client.execute("""
          select i_product_name, i_brand, i_class, i_category,
              avg(inv_quantity_on_hand) qoh
          from inventory_ice, date_dim_ice, item_ice
          where inv_date_sk=d_date_sk and
              inv_item_sk=i_item_sk and
              d_month_seq between 1199 and 1199 + 11
          group by rollup(i_product_name, i_brand, i_class, i_category)
          order by qoh, i_product_name, i_brand, i_class, i_category
          limit 100
          """)
      profile = q22_result.runtime_profile
      # "Files rejected:" contains the number of files being rejected by runtime
      # filters. With IMPALA-12765 we should see similar numbers for each executor.
      files_rejected_array = re.findall(r"Files rejected: \d+ \((\d+)\)", profile)
      avg_files_rejected = int(files_rejected_array[0])
      THRESHOLD = 3
      for files_rejected_str in files_rejected_array:
        files_rejected = int(files_rejected_str)
        if files_rejected != 0:
          assert abs(avg_files_rejected - files_rejected) < THRESHOLD

  def test_in_predicate_push_down(self, vector, unique_database):
    self.execute_query("SET RUNTIME_FILTER_MODE=OFF")
    self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector,
                       use_db=unique_database)

  def test_is_null_predicate_push_down(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-is-null-predicate-push-down', vector,
                       use_db=unique_database)

  def test_compound_predicate_push_down(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-compound-predicate-push-down', vector,
                       use_db=unique_database)

  def test_plain_count_star_optimization(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-plain-count-star-optimization', vector,
                       use_db=unique_database)

  def test_create_table_like_table(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-create-table-like-table', vector,
                       use_db=unique_database)

  def test_table_owner(self, vector, unique_database):
    self.run_table_owner_test(vector, unique_database, "some_random_user")
    self.run_table_owner_test(vector, unique_database, "another_random_user")

  def run_table_owner_test(self, vector, db_name, user_name):
    # Create Iceberg table with a given user running the query.
    tbl_name = "iceberg_table_owner"
    sql_stmt = 'CREATE TABLE {0}.{1} (i int) STORED AS ICEBERG'.format(
        db_name, tbl_name)
    args = ['-u', user_name, '-q', sql_stmt]
    run_impala_shell_cmd(vector, args)

    # Run DESCRIBE FORMATTED to get the owner of the table.
    args = ['-q', 'DESCRIBE FORMATTED {0}.{1}'.format(db_name, tbl_name)]
    results = run_impala_shell_cmd(vector, args)
    result_rows = results.stdout.strip().split('\n')

    # Find the output row with the owner.
    owner_row = ""
    for row in result_rows:
      if "Owner:" in row:
        owner_row = row
    assert owner_row != "", "DESCRIBE output doesn't contain owner" + results.stdout
    # Verify that the user running the query is the owner of the table.
    assert user_name in owner_row, "Unexpected owner of Iceberg table. " + \
        "Expected user name: {0}. Actual output row: {1}".format(user_name, owner_row)

    args = ['-q', 'DROP TABLE {0}.{1}'.format(db_name, tbl_name)]
    results = run_impala_shell_cmd(vector, args)

  def test_mixed_file_format(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-mixed-file-format', vector,
                       unique_database)

  def test_load(self, vector, unique_database):
    """Test LOAD DATA INPATH for Iceberg tables. The first part of this method
    initializes the target directories and copies the existing test data to HDFS. The
    second part runs the test cases and then cleans up the test directories.
    """
    # Test 1-6 init: target orc/parquet file and directory
    SRC_DIR = os.path.join(os.environ['IMPALA_HOME'],
        "testdata/data/iceberg_test/iceberg_mixed_file_format_test/data/{0}")
    DST_DIR = "/tmp/" + unique_database + "/parquet/"
    self.filesystem_client.make_dir(DST_DIR, permission=777)
    file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
        "fff150b6136a-job_16619542960420_0002-1-00001.parquet"
    file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
        "27ff880faff0-job_16619542960420_0004-1-00001.parquet"
    self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
    self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq2), DST_DIR)
    DST_DIR = "/tmp/" + unique_database + "/orc/"
    self.filesystem_client.make_dir(DST_DIR, permission=777)
    file_orc1 = "00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-" \
        "ee500a19c1d1-job_16619542960420_0003-1-00001.orc"
    file_orc2 = "00000-0-data-gfurnstahl_20220906114900_9c1b7b46-5643-428f-a007-" \
        "519c5500ed04-job_16619542960420_0004-1-00001.orc"
    self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc1), DST_DIR)
    self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc2), DST_DIR)
    # Test 7 init: overwrite
    DST_DIR = "/tmp/" + unique_database + "/overwrite/"
    self.filesystem_client.make_dir(DST_DIR, permission=777)
    self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
    # Test 8 init: mismatching parquet schema format
    SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/"
        "iceberg_partitioned/data/event_time_hour=2020-01-01-08/action=view/{0}")
    DST_DIR = "/tmp/" + unique_database + "/mismatching_schema/"
    self.filesystem_client.make_dir(DST_DIR, permission=777)
    file = "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet"
    self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
    # Test 9 init: partitioned
    DST_DIR = "/tmp/" + unique_database + "/partitioned/"
    self.filesystem_client.make_dir(DST_DIR, permission=777)
    self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
    # Test 10 init: hidden files
    DST_DIR = "/tmp/" + unique_database + "/hidden/"
    self.filesystem_client.make_dir(DST_DIR, permission=777)
    self.filesystem_client.create_file(DST_DIR + "_hidden.1", "Test data 123")
    self.filesystem_client.create_file(DST_DIR + "_hidden_2.1", "Test data 123")
    self.filesystem_client.create_file(DST_DIR + ".hidden_3", "Test data 123")
    self.filesystem_client.create_file(DST_DIR + ".hidden_4.1", "Test data 123")
    self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR)

    # Init test table
    create_iceberg_table_from_directory(self.client, unique_database,
                                        "iceberg_mixed_file_format_test", "parquet")

    # Execute tests
    self.run_test_case('QueryTest/iceberg-load', vector, use_db=unique_database)
    # Clean up temporary directory
    self.filesystem_client.delete_file_dir("/tmp/{0}".format(unique_database), True)

  def test_table_sampling(self, vector):
    self.run_test_case('QueryTest/iceberg-tablesample', vector,
                       use_db="functional_parquet")

  def _create_table_like_parquet_helper(self, vector, unique_database, tbl_name,
                                        expect_success):
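    """Creates a Parquet table from the 'tbl_name' test file, picks one of its data
    files from SHOW FILES, then tries to create an Iceberg table from that file with
    CREATE TABLE ... LIKE PARQUET ... STORED AS ICEBERG via impala-shell. Returns the
    shell result; 'expect_success' tells whether the CREATE statement should succeed."""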
    create_table_from_parquet(self.client, unique_database, tbl_name)
    args = ['-q', "show files in {0}.{1}".format(unique_database, tbl_name)]
    results = run_impala_shell_cmd(vector, args)
    result_rows = results.stdout.strip().split('\n')
    hdfs_file = None
    for row in result_rows:
      if "://" in row:
        hdfs_file = row.split('|')[1].lstrip()
        break
    assert hdfs_file

    iceberg_tbl_name = "iceberg_{0}".format(tbl_name)
    sql_stmt = "create table {0}.{1} like parquet '{2}' stored as iceberg".format(
        unique_database, iceberg_tbl_name, hdfs_file
    )
    args = ['-q', sql_stmt]

    return run_impala_shell_cmd(vector, args, expect_success=expect_success)

  def test_create_table_like_parquet(self, vector, unique_database):
    tbl_name = 'alltypes_tiny_pages'
    # Not all types are supported by iceberg
    self._create_table_like_parquet_helper(vector, unique_database, tbl_name, False)

    tbl_name = "create_table_like_parquet_test"
    results = self._create_table_like_parquet_helper(vector, unique_database, tbl_name,
                                                     True)
    result_rows = results.stdout.strip().split('\n')
    assert result_rows[3].split('|')[1] == ' Table has been created. '

    sql_stmt = "describe {0}.{1}".format(unique_database, tbl_name)
    args = ['-q', sql_stmt]
    parquet_results = run_impala_shell_cmd(vector, args)
    parquet_result_rows = parquet_results.stdout.strip().split('\n')

    parquet_column_name_type_list = []
    for row in parquet_result_rows[1:-2]:
      parquet_column_name_type_list.append(row.split('|')[1:3])

    sql_stmt = "describe {0}.iceberg_{1}".format(unique_database, tbl_name)
    args = ['-q', sql_stmt]
    iceberg_results = run_impala_shell_cmd(vector, args)
    iceberg_result_rows = iceberg_results.stdout.strip().split('\n')

    iceberg_column_name_type_list = []
    for row in iceberg_result_rows[1:-2]:
      iceberg_column_name_type_list.append(row.split('|')[1:3])

    assert parquet_column_name_type_list == iceberg_column_name_type_list

  @SkipIfFS.hive
  def test_hive_external_forbidden(self, unique_database):
    tbl_name = unique_database + ".hive_ext"
    error_msg = ("cannot be loaded because it is an EXTERNAL table in the HiveCatalog "
                 "that points to another table. Query the original table instead.")
    self.execute_query("create table {0} (i int) stored by iceberg".
        format(tbl_name))
    # 'iceberg.table_identifier' can refer to another table
    self.run_stmt_in_hive("""alter table {0} set tblproperties
        ('external.table.purge'='false',
        'iceberg.table_identifier'='functional_iceberg.iceberg_partitioned')""".
        format(tbl_name))
    ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name))
    assert error_msg in str(ex)
    # 'iceberg.mr.table.identifier' can refer to another table
    self.run_stmt_in_hive("""
        alter table {0} unset tblproperties('iceberg.table_identifier')""".
        format(tbl_name))
    self.run_stmt_in_hive("""alter table {0} set tblproperties
        ('iceberg.mr.table.identifier'='functional_iceberg.iceberg_partitioned')""".
        format(tbl_name))
    ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name))
    assert error_msg in str(ex)
    # 'name' can also refer to another table but cannot be set by Hive/Impala. Also,
    # during table migration both Impala and Hive clear existing table properties.
    # See IMPALA-12410

  @SkipIfFS.incorrent_reported_ec
  def test_compute_stats(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-compute-stats', vector, unique_database)

  def test_virtual_columns(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-virtual-columns', vector, unique_database)

  def test_avro_file_format(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-avro', vector, unique_database)

  def test_convert_table(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables',
                       vector, unique_database)

  def test_table_exists(self, unique_database):
    """Test that Iceberg AlreadyExistsExceptions are correctly handled."""
    tbl_name = unique_database + ".create_iceberg_exists"
    # Attempt to create an iceberg table, simulating AlreadyExistsException
    iceberg_created_options = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@'
        'IcebergAlreadyExistsException@Table was created concurrently'}
    err = self.execute_query_expect_failure(self.client,
        "create table {0} (i int) stored as iceberg".format(tbl_name),
        query_options=iceberg_created_options)
    assert "AlreadyExistsException: Table already exists" in str(err)
    self.execute_query_expect_success(self.client,
        "create table if not exists {0} (i int) stored as iceberg".format(tbl_name),
        query_options=iceberg_created_options)

  def test_abort_transaction(self, unique_database):
    """Test that iceberg operations fail correctly when an Iceberg transaction commit
    fails, and that the effects of the failed operation are not visible."""
    tbl_name = unique_database + ".abort_iceberg_transaction"
    # The query options that inject an iceberg transaction commit failure.
    abort_ice_transaction_options = {'debug_action':
                                     'CATALOGD_ICEBERG_COMMIT:EXCEPTION@'
                                     'CommitFailedException@'
                                     'simulated commit failure'}
    # Create an iceberg table and insert a row.
    self.client.execute("""create table {0} (i int)
        stored as iceberg""".format(tbl_name))
    self.execute_query_expect_success(self.client,
        "insert into {0} values (1);".format(tbl_name))

    # Run a query that would insert a row, but pass the query options that
    # will cause the iceberg transaction to abort.
    err = self.execute_query_expect_failure(self.client,
        "insert into {0} values (2);".format(tbl_name),
        query_options=abort_ice_transaction_options)
    # Check that the error message looks reasonable.
    result = str(err)
    assert error_msg_startswith(result,
        "ImpalaRuntimeException: simulated commit failure\n"
        "CAUSED BY: CommitFailedException: simulated commit failure")
    # Check that no data was inserted.
    data = self.execute_query_expect_success(self.client,
        "select * from {0}".format(tbl_name))
    assert data.column_labels == ['I']
    assert len(data.data) == 1
    assert data.data[0] == '1'

    # Run a query that would add a column to the table, but pass the query options that
    # will cause the iceberg transaction to abort.
    ddl_err = self.execute_query_expect_failure(self.client,
        "alter table {0} add column {1} bigint"
        .format(tbl_name, "j"), query_options=abort_ice_transaction_options)
    ddl_result = str(ddl_err)
    # Check that the error message looks reasonable.
    assert error_msg_startswith(ddl_result,
        "CommitFailedException: simulated commit failure")
    # Check that no column was added.
    data = self.execute_query_expect_success(self.client,
        "select * from {0}".format(tbl_name))
    assert data.column_labels == ['I']
    assert len(data.data) == 1
    assert data.data[0] == '1'

  def test_drop_partition(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-drop-partition', vector,
                       use_db=unique_database)

  def test_rollback_after_drop_partition(self, unique_database):
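    """Verifies EXECUTE ROLLBACK to the snapshot taken before a DROP PARTITION: after
    the rollback the snapshot history contains an entry with the same snapshot id and
    parent id as the pre-drop snapshot but with a later creation time."""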
    table_name = "iceberg_drop_partition_rollback"
    qualified_table_name = "{}.{}".format(unique_database, table_name)
    create_table_stmt = """CREATE TABLE {}(identity_int int, unpartitioned_int int)
      PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG""".format(qualified_table_name)
    insert_into_stmt = """INSERT INTO {} values(1, 2)""".format(qualified_table_name)
    drop_partition_stmt = """ALTER TABLE {} DROP PARTITION (identity_int = 1)""".format(
        qualified_table_name)

    self.execute_query(create_table_stmt)
    self.execute_query(insert_into_stmt)
    self.execute_query(drop_partition_stmt)

    snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=2)
    rollback = """ALTER TABLE {} EXECUTE ROLLBACK ({})""".format(
        qualified_table_name, snapshots[0].get_snapshot_id())
    # Rollback before DROP PARTITION
    self.execute_query(rollback)
    snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=3)
    assert snapshots[0].get_snapshot_id() == snapshots[2].get_snapshot_id()
    assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id()
    assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time()

  def test_show_files_partition(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-show-files-partition', vector,
                       use_db=unique_database)

  def test_scan_metrics_in_profile_basic(self, vector):
    self.run_test_case('QueryTest/iceberg-scan-metrics-basic', vector)


class TestIcebergV2Table(IcebergTestSuite):
  """Tests related to Iceberg V2 tables."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergV2Table, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'parquet')
    add_exec_option_dimension(cls, 'disable_optimized_iceberg_v2_read', [0, 1])

  def should_run_for_hive(self, vector):
    # Hive interop tests are very slow. Only run them for a subset of dimensions.
    if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 0:
      return True
    return False

  # The test uses pre-written Iceberg tables where the position delete files refer to
  # the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the
  # dockerised environment the namenode is accessible on a different hostname/port.
  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_plain_count_star_optimization(self, vector):
    self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization',
                       vector)

  def test_count_star_optimization_in_complex_query(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-v2-count-star-optimization-in-complex-query',
                       vector, unique_database)

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_read_position_deletes(self, vector):
    # Remove 'batch_size' option so we can set it in the .test file.
    # Revisit this if the 'batch_size' dimension size increases.
    vector.unset_exec_option('batch_size')
    self.run_test_case('QueryTest/iceberg-v2-read-position-deletes', vector)

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_read_position_deletes_orc(self, vector):
    self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc', vector)

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  @pytest.mark.execute_serially
  def test_read_position_deletes_compute_stats(self, vector):
    """Tests COMPUTE STATS on Iceberg V2 tables. Needs to be executed serially
    because it modifies tables that are used by other tests (e.g. multiple
    instances of this test)."""
    self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-stats', vector)
    self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc-stats', vector)

  @SkipIfFS.hive
  def test_read_mixed_format_position_deletes(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-mixed-format-position-deletes',
                       vector, unique_database)

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_read_null_delete_records(self, vector):
    expected_error = 'NULL found as file_path in delete file'
    query_options = vector.get_value('exec_option')
    v2_op_disabled = query_options['disable_optimized_iceberg_v2_read'] == 1
    result = self.execute_query(
        'select * from functional_parquet.iceberg_v2_null_delete_record', query_options)
    assert len(result.data) == 6
    errors = result.log
    print(errors)
    assert expected_error in errors or v2_op_disabled
    result = self.execute_query(
        'select count(*) from functional_parquet.iceberg_v2_null_delete_record',
        query_options)
    assert result.data == ['6']
    errors = result.log
    assert expected_error in errors or v2_op_disabled
    result = self.execute_query(
        """select * from functional_parquet.iceberg_v2_null_delete_record
        where j < 3""", query_options)
    assert sorted(result.data) == ['1\t1', '2\t2']
    errors = result.log
    assert expected_error in errors or v2_op_disabled

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_read_equality_deletes(self, vector):
    self.run_test_case('QueryTest/iceberg-v2-read-equality-deletes', vector)

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_table_sampling_v2(self, vector):
    self.run_test_case('QueryTest/iceberg-tablesample-v2', vector,
                       use_db="functional_parquet")

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_scan_metrics_in_profile_with_deletes(self, vector):
    def get_latest_snapshot_id(fq_tbl_name):
      query = ("select snapshot_id from {}.snapshots order by committed_at desc"
               .format(fq_tbl_name))
      res = self.execute_query(query)
      return res.data[0]

    ice_db = "functional_parquet"

    no_deletes = "{}.{}".format(ice_db, "iceberg_v2_no_deletes")
    no_deletes_snapshot_id = get_latest_snapshot_id(no_deletes)

    pos_delete_all_rows = "{}.{}".format(ice_db, "iceberg_v2_positional_delete_all_rows")
    pos_delete_all_rows_snapshot_id = get_latest_snapshot_id(pos_delete_all_rows)

    not_all_data_files_have_delete_files = "{}.{}".format(
        ice_db, "iceberg_v2_positional_not_all_data_files_have_delete_files")
    not_all_data_files_have_delete_files_snapshot_id = get_latest_snapshot_id(
        not_all_data_files_have_delete_files)

    self.run_test_case('QueryTest/iceberg-scan-metrics-with-deletes', vector,
        test_file_vars={
            "NO_DELETES_SNAPTHOT_ID": no_deletes_snapshot_id,
            "POS_DELETE_ALL_ROWS_SNAPSHOT_ID": pos_delete_all_rows_snapshot_id,
            "NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID":
                not_all_data_files_have_delete_files_snapshot_id
        })

  @SkipIf.hardcoded_uris
  def test_metadata_tables(self, vector, unique_database):
    # Remove 'batch_size' option so we can set it in the .test file.
    # Revisit this if the 'batch_size' dimension size increases.
    vector.unset_exec_option('batch_size')
    with self.create_impala_client() as impalad_client:
      overwrite_snapshot_id = impalad_client.execute("select snapshot_id from "
          "functional_parquet.iceberg_query_metadata.snapshots "
          "where operation = 'overwrite';")
      overwrite_snapshot_ts = impalad_client.execute("select committed_at from "
          "functional_parquet.iceberg_query_metadata.snapshots "
          "where operation = 'overwrite';")
      self.run_test_case('QueryTest/iceberg-metadata-tables', vector,
          unique_database,
          test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]),
                          '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])})

  @SkipIf.not_hdfs
  def test_missing_data_files(self, vector, unique_database):
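    """Removes data files of Iceberg tables directly from HDFS, for both an
    unpartitioned and a partitioned table, and runs the queries of the
    iceberg-missing-data-files test file against the corrupted tables."""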
    def list_files(tbl):
      query_result = self.execute_query("select file_path from {}.`files`".format(tbl))
      return query_result.data

    def first_snapshot(tbl):
      query_result = self.execute_query(
          "select snapshot_id from {}.`snapshots` order by committed_at".format(tbl))
      return query_result.data[0]

    def insert_values(tbl, values):
      self.execute_query("insert into {} values {}".format(tbl, values))

    missing_files_nopart = unique_database + ".missing_files_nopart"
    missing_files_part = unique_database + ".missing_files_part"
    self.execute_query("""CREATE TABLE {} (i int, p int)
        STORED BY ICEBERG
        TBLPROPERTIES('format-version'='2')""".format(missing_files_nopart))
    insert_values(missing_files_nopart, "(1, 1)")
    first_file = set(list_files(missing_files_nopart))
    insert_values(missing_files_nopart, "(2, 2)")

    all_files = set(list_files(missing_files_nopart))
    assert len(all_files) == 2
    second_file = next(iter(all_files - first_file))
    check_output(["hdfs", "dfs", "-rm", second_file])

    self.execute_query("""CREATE TABLE {} (i int, p int)
        PARTITIONED BY SPEC (p)
        STORED BY ICEBERG
        TBLPROPERTIES('format-version'='2')""".format(missing_files_part))
    insert_values(missing_files_part, "(1, 1)")
    insert_values(missing_files_part, "(2, 2)")
    files = list_files(missing_files_part)
    part_2_f = None
    for f in files:
      if "p=2" in f:
        part_2_f = f
        break
    assert part_2_f is not None
    check_output(["hdfs", "dfs", "-rm", part_2_f])

    self.execute_query("invalidate metadata {}".format(missing_files_nopart))
    self.execute_query("invalidate metadata {}".format(missing_files_part))

    self.run_test_case('QueryTest/iceberg-missing-data-files', vector,
        unique_database,
        test_file_vars={
            '$NOPART_FIRST_SNAPSHOT': first_snapshot(missing_files_nopart),
            '$PART_FIRST_SNAPSHOT': first_snapshot(missing_files_part)})

  def test_delete(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-delete', vector,
                       unique_database)

  def test_delete_partitioned(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-delete-partitioned', vector,
                       unique_database)
    if IS_HDFS and self.should_run_for_hive(vector):
      self._delete_partitioned_hive_tests(unique_database)

  def _delete_partitioned_hive_tests(self, db):
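    """Checks from Hive's side that the DELETE statements of the .test file removed the
    expected rows from the partitioned tables created in database 'db'."""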
    hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
        db, "id_part"))
    assert hive_output == "id_part.i,id_part.s\n"
    hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
        db, "trunc_part"))
    assert hive_output == "trunc_part.i,trunc_part.s\n"
    hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
        db, "multi_part"))
    assert hive_output == "multi_part.i,multi_part.s,multi_part.f\n"
    hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
        db, "evolve_part"))
    assert hive_output == \
        "evolve_part.i,evolve_part.s,evolve_part.f\n3,three,3.33\n" \
        "30,thirty,30.3\n40,forty,40.4\n"
    hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}.{}".format(
        db, "ice_store_sales"))
    assert hive_output == "_c0\n2601498\n"
    hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{}".format(
        db, "ice_alltypes_part_v2"))
    # Cut off the long header line.
    hive_output = hive_output.split("\n", 1)
    hive_output = hive_output[1]
    assert hive_output == \
        "2,true,1,11,1.1,2.222,123.321,2022-02-22,impala\n"

  def test_large_scale_deletes(self, vector, unique_database):
    if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 1:
      pytest.skip("Only test the optimized v2 operator")
    self.run_test_case('QueryTest/iceberg-large-scale-deletes', vector,
                       unique_database)

  @SkipIfFS.hive
  def test_delete_hive_read(self, unique_database):
    ice_delete = unique_database + ".ice_delete"
    self.execute_query("""CREATE TABLE {} (i int, s string)
        STORED BY ICEBERG
        TBLPROPERTIES('format-version'='2')""".format(ice_delete))
    self.execute_query("INSERT INTO {} VALUES (1, 'one')".format(ice_delete))
    self.execute_query("INSERT INTO {} VALUES (2, 'two')".format(ice_delete))
    self.execute_query("INSERT INTO {} VALUES (3, 'three')".format(ice_delete))
    self.execute_query("DELETE FROM {} WHERE i = 2".format(ice_delete))

    # Hive needs table property 'format-version' explicitly set
    self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format(
        ice_delete))
    hive_output = self.run_stmt_in_hive("SELECT * FROM {} ORDER BY i".format(ice_delete))
    expected_output = "ice_delete.i,ice_delete.s\n1,one\n3,three\n"
    assert hive_output == expected_output

    ice_lineitem = unique_database + ".linteitem_ice"
    self.execute_query("""CREATE TABLE {}
        STORED BY ICEBERG
        TBLPROPERTIES('format-version'='2')
        AS SELECT * FROM tpch_parquet.lineitem""".format(ice_lineitem))
    self.execute_query("DELETE FROM {} WHERE l_orderkey % 5 = 0".format(ice_lineitem))
    impala_result = self.execute_query("SELECT count(*) FROM {}".format(ice_lineitem))
    assert impala_result.data[0] == "4799964"
    # Hive needs table property 'format-version' explicitly set
    self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format(
        ice_lineitem))
    hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}".format(ice_lineitem))
    assert hive_output == "_c0\n4799964\n"

  @SkipIfFS.hive
  def test_delete_complextypes_mixed_files(self, vector, unique_database):
    ice_t = unique_database + ".ice_complex_delete"
    self.run_stmt_in_hive("""create table {}
        stored by iceberg stored as orc as
        select * from functional_parquet.complextypestbl;""".format(ice_t))
    # Hive needs table property 'format-version' explicitly set
    self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format(
        ice_t))
    self.run_stmt_in_hive("""alter table {}
        set tblproperties ('write.format.default'='parquet')""".format(ice_t))
    self.run_stmt_in_hive("""insert into {}
        select * from functional_parquet.complextypestbl""".format(ice_t))

    vector.get_value('exec_option')['expand_complex_types'] = True
    self.run_test_case('QueryTest/iceberg-delete-complex', vector,
                       unique_database)
    hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t))
    # Test that Hive sees the same rows deleted.
    assert hive_output == "id\n4\n5\n6\n7\n8\n"

  def test_update_basic(self, vector, unique_database):
    udf_location = get_fs_path('/test-warehouse/libTestUdfs.so')
    self.run_test_case('QueryTest/iceberg-update-basic', vector,
        unique_database, test_file_vars={'UDF_LOCATION': udf_location})
    self._test_update_basic_snapshots(unique_database)
    if IS_HDFS and self.should_run_for_hive(vector):
      self._update_basic_hive_tests(unique_database)

  def _test_update_basic_snapshots(self, db):
    """Verifies that the tables have the expected number of snapshots, and
    the parent ids match the previous snapshot ids. See IMPALA-12708."""

    self.validate_snapshots(db, "single_col", 3)
    self.validate_snapshots(db, "ice_alltypes", 21)
    self.validate_snapshots(db, "ice_id_partitioned", 7)

  def validate_snapshots(self, db, tbl, expected_snapshots):
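    """Asserts that 'db'.'tbl' has exactly 'expected_snapshots' snapshots and that each
    snapshot's parent id equals the id of the preceding snapshot."""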
    tbl_name = "{}.{}".format(db, tbl)
    snapshots = get_snapshots(self.client, tbl_name,
                              expected_result_size=expected_snapshots)
    parent_id = None
    for s in snapshots:
      assert s.get_parent_id() == parent_id
      parent_id = s.get_snapshot_id()

  def _update_basic_hive_tests(self, db):
    def get_hive_results(tbl, order_by_col):
      stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)
      return self.run_stmt_in_hive(stmt).split("\n", 1)[1]

    hive_results = get_hive_results("single_col", "i")
    assert hive_results == "1\n3\n4\n"

    hive_results = get_hive_results("ice_alltypes", "bool_col")
    assert hive_results == \
        "false,0,111,0.0,0.0,234,123.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \
        "true,3,222,1.0,1.0,NULL,NULL,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n"

    hive_results = get_hive_results("ice_id_partitioned", "i")
    assert hive_results == \
        "1,0,APACHE IMPALA\n" \
        "2,0,iceberg\n" \
        "3,0,hive\n" \
        "5,2,Kudu\n" \
        "10,1,Apache Spark\n"

    hive_results = get_hive_results("ice_bucket_transform", "i")
    assert hive_results == \
        "2,a fairly long string value,1000,1999-09-19 12:00:01.0\n" \
        "4,bbb,2030,2001-01-01 00:00:00.0\n" \
        "6,cccccccccccccccccccccccccccccccccccccccc,-123,2023-11-24 17:44:30.0\n"

    hive_results = get_hive_results("ice_time_transforms_timestamp", "id")
    assert hive_results == \
        "1.5000,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0\n" \
        "2.4690,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0\n" \
        "1999.9998,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0\n"  # noqa: E501

    hive_results = get_hive_results("ice_time_transforms_date", "id")
    assert hive_results == \
        "1.5000,2001-01-01,2001-01-01,2001-01-01\n" \
        "2.4690,2023-11-24,2023-11-24,2023-11-24\n" \
        "1999.9998,2199-12-31,2199-12-31,2199-12-31\n"

    hive_results = get_hive_results("ice_part_transforms", "i")
    assert hive_results == \
        "1,2023-11-13 18:07:05.0,blue,1234\n" \
        "3,2023-11-14 19:07:05.0,green,1700\n" \
        "4,2023-11-13 18:07:23.0,gray,2500\n" \
        "8,2023-11-01 00:11:11.0,black,722\n"

  def test_update_partitions(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-update-partitions', vector,
                       unique_database)
    if IS_HDFS and self.should_run_for_hive(vector):
      self._update_partitions_hive_tests(unique_database)

  def _update_partitions_hive_tests(self, db):
    def get_hive_results(tbl, order_by_col):
      stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)
      return self.run_stmt_in_hive(stmt).split("\n", 1)[1]

    hive_results = get_hive_results("id_part", "i, s")
    assert hive_results == \
        "2,FIVE\n" \
        "2,FOUR\n" \
        "3,SIX\n" \
        "5,ONE\n" \
        "10,TWO\n" \
        "15,THREE\n"

    hive_results = get_hive_results("trunc_part", "i")
    assert hive_results == \
        "1,one\n" \
        "5,five\n" \
        "103,three\n" \
        "1004,FOURfour\n" \
        "1006,SIXsix\n" \
        "1102,TWOtwo\n"

    hive_results = get_hive_results("multi_part", "i")
    assert hive_results == \
        "0,void,3.14\n" \
        "0,void,3.14\n" \
        "0,void,3.14\n" \
        "1,one,1.1\n" \
        "3,three,3.33\n" \
        "5,five,5.5\n" \
        "111,fox,1.1\n"

    hive_results = get_hive_results("evolve_part", "i")
    assert hive_results == \
        "1,one,1.1\n" \
        "30,thirty,30.3\n" \
        "40,forty,40.4\n" \
        "50,fifty,50.5\n" \
        "1003,three,3.33\n" \
        "1010,ten,10.1\n" \
        "1020,twenty,20.2\n" \
        "1222,two,2.2\n"

    hive_results = get_hive_results("date_day_part", "i")
    assert hive_results == \
        "11,1978-01-01\n" \
        "12,1979-12-31\n" \
        "13,1980-01-01\n" \
        "14,2033-11-15\n"

    hive_results = get_hive_results("ts_hour_part", "i")
    assert hive_results == \
        "101,1958-01-01 01:02:03.0\n" \
        "102,1959-12-31 23:59:00.0\n" \
        "103,1960-01-01 00:00:00.0\n" \
        "104,2013-11-15 15:31:00.0\n"

    hive_results = get_hive_results("ts_evolve_part", "i")
    assert hive_results == \
        "1001,1988-02-02 01:02:03.0\n" \
        "1002,1990-02-01 23:59:00.0\n" \
        "1003,1990-02-02 00:00:00.0\n" \
        "1004,2043-12-16 15:31:00.0\n" \
        "1111,NULL\n"

    hive_results = get_hive_results("numeric_truncate", "id")
    assert hive_results == \
        "11,21,2111,531,75.20\n"

    # HIVE-28048: Hive cannot run ORDER BY queries for Iceberg tables partitioned by
    # decimal columns, so we order the results ourselves.
    hive_results = self.run_stmt_in_hive("SELECT * FROM {}.{}".format(
        db, "ice_alltypes_part_v2"))
    # Throw away the header line and sort the results.
    hive_results = hive_results.split("\n", 1)[1]
    hive_results = hive_results.strip().split("\n")
    hive_results.sort()
    assert hive_results == [
        "2,true,2,11,1.1,2.222,123.321,2022-04-22,impala",
        "3,true,3,11,1.1,2.222,123.321,2022-05-22,impala"]

  def test_optimize(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database)
    expected_snapshots = 19
    self.validate_snapshots(unique_database, "ice_optimize", expected_snapshots)

    # The last operation was an OPTIMIZE TABLE statement.
    # Check that time travel to the previous snapshot returns all results correctly.
    tbl_name = unique_database + ".ice_optimize"
    snapshots = get_snapshots(
        self.client, tbl_name, expected_result_size=expected_snapshots)
    snapshot_before_last = snapshots[-2]

    result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name))
    result_time_travel = self.execute_query(
        "select * from {0} for system_version as of {1};".format(
            tbl_name, snapshot_before_last.get_snapshot_id()))
    # Compare the sorted result sets. Note that list.sort() would return None for both
    # sides, which would make the assertion vacuous, so use sorted() instead.
    assert sorted(result_after_opt.data) == sorted(result_time_travel.data)

  def _check_file_filtering(self, tbl_name, threshold_mb, mode, had_partition_evolution):
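    """Runs OPTIMIZE TABLE with the given file size threshold (in MB) on 'tbl_name' and
    checks which files are rewritten. 'mode' is the expected outcome: "NOOP" (no file is
    rewritten), "REWRITE_ALL" (all files are rewritten) or "PARTIAL" (only a subset).
    'had_partition_evolution' tells whether the partition spec has changed, in which
    case the per-partition checks at the end are skipped."""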
    threshold_bytes = threshold_mb * 1024 * 1024
    DATA_FILE = "0"
    DELETE_FILE = "1"
    FileMetadata = namedtuple('FileMetadata', 'content, path, partition, size')
    metadata_query = """select content, file_path, `partition`, file_size_in_bytes
        from {0}.`files`;""".format(tbl_name)
    result = self.execute_query(metadata_query)
    files_before = set()
    files_per_partition = defaultdict(set)
    for line in result.data:
      file = FileMetadata._make(line.split("\t"))
      partition = file.partition
      files_per_partition[partition].add(file)
      files_before.add(file)

    selected_files = set()
    partitions_with_removed_files = set()
    for partition, files in files_per_partition.items():
      if len(files) > 1:
        num_small_files = 0
        # count small datafiles
        for file in files:
          if file.content == DATA_FILE and int(file.size) < threshold_bytes:
            num_small_files += 1
        for file in files:
          # We assume that a delete file in a partition references all data files in
          # that partition, because we cannot differentiate between data files
          # with/without deletes.
          if file.content == DELETE_FILE:
            selected_files.update(files)
            partitions_with_removed_files.add(partition)
            break
          # Only merge small files if there are at least 2 of them.
          elif num_small_files > 1 and int(file.size) < threshold_bytes:
            selected_files.add(file)
            partitions_with_removed_files.add(partition)

    self.execute_query(
        "OPTIMIZE TABLE {0} (file_size_threshold_mb={1});".format(tbl_name, threshold_mb))
    optimized_result = self.execute_query(metadata_query)
    files_after = set()
    for line in optimized_result.data:
      file = FileMetadata._make(line.split("\t"))
      files_after.add(file)

    # Check the resulting files and the modified partitions after the OPTIMIZE operation.
    # files_after = files_before - selected_files + 1 new file per partition
    # The result should not contain the files that were selected and should contain 1 new
    # file per written partition.
    unchanged_files = files_before - selected_files
    # Check that files that were not selected are still present in the result.
    assert unchanged_files.issubset(files_after)
    # Check that selected files are rewritten and not present in the result.
    assert selected_files.isdisjoint(files_after)
    new_files = files_after - unchanged_files
    assert new_files == files_after - files_before

    if mode == "NOOP":
      assert selected_files == set([])
      assert files_after == files_before
    elif mode == "REWRITE_ALL":
      assert selected_files == files_before
      assert files_after.isdisjoint(files_before)
    elif mode == "PARTIAL":
      assert selected_files < files_before and selected_files != set([])
      assert unchanged_files < files_after and unchanged_files != set([])
      assert unchanged_files == files_after.intersection(files_before)

    # Check that all delete files were merged.
    for file in files_after:
      assert file.content == DATA_FILE
    # Check that there's only one new file in every partition.
    partitions_with_new_files = set()
    for file in new_files:
      assert file.partition not in partitions_with_new_files
      partitions_with_new_files.add(file.partition)
    assert len(new_files) == len(partitions_with_new_files)

    # With partition evolution, new files can land in partitions of the latest spec
    # rather than in the partitions the removed files came from, so the checks below
    # only apply without partition evolution.
    if not had_partition_evolution:
      # Check that 1 new content file is written in every updated partition.
      assert len(new_files) == len(partitions_with_removed_files)
      assert partitions_with_new_files == partitions_with_removed_files

  def test_optimize_file_filtering(self, unique_database):
    tbl_name = unique_database + ".ice_optimize_filter"
    self.execute_query("""CREATE TABLE {0} partitioned by spec (l_linenumber)
        STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')
        AS SELECT * FROM tpch_parquet.lineitem
        WHERE l_quantity < 10;""".format(tbl_name))
    self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
        WHERE l_quantity>=10 AND l_quantity<=12;""".format(tbl_name))
    # There are no delete files in the table, so this should be a no-op. Check that no new
    # snapshot was created.
    self._check_file_filtering(tbl_name, 0, "NOOP", False)
    assert len(get_snapshots(self.client, tbl_name)) == 2
    self._check_file_filtering(tbl_name, 5, "PARTIAL", False)
    self._check_file_filtering(tbl_name, 50, "PARTIAL", False)
    # Check that the following is a no-op, since the table is already in a compact form.
    self._check_file_filtering(tbl_name, 100, "NOOP", False)
    self.execute_query("""UPDATE {0} SET l_linenumber=7 WHERE l_linenumber>4 AND
        l_linestatus='F';""".format(tbl_name))
    self._check_file_filtering(tbl_name, 6, "PARTIAL", False)
    self.execute_query("""ALTER TABLE {0} SET PARTITION SPEC(l_linestatus);"""
                       .format(tbl_name))
    self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL'
        AND l_linenumber<4;""".format(tbl_name))
    self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
        WHERE l_quantity=13 AND l_linenumber<3;""".format(tbl_name))
    self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
        WHERE l_quantity=14 AND l_linenumber<3;""".format(tbl_name))
    # Merges the delete files and rewrites the small files.
    self._check_file_filtering(tbl_name, 2, "PARTIAL", True)
    # Rewrites the remaining small files (2MB <= file_size < 100MB).
    self._check_file_filtering(tbl_name, 100, "PARTIAL", True)
    self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL';"""
                       .format(tbl_name))
    # All partitions have delete files, therefore the entire table is rewritten.
    self._check_file_filtering(tbl_name, 100, "REWRITE_ALL", True)

  def test_merge(self, vector, unique_database):
    udf_location = get_fs_path('/test-warehouse/libTestUdfs.so')
    self.run_test_case('QueryTest/iceberg-merge', vector, unique_database,
                       test_file_vars={'UDF_LOCATION': udf_location})

  def test_merge_partition(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-merge-partition', vector, unique_database)

  def test_merge_partition_sort(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-merge-partition-sort', vector, unique_database)

  def test_merge_long(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database)

  def test_merge_star(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)

  def test_merge_equality_update(self, vector, unique_database):
    create_iceberg_table_from_directory(self.client, unique_database,
        "iceberg_v2_delete_equality_partitioned", "parquet",
        table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
        warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
    self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database)

  def test_merge_equality_insert(self, vector, unique_database):
    create_iceberg_table_from_directory(self.client, unique_database,
        "iceberg_v2_delete_equality_partitioned", "parquet",
        table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice",
        warehouse_prefix=os.getenv("FILESYSTEM_PREFIX"))
    self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database)

  def test_merge_duplicate_check(self, vector, unique_database):
    """Regression test for IMPALA-13932"""
    # Remove 'num_nodes' option so we can set it in the .test file.
    vector.unset_exec_option('num_nodes')
    self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector, unique_database)

  def test_writing_multiple_deletes_per_partition(self, vector, unique_database):
    """Test writing multiple delete files per partition in a single DELETE operation."""
    self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition', vector,
                       use_db=unique_database)

  def test_cleanup(self, unique_database):
    """Test that all uncommitted files written by Impala are removed from the file
    system when a DML commit to an Iceberg table fails, and that the effects of the
    failed operation are not visible."""
    table_name = "iceberg_cleanup_failure"
    fq_tbl_name = unique_database + "." + table_name
    # The query options that inject an iceberg validation check failure.
    fail_ice_commit_options = {'debug_action':
                               'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@'
                               'ValidationException@'
                               'simulated validation check failure'}
    # Create an iceberg table and insert a row.
    self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i int)
        STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')""".format(fq_tbl_name))
    self.execute_query_expect_success(self.client,
        "insert into {0} values (1)".format(fq_tbl_name))

    # Run a query that would update a row, but pass the query options that
    # will cause the iceberg validation check to fail.
    err = self.execute_query_expect_failure(self.client,
        "update {0} set i=2 where i=1".format(fq_tbl_name),
        query_options=fail_ice_commit_options)
    # Check that we get the error message.
    assert error_msg_startswith(str(err),
        "ImpalaRuntimeException: simulated validation check failure\n"
        "CAUSED BY: ValidationException: simulated validation check failure")
    # Check that the table content was not updated.
    data = self.execute_query_expect_success(self.client,
        "select * from {0}".format(fq_tbl_name))
    assert len(data.data) == 1
    assert data.data[0] == '1'

    # Check that the uncommitted data and delete files are removed from the file system
    # and only the first data file remains.
    table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
        FILESYSTEM_PREFIX, unique_database, table_name)
    files_result = check_output(["hdfs", "dfs", "-ls", table_location])
    assert "Found 1 items" in bytes_to_str(files_result)

  def test_predicate_push_down_hint(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector,
                       use_db=unique_database)

  def test_partitions(self, vector, unique_database):
    self.run_test_case('QueryTest/iceberg-partitions', vector, unique_database)
    tbl_name = unique_database + ".ice_num_partitions"
    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4)
    second_snapshot = snapshots[1]
    time_travel_data = self.execute_query(
        "SELECT * FROM {0} for system_version as of {1};".format(
            tbl_name, second_snapshot.get_snapshot_id()))
    assert "partitions=4/unknown" in time_travel_data.runtime_profile
    selective_time_travel_data = self.execute_query(
        "SELECT * FROM {0} for system_version as of {1} WHERE id < 5;".format(
            tbl_name, second_snapshot.get_snapshot_id()))
    assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile

  def test_table_repair(self, unique_database):
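    """Tests ALTER TABLE ... EXECUTE repair_metadata(): it is a no-op when no data files
    are missing, and it removes the manifest entries of data files that were deleted
    directly from the file system so that the table becomes readable again."""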
    tbl_name = 'tbl_with_removed_files'
    db_tbl = unique_database + "." + tbl_name
    repair_query = "alter table {0} execute repair_metadata()"
    with self.create_impala_client() as impalad_client:
      impalad_client.execute(
          "create table {0} (i int) stored as iceberg tblproperties('format-version'='2')"
          .format(db_tbl))
      insert_q = "insert into {0} values ({1})"
      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4))
      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5))
      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
      assert result.data == ['1', '2', '3', '4', '5']

      TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
      DATA_PATH = os.path.join(TABLE_PATH, "data")

      # Check that table remains intact if there are no missing files
      result = self.execute_query_expect_success(
          impalad_client, repair_query.format(db_tbl))
      assert result.data[0] == "No missing data files detected."
      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
      assert result.data == ['1', '2', '3', '4', '5']

      # Delete 2 data files from the file system directly to corrupt the table.
      data_files = self.filesystem_client.ls(DATA_PATH)
      self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[0])
      self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[1])
      self.execute_query_expect_success(impalad_client, "invalidate metadata")
      result = self.execute_query_expect_success(
          impalad_client, repair_query.format(db_tbl))
      assert result.data[0] == \
          "Iceberg table repaired by deleting 2 manifest entries of missing data files."
      result = impalad_client.execute('select * from {0} order by i'.format(db_tbl))
      assert len(result.data) == 3


# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note that most
# of the test coverage is in TestIcebergV2Table.test_read_position_deletes, but since it
# also runs with the V2 optimizations setting turned off, some tests were moved here.
class TestIcebergDirectedMode(IcebergTestSuite):
  """Tests related to Iceberg DIRECTED distribution mode."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestIcebergDirectedMode, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'parquet')

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_directed_mode(self, vector):
    self.run_test_case('QueryTest/iceberg-v2-directed-mode', vector)