IMPALA-12765: Balance consecutive partitions better for Iceberg tables

During remote read scheduling Impala does the following:

Non-Iceberg tables
 * The scheduler processes the scan ranges in partition key order
 * The scheduler selects N executors as candidates
 * The scheduler chooses the executor from the candidates based on
   minimum number of assigned bytes
 * So consecutive partitions are more likely to be assigned to
   different executors

Iceberg tables
 * The scheduler processes the scan ranges in random order
 * The scheduler selects N executors as candidates
 * The scheduler chooses the executor from the candidates based on
   minimum number of assigned bytes
 * So consecutive partitions (by partition key order) are assigned
   randomly, i.e. there's a higher chance of clustering

With this patch, IcebergScanNode orders its file descriptors based on
their paths, so we will have a more balanced scheduling for consecutive
partitions. It is especially important for queries that prune partitions
via runtime filters (e.g. due to a JOIN), because it doesn't matter that
we schedule the scan ranges evenly, the scan ranges that survive the
runtime filters can still be clustered on certain executors.

E.g. TPC-DS Q22 has the following JOIN and WHERE predicates:

 inv_date_sk=d_date_sk and
 d_month_seq between 1199 and 1199 + 11

The Inventory table is partitioned by column inv_date_sk, and we filter
the rows in the joined table by 'd_month_seq between 1199 and
1199 + 11'. This means that we will only need a range of partitions from
the Inventory table, but that range will only be revealed during
runtime. Scheduling neighbouring partitions to different executors means
that the surviving partitions are spread across executors more evenly.

Testing:
 * e2e test

Change-Id: I60773965ecbb4d8e659db158f1f0ac76086d5578
Reviewed-on: http://gerrit.cloudera.org:8080/20973
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Zoltan Borok-Nagy
2024-01-29 19:11:52 +01:00
committed by Impala Public Jenkins
parent 46f0431321
commit 62a3168eca
2 changed files with 64 additions and 2 deletions

View File

@@ -1022,7 +1022,7 @@ class TestIcebergTable(IcebergTestSuite):
"""IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for
Iceberg tables."""
def collect_split_stats(profile):
splits = [l.strip() for l in profile.splitlines() if "Hdfs split stats" in l]
splits = [s.strip() for s in profile.splitlines() if "Hdfs split stats" in s]
splits.sort()
return splits
@@ -1041,6 +1041,54 @@ class TestIcebergTable(IcebergTestSuite):
split_stats = collect_split_stats(profile)
assert ref_split_stats == split_stats
def test_scheduling_partitioned_tables(self, vector, unique_database):
"""IMPALA-12765: Balance consecutive partitions better for Iceberg tables"""
# We are setting the replica_preference query option in this test, so let's create a
# local impala client.
inventory_tbl = "inventory_ice"
item_tbl = "item_ice"
date_dim_tbl = "date_dim_ice"
with self.create_impala_client() as impalad_client:
impalad_client.execute("use " + unique_database)
impalad_client.execute("set replica_preference=remote")
impalad_client.execute("""
CREATE TABLE {}
PARTITIONED BY SPEC (inv_date_sk)
STORED BY ICEBERG
AS SELECT * from tpcds_partitioned_parquet_snap.inventory;
""".format(inventory_tbl))
impalad_client.execute("""
CREATE TABLE {}
STORED BY ICEBERG
AS SELECT * from tpcds_partitioned_parquet_snap.item;
""".format(item_tbl))
impalad_client.execute("""
CREATE TABLE {}
STORED BY ICEBERG
AS SELECT * from tpcds_partitioned_parquet_snap.date_dim;
""".format(date_dim_tbl))
q22_result = impalad_client.execute("""
select i_product_name, i_brand, i_class, i_category,
avg(inv_quantity_on_hand) qoh
from inventory_ice, date_dim_ice, item_ice
where inv_date_sk=d_date_sk and
inv_item_sk=i_item_sk and
d_month_seq between 1199 and 1199 + 11
group by rollup(i_product_name, i_brand, i_class, i_category)
order by qoh, i_product_name, i_brand, i_class, i_category
limit 100
""")
profile = q22_result.runtime_profile
# "Files rejected:" contains the number of files being rejected by runtime
# filters. With IMPALA-12765 we should see similar numbers for each executor.
files_rejected_array = re.findall(r"Files rejected: \d+ \((\d+)\)", profile)
avg_files_rejected = int(files_rejected_array[0])
THRESHOLD = 3
for files_rejected_str in files_rejected_array:
files_rejected = int(files_rejected_str)
if files_rejected != 0:
assert abs(avg_files_rejected - files_rejected) < THRESHOLD
def test_in_predicate_push_down(self, vector, unique_database):
self.execute_query("SET RUNTIME_FILTER_MODE=OFF")
self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector,