IMPALA-14075: Add CatalogOpExecutor.icebergExecutorService_

Before this patch, Impala executes the EXPIRE_SNAPSHOTS operation on a
single thread. This can be very slow on cloud storage systems, especially
when the operation needs to remove a large number of files.

This patch adds CatalogOpExecutor.icebergExecutorService_ to parallelize
Iceberg API calls that accept an ExecutorService, such as
ExpireSnapshots.executeDeleteWith(). The number of threads in this executor
service is controlled by the new CatalogD flag --iceberg_catalog_num_threads,
which defaults to 16, the same as the default of
--num_metadata_loading_threads.
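
For reference, the Iceberg API pattern being parallelized looks roughly like
the sketch below. This is a minimal sketch, not code from the patch: the
fixed pool size, the thread-name format, and obtaining ExpireSnapshots from a
Table handle are illustrative assumptions.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.iceberg.ExpireSnapshots;
  import org.apache.iceberg.Table;

  public class ExpireSnapshotsSketch {
    // Shared pool, analogous to CatalogOpExecutor.icebergExecutorService_.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(16,
        new ThreadFactoryBuilder().setNameFormat("IcebergCatalogThread-%d").build());

    /** Expires snapshots of 'tbl' that are older than 'olderThanMillis'. */
    public static void expire(Table tbl, long olderThanMillis) {
      ExpireSnapshots expire = tbl.expireSnapshots();
      expire.expireOlderThan(olderThanMillis);
      // Without executeDeleteWith(), expired data and manifest files are
      // deleted on a single thread; with it, deletes run on the given pool.
      expire.executeDeleteWith(POOL);
      expire.commit();
    }
  }

In the patch itself the ExpireSnapshots handle comes from the table-level
Transaction (txn.expireSnapshots()), and the pool size comes from
--iceberg_catalog_num_threads.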

Rename ValidateMinProcessingPerThread to ValidatePositiveInt64 to match the
naming of the other validators in backend-gflag-util.cc.

Testing:
- Lowered the sleep time between insert queries from 5s to 1s in
  test_expire_snapshots and test_describe_history_params to speed up the
  tests.
- Manually verified that the 'IcebergCatalogThread' threads are visible on
  the /jvm-threadz page of CatalogD.
- test_iceberg.py passes.

Change-Id: I6dcbf1e406e1732ef8829eb0cd627d932291d485
Reviewed-on: http://gerrit.cloudera.org:8080/22980
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Riza Suminto
Date: 2025-06-04 11:33:26 -07:00
Committer: Impala Public Jenkins
Parent: d630d6f8af
Commit: ccb8eac10a
6 changed files with 37 additions and 10 deletions

@@ -293,6 +293,10 @@ DEFINE_int64_hidden(data_stream_sender_buffer_size_used_by_planner, -1,
"With default -1 the planner uses the old logic that is different"
"than how the backend actually works (see IMPALA-12594)");
DEFINE_int32(iceberg_catalog_num_threads, 16,
"Maximum number of threads to use for Iceberg catalog operations. These threads are "
"shared among concurrent Iceberg catalog operation (ie., ExpireSnapshot).");
using strings::Substitute;
namespace impala {
@@ -308,7 +312,7 @@ static bool ValidatePositiveDouble(const char* flagname, double value) {
   return false;
 }
 
-static bool ValidateMinProcessingPerThread(const char* flagname, int64_t value) {
+static bool ValidatePositiveInt64(const char* flagname, int64_t value) {
   if (0 < value) {
     return true;
   }
@@ -317,9 +321,14 @@ static bool ValidateMinProcessingPerThread(const char* flagname, int64_t value)
   return false;
 }
 
+static bool ValidatePositiveInt32(const char* flagname, int32_t value) {
+  return ValidatePositiveInt64(flagname, value);
+}
+
 DEFINE_validator(query_cpu_count_divisor, &ValidatePositiveDouble);
-DEFINE_validator(min_processing_per_thread, &ValidateMinProcessingPerThread);
+DEFINE_validator(min_processing_per_thread, &ValidatePositiveInt64);
 DEFINE_validator(query_cpu_root_factor, &ValidatePositiveDouble);
+DEFINE_validator(iceberg_catalog_num_threads, &ValidatePositiveInt32);
 
 Status GetConfigFromCommand(const string& flag_cmd, string& result) {
   result.clear();
@@ -534,6 +543,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_max_outstanding_events_on_executors(
       FLAGS_max_outstanding_events_on_executors);
   cfg.__set_consolidate_grant_revoke_requests(FLAGS_consolidate_grant_revoke_requests);
+  cfg.__set_iceberg_catalog_num_threads(FLAGS_iceberg_catalog_num_threads);
   return Status::OK();
 }

@@ -339,4 +339,6 @@ struct TBackendGflags {
   153: required i32 max_outstanding_events_on_executors
 
   154: required bool consolidate_grant_revoke_requests
+
+  155: required i32 iceberg_catalog_num_threads
 }

@@ -577,4 +577,8 @@ public class BackendConfig {
   public boolean consolidateGrantRevokeRequests() {
     return backendCfg_.consolidate_grant_revoke_requests;
   }
+
+  public int icebergCatalogNumThreads() {
+    return backendCfg_.iceberg_catalog_num_threads;
+  }
 }

@@ -30,6 +30,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -49,6 +50,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -408,6 +411,7 @@ public class CatalogOpExecutor {
   private final AuthorizationConfig authzConfig_;
   private final AuthorizationManager authzManager_;
   private final HiveJavaFunctionFactory hiveJavaFuncFactory_;
+  private final ExecutorService icebergExecutorService_;
 
   // A singleton monitoring class that keeps track of the catalog operations.
   private final CatalogOperationTracker catalogOpTracker_ =
@@ -425,6 +429,10 @@ public class CatalogOpExecutor {
     authzConfig_ = Preconditions.checkNotNull(authzConfig);
     authzManager_ = Preconditions.checkNotNull(authzManager);
     hiveJavaFuncFactory_ = Preconditions.checkNotNull(hiveJavaFuncFactory);
+    icebergExecutorService_ =
+        Executors.newFixedThreadPool(BackendConfig.INSTANCE.icebergCatalogNumThreads(),
+            new ThreadFactoryBuilder().setNameFormat("IcebergCatalogThread-%d")
+                .build());
   }
 
   public CatalogServiceCatalog getCatalog() { return catalog_; }
@@ -1611,8 +1619,9 @@ public class CatalogOpExecutor {
         addSummary(response, rollbackSummary);
       } else if (setExecuteParams.isSetExpire_snapshots_params()) {
         String expireSummary =
-            IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
-                iceTxn, setExecuteParams.getExpire_snapshots_params());
+            IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(iceTxn,
+                setExecuteParams.getExpire_snapshots_params(),
+                icebergExecutorService_);
         addSummary(response, expireSummary);
       } else {
         // Cannot happen, but throw just in case.

@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
@@ -211,11 +212,12 @@ public class IcebergCatalogOpExecutor {
    * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
    * should be retained even when all snapshots are selected by expireOlderThan().
    */
-  public static String alterTableExecuteExpireSnapshots(
-      Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
+  public static String alterTableExecuteExpireSnapshots(Transaction txn,
+      TAlterTableExecuteExpireSnapshotsParams params, ExecutorService executors) {
     ExpireSnapshots expireApi = txn.expireSnapshots();
     Preconditions.checkState(params.isSetOlder_than_millis());
     expireApi.expireOlderThan(params.older_than_millis);
+    expireApi.executeDeleteWith(executors);
     expireApi.commit();
     return "Snapshots have been expired.";
   }

@@ -92,9 +92,9 @@ class TestIcebergTable(IcebergTestSuite):
     ts_0 = datetime.datetime.now()
     insert_q = "insert into {0} values (1)".format(tbl_name)
     ts_1 = self.execute_query_ts(impalad_client, insert_q)
-    time.sleep(5)
+    time.sleep(1)
     impalad_client.execute(insert_q)
-    time.sleep(5)
+    time.sleep(1)
     ts_2 = self.execute_query_ts(impalad_client, insert_q)
     impalad_client.execute(insert_q)
@@ -472,9 +472,9 @@ class TestIcebergTable(IcebergTestSuite):
                .format(tbl_name))
     insert_q = "insert into {0} values (1)".format(tbl_name)
     ts_1 = self.execute_query_ts(impalad_client, insert_q)
-    time.sleep(5)
+    time.sleep(1)
     ts_2 = self.execute_query_ts(impalad_client, insert_q)
-    time.sleep(5)
+    time.sleep(1)
     ts_3 = self.execute_query_ts(impalad_client, insert_q)
     # Describe history without predicate
     data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name))