mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-11402: Add limit on files fetched by a single getPartialCatalogObject request
getPartialCatalogObject is a catalogd RPC used by local catalog mode coordinators to fetch metadata on-demand from catalogd. For a table with a huge number (e.g. 6M) of files, catalogd might hit OOM of exceeding the JVM array limit when serializing the response of a getPartialCatalogObject request for all partitions (thus all files). This patch adds a new flag, catalog_partial_fetch_max_files, to define the max number of file descriptors allowed in a response of getPartialCatalogObject. Catalogd will truncate the response in partition level when it's too big, and only return a subset of the requested partitions. Coordinator should send new requests to fetch the remaining partitions. Note that it's possible that table metadata changes between the requests. Coordinator will detect the catalog version changes and throws an InconsistentMetadataFetchException for the planner to replan the query. This is an existing mechanism for other kinds of table metadata. Here are some metrics of the number of files in a single response and the corresponding byte array size and duration of a single response: * 1000000: 371.71MB, 1s487ms * 2000000: 744.51MB, 4s035ms * 3000000: 1.09GB, 6s643ms * 4000000: 1.46GB, duration not measured due to GC pauses * 5000000: 1.82GB, duration not measured due to GC pauses * 6000000: >2GB (hit OOM) Choose 1000000 as the default value for now. We can tune it in the future. Tests: - Added custom-cluster test - Ran e2e tests in local-catalog mode with catalog_partial_fetch_max_files=1000 so the new codes are used. Change-Id: Ibb13fec20de5a17e7fc33613ca5cdebb9ac1a1e5 Reviewed-on: http://gerrit.cloudera.org:8080/22559 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
b62de19c12
commit
4ddacac14f
@@ -76,6 +76,12 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim
|
||||
"(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
|
||||
"to run. Must be set to a value greater than zero.");
|
||||
|
||||
DEFINE_int32(catalog_partial_fetch_max_files, 1000000, "Maximum number of file "
|
||||
"descriptors allowed to return in a single getPartialCatalogObject RPC. Used to "
|
||||
"avoid hitting the JVM array limit when catalogd serializing the thrift response. "
|
||||
"Note that getPartialCatalogObject RPCs are only used in local catalog mode "
|
||||
"coordinators so this is unrelated to the legacy catalog mode.");
|
||||
|
||||
DEFINE_int32(catalog_max_lock_skipped_topic_updates, 3, "Maximum number of topic "
|
||||
"updates skipped for a table due to lock contention in catalogd after which it must"
|
||||
"be added to the topic the update log. This limit only applies to distinct lock "
|
||||
@@ -379,9 +385,12 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
|
||||
status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
|
||||
}
|
||||
if (!status.ok()) LOG(ERROR) << status.GetDetail();
|
||||
TStatus thrift_status;
|
||||
status.ToThrift(&thrift_status);
|
||||
resp.__set_status(thrift_status);
|
||||
// Don't overwrite the non-OK status returned from catalogd
|
||||
if (!resp.__isset.status || resp.status.status_code == TErrorCode::OK) {
|
||||
TStatus thrift_status;
|
||||
status.ToThrift(&thrift_status);
|
||||
resp.__set_status(thrift_status);
|
||||
}
|
||||
VLOG_RPC << "GetPartialCatalogObject(): response=" << ThriftDebugStringNoThrow(resp);
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,7 @@ DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
|
||||
DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
|
||||
DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
|
||||
DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
|
||||
DECLARE_int32(catalog_partial_fetch_max_files);
|
||||
DECLARE_int64(exchg_node_buffer_size_bytes);
|
||||
DECLARE_int32(kudu_mutation_buffer_size);
|
||||
DECLARE_int32(kudu_error_buffer_size);
|
||||
@@ -380,6 +381,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
|
||||
FLAGS_catalog_max_parallel_partial_fetch_rpc);
|
||||
cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s(
|
||||
FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
|
||||
cfg.__set_catalog_partial_fetch_max_files(FLAGS_catalog_partial_fetch_max_files);
|
||||
cfg.__set_exchg_node_buffer_size_bytes(
|
||||
FLAGS_exchg_node_buffer_size_bytes);
|
||||
cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
|
||||
|
||||
@@ -323,4 +323,6 @@ struct TBackendGflags {
|
||||
145: required bool catalogd_deployed
|
||||
|
||||
146: required string catalog_config_dir
|
||||
|
||||
147: required i32 catalog_partial_fetch_max_files
|
||||
}
|
||||
|
||||
@@ -595,6 +595,12 @@ public class HdfsPartition extends CatalogObjectImpl
|
||||
encodedDeleteFileDescriptors_.size();
|
||||
}
|
||||
|
||||
public static int getNumFds(TPartialPartitionInfo partInfo) {
|
||||
return partInfo.getFile_descriptorsSize() +
|
||||
partInfo.getInsert_file_descriptorsSize() +
|
||||
partInfo.getDelete_file_descriptorsSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the requested partitions info and the number of files filtered out based on
|
||||
* the ACID writeIdList.
|
||||
|
||||
@@ -73,6 +73,7 @@ import org.apache.impala.thrift.TAccessLevel;
|
||||
import org.apache.impala.thrift.TCatalogObject;
|
||||
import org.apache.impala.thrift.TCatalogObjectType;
|
||||
import org.apache.impala.thrift.TColumn;
|
||||
import org.apache.impala.thrift.TErrorCode;
|
||||
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
|
||||
import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
|
||||
import org.apache.impala.thrift.THdfsPartition;
|
||||
@@ -83,6 +84,7 @@ import org.apache.impala.thrift.TPartitionKeyValue;
|
||||
import org.apache.impala.thrift.TResultSet;
|
||||
import org.apache.impala.thrift.TResultSetMetadata;
|
||||
import org.apache.impala.thrift.TSqlConstraints;
|
||||
import org.apache.impala.thrift.TStatus;
|
||||
import org.apache.impala.thrift.TTable;
|
||||
import org.apache.impala.thrift.TTableDescriptor;
|
||||
import org.apache.impala.thrift.TTableType;
|
||||
@@ -2288,6 +2290,7 @@ public class HdfsTable extends Table implements FeFsTable {
|
||||
Counter misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC);
|
||||
Counter hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC);
|
||||
int numFilesFiltered = 0;
|
||||
int numFilesCollected = 0;
|
||||
if (partIds != null) {
|
||||
resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
|
||||
for (long partId : partIds) {
|
||||
@@ -2302,6 +2305,9 @@ public class HdfsTable extends Table implements FeFsTable {
|
||||
part.getPartialPartitionInfo(req, reqWriteIdList);
|
||||
if (partInfoStatus.second != null) {
|
||||
hits.inc();
|
||||
int numFds = HdfsPartition.getNumFds(partInfoStatus.first);
|
||||
if (hitNumFilesLimit(resp, partIds, part, numFds, numFilesCollected)) break;
|
||||
numFilesCollected += numFds;
|
||||
numFilesFiltered += partInfoStatus.second;
|
||||
} else {
|
||||
misses.inc();
|
||||
@@ -2309,6 +2315,7 @@ public class HdfsTable extends Table implements FeFsTable {
|
||||
}
|
||||
resp.table_info.partitions.add(partInfoStatus.first);
|
||||
}
|
||||
if (resp.isSetStatus() && resp.status.status_code != TErrorCode.OK) return resp;
|
||||
}
|
||||
// In most of the cases, the prefix map only contains one item for the table location.
|
||||
// Here we always send it since it's small.
|
||||
@@ -2338,6 +2345,33 @@ public class HdfsTable extends Table implements FeFsTable {
|
||||
return resp;
|
||||
}
|
||||
|
||||
private boolean hitNumFilesLimit(TGetPartialCatalogObjectResponse resp,
|
||||
Collection<Long> partIds, HdfsPartition part, int numFds, int numFilesCollected) {
|
||||
if (numFilesCollected + numFds >
|
||||
BackendConfig.INSTANCE.getCatalogPartialFetchMaxFiles()) {
|
||||
if (numFilesCollected == 0) {
|
||||
// Even collecting the first partition will exceed the limit which means no files
|
||||
// can be returned. Return an unrecoverable error to the coordinator.
|
||||
String err = String.format("Too many files to collect in table %s%s: %d. " +
|
||||
"Current limit is %d configured by startup flag " +
|
||||
"'catalog_partial_fetch_max_files'. Consider compacting files of the table.",
|
||||
full_name_, isPartitioned() ? " partition " + part.getPartitionName() : "",
|
||||
numFds, BackendConfig.INSTANCE.getCatalogPartialFetchMaxFiles());
|
||||
LOG.error(err);
|
||||
resp.setStatus(new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(err)));
|
||||
} else {
|
||||
LOG.warn("Returning {} files from {}/{} requested partitions for table {}. " +
|
||||
"Coordinator will fetch the remaining partitions in another request " +
|
||||
"but this impacts metadata performance. Consider compacting files to " +
|
||||
"improve it.",
|
||||
numFilesCollected, resp.table_info.partitions.size(), partIds.size(),
|
||||
full_name_);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private double getFileMetadataCacheHitRate() {
|
||||
long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount();
|
||||
long misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount();
|
||||
|
||||
@@ -466,9 +466,8 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
}
|
||||
resp = new TGetPartialCatalogObjectResponse();
|
||||
new TDeserializer().deserialize(resp, ret);
|
||||
if (resp.status.status_code != TErrorCode.OK) {
|
||||
// TODO(todd) do reasonable error handling
|
||||
throw new TException(resp.toString());
|
||||
if (resp.isSetStatus() && resp.status.status_code != TErrorCode.OK) {
|
||||
throw new TException(String.join("\n", resp.status.error_msgs));
|
||||
}
|
||||
|
||||
// If we get a "not found" response, then we assume that this was a case of an
|
||||
@@ -746,7 +745,7 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
return res.values();
|
||||
}
|
||||
|
||||
private TGetPartialCatalogObjectRequest newReqForTable(String dbName,
|
||||
private static TGetPartialCatalogObjectRequest newReqForTable(String dbName,
|
||||
String tableName) {
|
||||
TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
|
||||
req.object_desc = new TCatalogObject();
|
||||
@@ -756,7 +755,7 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
return req;
|
||||
}
|
||||
|
||||
private TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) {
|
||||
private static TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) {
|
||||
Preconditions.checkArgument(table instanceof TableMetaRefImpl,
|
||||
"table ref %s was not created by CatalogdMetaProvider", table);
|
||||
TGetPartialCatalogObjectRequest req = newReqForTable(
|
||||
@@ -766,6 +765,20 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
return req;
|
||||
}
|
||||
|
||||
private static TGetPartialCatalogObjectRequest newReqForPartitions(
|
||||
TableMetaRefImpl table, List<Long> partIds) {
|
||||
TGetPartialCatalogObjectRequest req = newReqForTable(table);
|
||||
req.table_info_selector.partition_ids = partIds;
|
||||
req.table_info_selector.want_partition_metadata = true;
|
||||
req.table_info_selector.want_partition_files = true;
|
||||
if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
|
||||
req.table_info_selector.valid_write_ids = table.validWriteIds_;
|
||||
}
|
||||
// TODO(IMPALA-7535): fetch incremental stats on-demand
|
||||
req.table_info_selector.want_partition_stats = true;
|
||||
return req;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<Table, TableMetaRef> getTableIfPresent(String dbName, String tblName) {
|
||||
TableCacheKey cacheKey = new TableCacheKey(dbName.toLowerCase(),
|
||||
@@ -967,7 +980,7 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
List<String> partitionColumnNames,
|
||||
ListMap<TNetworkAddress> hostIndex,
|
||||
List<PartitionRef> partitionRefs)
|
||||
throws MetaException, TException {
|
||||
throws CatalogException, TException {
|
||||
Preconditions.checkArgument(table instanceof TableMetaRefImpl);
|
||||
TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
|
||||
Stopwatch sw = Stopwatch.createStarted();
|
||||
@@ -1071,31 +1084,39 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
*/
|
||||
private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCatalogd(
|
||||
TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
|
||||
List<PartitionRef> partRefs) throws TException {
|
||||
List<PartitionRef> partRefs) throws CatalogException, TException {
|
||||
List<Long> ids = Lists.newArrayListWithCapacity(partRefs.size());
|
||||
for (PartitionRef partRef: partRefs) {
|
||||
ids.add(((PartitionRefImpl)partRef).getId());
|
||||
}
|
||||
|
||||
TGetPartialCatalogObjectRequest req = newReqForTable(table);
|
||||
req.table_info_selector.partition_ids = ids;
|
||||
req.table_info_selector.want_partition_metadata = true;
|
||||
req.table_info_selector.want_partition_files = true;
|
||||
if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
|
||||
req.table_info_selector.valid_write_ids = table.validWriteIds_;
|
||||
}
|
||||
// TODO(todd): fetch incremental stats on-demand for compute-incremental-stats.
|
||||
req.table_info_selector.want_partition_stats = true;
|
||||
TGetPartialCatalogObjectRequest req = newReqForPartitions(table, ids);
|
||||
TGetPartialCatalogObjectResponse resp = sendRequest(req);
|
||||
checkResponse(resp.table_info != null && resp.table_info.partitions != null,
|
||||
req, "missing partition list result");
|
||||
checkResponse(resp.table_info.network_addresses != null,
|
||||
req, "missing network addresses");
|
||||
checkResponse(resp.table_info.partitions.size() == ids.size(),
|
||||
req, "returned %d partitions instead of expected %d",
|
||||
resp.table_info.partitions.size(), ids.size());
|
||||
addTableMetadatStorageLoadTimeToProfile(
|
||||
resp.table_info.storage_metadata_load_time_ns);
|
||||
boolean logProgress = false;
|
||||
while (resp.table_info.partitions.size() < ids.size()) {
|
||||
logProgress = true;
|
||||
int numFetchedParts = resp.table_info.partitions.size();
|
||||
LOG.info("Fetched {}/{} partitions for {}. Sending new requests.",
|
||||
numFetchedParts, ids.size(), table);
|
||||
List<Long> remainingIds = Lists.newArrayListWithCapacity(
|
||||
ids.size() - numFetchedParts);
|
||||
for (int i = numFetchedParts; i < ids.size(); i++) {
|
||||
remainingIds.add(ids.get(i));
|
||||
}
|
||||
|
||||
TGetPartialCatalogObjectRequest nextReq = newReqForPartitions(table, remainingIds);
|
||||
TGetPartialCatalogObjectResponse nextResp = sendRequest(nextReq);
|
||||
resp.table_info.partitions.addAll(nextResp.table_info.partitions);
|
||||
}
|
||||
if (logProgress) {
|
||||
LOG.info("Fetched {} partitions for {}", ids.size(), table);
|
||||
}
|
||||
Map<PartitionRef, PartitionMetadata> ret = new HashMap<>();
|
||||
for (int i = 0; i < ids.size(); i++) {
|
||||
PartitionRef partRef = partRefs.get(i);
|
||||
@@ -1142,7 +1163,7 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
|
||||
PartitionMetadata oldVal = ret.put(partRef, metaImpl);
|
||||
if (oldVal != null) {
|
||||
throw new RuntimeException("catalogd returned partition " + part.id +
|
||||
throw new CatalogException("catalogd returned partition " + part.id +
|
||||
" multiple times");
|
||||
}
|
||||
}
|
||||
@@ -1254,7 +1275,7 @@ public class CatalogdMetaProvider implements MetaProvider {
|
||||
TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException {
|
||||
if (condition) return;
|
||||
throw new TException(String.format("Invalid response from catalogd for request " +
|
||||
req.toString() + ": " + msg, args));
|
||||
StringUtils.abbreviate(req.toString(), 1000) + ": " + msg, args));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -139,6 +139,10 @@ public class BackendConfig {
|
||||
return backendCfg_.catalog_partial_fetch_rpc_queue_timeout_s;
|
||||
}
|
||||
|
||||
public int getCatalogPartialFetchMaxFiles() {
|
||||
return backendCfg_.catalog_partial_fetch_max_files;
|
||||
}
|
||||
|
||||
public long getHMSPollingIntervalInSeconds() {
|
||||
return backendCfg_.hms_event_polling_interval_s;
|
||||
}
|
||||
|
||||
@@ -682,3 +682,56 @@ class TestReusePartitionMetadata(CustomClusterTestSuite):
|
||||
match = re.search(r"CatalogFetch.Partitions.Misses: (\d+)", ret.runtime_profile)
|
||||
assert len(match.groups()) == 1
|
||||
assert match.group(1) == str(partition_misses)
|
||||
|
||||
|
||||
class TestAllowIncompleteData(CustomClusterTestSuite):
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args="--use_local_catalog=true",
|
||||
catalogd_args="--catalog_topic_mode=minimal --catalog_partial_fetch_max_files=1000")
|
||||
def test_incomplete_partition_list(self):
|
||||
"""Test that coordinator can fetch the missing partitions when catalogd decides to
|
||||
truncate the partition list in the response"""
|
||||
res = self.execute_query_expect_success(
|
||||
self.client, "show files in tpcds.store_sales")
|
||||
assert len(res.data) == 1824
|
||||
|
||||
self.assert_catalogd_log_contains(
|
||||
"WARNING", "Returning 1000 files from 1000/1824 requested partitions for table "
|
||||
"tpcds.store_sales. Coordinator will fetch the remaining partitions in another "
|
||||
"request but this impacts metadata performance. Consider compacting files to "
|
||||
"improve it.")
|
||||
self.assert_impalad_log_contains(
|
||||
"INFO", r"Fetched 1000/1824 partitions for TableMetaRef tpcds.store_sales@\d+. "
|
||||
"Sending new requests.")
|
||||
self.assert_impalad_log_contains(
|
||||
"INFO", r"Fetched 1824 partitions for TableMetaRef tpcds.store_sales@\d+")
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args="--use_local_catalog=true",
|
||||
catalogd_args="--catalog_topic_mode=minimal --catalog_partial_fetch_max_files=1")
|
||||
def test_too_many_files(self, unique_database):
|
||||
"""Test the error reporting the limit is too small"""
|
||||
exception = self.execute_query_expect_failure(
|
||||
self.client, "show files in tpch_parquet.lineitem")
|
||||
err = ("Too many files to collect in table tpch_parquet.lineitem: 3. Current limit "
|
||||
"is 1 configured by startup flag 'catalog_partial_fetch_max_files'. Consider "
|
||||
"compacting files of the table.")
|
||||
assert err in str(exception)
|
||||
self.assert_catalogd_log_contains("ERROR", err)
|
||||
|
||||
# Create a partitioned table with multiple files
|
||||
tbl = unique_database + ".foo"
|
||||
self.execute_query("create table {0} partitioned by (year, month) as "
|
||||
"select * from functional.alltypestiny".format(tbl))
|
||||
self.execute_query("insert into {0} partition(year, month) select * from "
|
||||
"functional.alltypestiny".format(tbl))
|
||||
exception = self.execute_query_expect_failure(
|
||||
self.client, "show files in {0}.foo".format(unique_database))
|
||||
err = ("Too many files to collect in table {0} partition year=2009/month=1: 2. "
|
||||
"Current limit is 1 configured by startup flag "
|
||||
"'catalog_partial_fetch_max_files'. Consider compacting files of the table."
|
||||
).format(tbl)
|
||||
assert err in str(exception)
|
||||
self.assert_catalogd_log_contains("ERROR", err)
|
||||
|
||||
Reference in New Issue
Block a user