IMPALA-13829: Postpone catalog deleteLog GC for waitForHmsEvent requests
When a db/table is removed from the catalog cache, catalogd assigns it a new catalog version and puts it into the deleteLog. The catalog update thread uses this log to collect deletion updates. Once the catalog update thread has collected a range of updates, it triggers a GC in the deleteLog to clear items older than the last sent catalog version. The deletions are eventually broadcast by the statestore to all coordinators.

However, waitForHmsEvent requests are also consumers of the deleteLog and can be impacted by these GCs. waitForHmsEvent is a catalogd RPC used by coordinators when a query wants to wait until the related metadata is in sync with HMS. The waitForHmsEvent response returns the latest metadata, including the deletions on related dbs/tables. If the related deletions in the deleteLog are GCed just before the waitForHmsEvent request collects the results, they will be missing in the response, and the coordinator might keep using stale metadata of non-existing dbs/tables.

This is a quick fix for the issue: deleteLog GC is postponed for a configurable number of topic updates, similar to what is already done for the TopicUpdateLog. A thorough fix might need to carefully choose the version to GC, or let impalad wait for the deletions from the statestore to arrive.

A new flag, catalog_delete_log_ttl, is added for this. Items in the deleteLog can survive for catalog_delete_log_ttl catalog topic updates. The default is 60, so with the default 2s statestore update frequency a deletion can survive for at least 120s. This should be safe enough: the GCed deletions must have arrived on the impalad side after 60 rounds of catalog updates; otherwise that impalad is abnormal and already has other, more severe issues, e.g. lots of stale tables due to metadata being out of sync with catalogd.

Note that postponing deleteLog GCs might increase memory consumption. But since most of the log's memory is used by db/table/partition names, the memory usage should still be trivial compared to other metadata such as file descriptors and incremental stats in live catalog objects.

This patch also removes some unused imports.

Tests:
 - Added an e2e test with a debug action to reproduce the issue. Ran the test 100 times. Without the fix, it consistently fails within 2-3 runs.

Change-Id: I2441440bca2b928205dd514047ba742a5e8bf05e
Reviewed-on: http://gerrit.cloudera.org:8080/22816
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
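For orientation before reading the diff: the idea is to track the last deleteLogTtl_ topic-update versions in a small queue and only garbage-collect delete-log entries older than the oldest tracked version, so a deletion stays visible to waitForHmsEvent readers for that many rounds. The following is a minimal, self-contained sketch of that mechanism, not the verbatim patch; class and field names such as DeleteLogSketch are illustrative only (the real change is in CatalogDeltaLog.java below):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified sketch: keep the last 'ttl' topic-update versions in a queue and
// only GC delete-log entries older than the oldest tracked version.
class DeleteLogSketch {
  private final int ttl;  // plays the role of catalog_delete_log_ttl
  private final Queue<Long> updateVersions = new ArrayDeque<>();
  // Catalog version at which an object was removed -> name of the removed object.
  private final SortedMap<Long, String> removed = new TreeMap<>();

  DeleteLogSketch(int ttl) { this.ttl = ttl; }

  synchronized void addRemovedObject(long version, String name) {
    removed.put(version, name);
  }

  // Called once per catalog topic update with the last version included in it.
  synchronized void garbageCollect(long lastTopicUpdateVersion) {
    if (updateVersions.size() >= ttl) {
      // Everything below the oldest tracked version has been visible to readers
      // for at least 'ttl' topic updates and can now be dropped.
      removed.headMap(updateVersions.poll()).clear();
    }
    updateVersions.offer(lastTopicUpdateVersion);
  }
}

With the default ttl of 60 and the statestore's default 2s update interval, an entry therefore lives for roughly two minutes after its deletion is first published.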
commit 56b465d91f
parent 0e3ae5c339
committed by Impala Public Jenkins
@@ -174,6 +174,19 @@ DEFINE_int32(topic_update_log_gc_frequency, 1000, "Frequency at which the entrie
    "of the catalog topic update log are garbage collected. An entry may survive "
    "for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.");

DEFINE_int32(catalog_delete_log_ttl, 60, "Number of catalog topic updates that an entry "
    "in the catalog delete log can survive for.");

static bool ValidatePositiveInt(const char* flagname, int32_t value) {
  if (value > 0) return true;
  LOG(ERROR) << Substitute(
      "$0 must be a positive integer, value $1 is invalid", flagname, value);
  return false;
}

DEFINE_validator(topic_update_log_gc_frequency, &ValidatePositiveInt);
DEFINE_validator(catalog_delete_log_ttl, &ValidatePositiveInt);

DEFINE_bool(invalidate_metadata_on_event_processing_failure, true,
    "This configuration is used to invalidate metadata for table(s) upon event process "
    "failure other than HMS connection issues. The default value is true. When enabled, "
@@ -136,6 +136,7 @@ DECLARE_int32(dbcp_data_source_idle_timeout_s);
DECLARE_bool(enable_catalogd_ha);
DECLARE_string(injected_group_members_debug_only);
DECLARE_int32(hms_event_sync_sleep_interval_ms);
DECLARE_int32(catalog_delete_log_ttl);

// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -517,6 +518,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
#endif
  cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
  cfg.__set_hms_event_sync_sleep_interval_ms(FLAGS_hms_event_sync_sleep_interval_ms);
  cfg.__set_catalog_delete_log_ttl(FLAGS_catalog_delete_log_ttl);
  return Status::OK();
}
@@ -325,4 +325,6 @@ struct TBackendGflags {
  146: required string catalog_config_dir

  147: required i32 catalog_partial_fetch_max_files

  148: required i32 catalog_delete_log_ttl
}
@@ -794,6 +794,9 @@ struct TWaitForHmsEventRequest {

  // Whether to check tables used by views.
  7: optional bool should_expand_views = false

  // Passes the debug actions to catalogd if the query option is set.
  8: optional string debug_action
}

struct TWaitForHmsEventResponse {
@@ -17,17 +17,22 @@

package org.apache.impala.catalog;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents a log of deleted catalog objects.
@@ -53,11 +58,18 @@ import com.google.common.collect.ImmutableList;
 *
 * b) Building catalog topic updates in the catalogd
 * The catalogd uses this log to identify deleted catalog objects that have been deleted
- * since the last catalog topic update. Once the catalog topic update is constructed,
- * the old entries in the log are garbage collected to prevent the log from growing
- * indefinitely.
+ * since the last deleteLogTtl_ catalog topic updates. Once the catalog topic update is
+ * constructed, the aged out entries in the log are garbage collected to prevent the log
+ * from growing indefinitely.
 */
public class CatalogDeltaLog {
  private final static Logger LOG = LoggerFactory.getLogger(CatalogDeltaLog.class);
  // Times of catalog updates an entry can survive for.
  // BackendConfig.INSTANCE could be null in some FE tests that don't care what this is.
  private final int deleteLogTtl_ = BackendConfig.INSTANCE == null ?
      60 : BackendConfig.INSTANCE.getBackendCfg().catalog_delete_log_ttl;
  private final Queue<Long> catalogUpdateVersions_ = new ArrayDeque<>(deleteLogTtl_);

  // Map of the catalog version an object was removed from the catalog
  // to the catalog object, ordered by catalog version.
  private SortedMap<Long, TCatalogObject> removedCatalogObjects_ = new TreeMap<>();
@@ -113,19 +125,34 @@ public class CatalogDeltaLog {
  }

  /**
-   * Given the current catalog version, removes all items with catalogVersion <
-   * currectCatalogVersion. Such objects do not need to be tracked in the delta
-   * log anymore because they are consistent with the state store's view of the
-   * catalog.
+   * Garbage-collects delete log entries older than the last deleteLogTtl_ topic updates,
+   * in case some deleteLog readers still need the item.
   */
-  public synchronized void garbageCollect(long currentCatalogVersion) {
+  public synchronized void garbageCollect(long lastTopicUpdateVersion) {
    if (!catalogUpdateVersions_.isEmpty()
        && catalogUpdateVersions_.size() >= deleteLogTtl_) {
      garbageCollectInternal(catalogUpdateVersions_.poll());
    }
    catalogUpdateVersions_.offer(lastTopicUpdateVersion);
  }

  /**
   * Given a catalog version, removes all items with catalogVersion < it.
   */
  public void garbageCollectInternal(long catalogVersion) {
    // Nothing will be garbage collected so avoid creating a new object.
    if (!removedCatalogObjects_.isEmpty() &&
-        removedCatalogObjects_.firstKey() < currentCatalogVersion) {
+        removedCatalogObjects_.firstKey() < catalogVersion) {
      int originalSize = removedCatalogObjects_.size();
      removedCatalogObjects_ = new TreeMap<>(
-          removedCatalogObjects_.tailMap(currentCatalogVersion));
+          removedCatalogObjects_.tailMap(catalogVersion));
      latestRemovedVersions_.entrySet().removeIf(
-          e -> e.getValue() < currentCatalogVersion);
+          e -> e.getValue() < catalogVersion);
      int numCleared = originalSize - removedCatalogObjects_.size();
      if (numCleared > 0) {
        LOG.info("Cleared {} removed items older than version {}", numCleared,
            catalogVersion);
      }
    }
  }
@@ -4364,6 +4364,8 @@ public class CatalogServiceCatalog extends Catalog {
    res.setResult(new TCatalogUpdateResult());
    res.getResult().setCatalog_service_id(JniCatalog.getServiceId());

    DebugUtils.executeDebugAction(req.debug_action,
        DebugUtils.COLLECT_CATALOG_RESULTS_DELAY);
    // Collect catalog objects required by the query
    boolean wantMinimalResponse = req.header.want_minimal_response;
    if (req.isSetObject_descs()) {
@@ -53,7 +53,6 @@ import org.apache.impala.thrift.TSymbolLookupResult;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TWaitForHmsEventRequest;
import org.apache.impala.thrift.TWaitForHmsEventResponse;
import org.apache.impala.util.NativeLibUtil;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -238,7 +238,6 @@ import org.apache.impala.thrift.TUnit;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.apache.impala.thrift.TWaitForHmsEventRequest;
import org.apache.impala.thrift.TWaitForHmsEventResponse;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.DebugUtils;
import org.apache.impala.util.EventSequence;
@@ -2321,6 +2320,9 @@ public class Frontend {
    req.setHeader(createCatalogServiceRequestHeader(
        TSessionStateUtil.getEffectiveUser(queryCtx.session), queryCtx));
    collectRequiredObjects(req, stmt, queryCtx.session.database);
    if (queryOptions.isSetDebug_action()) {
      req.setDebug_action(queryOptions.debug_action);
    }
    // TODO: share 'timeline' to BE so we know when the updates are applied
    TStatus status = FeSupport.WaitForHmsEvents(req, queryOptions);
    if (status.status_code != TErrorCode.OK) {
@@ -64,7 +64,6 @@ import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TEventProcessorCmdParams;
import org.apache.impala.thrift.TGetCatalogDeltaRequest;
import org.apache.impala.thrift.TGetCatalogDeltaResponse;
import org.apache.impala.thrift.TGetCatalogServerMetricsResponse;
@@ -92,13 +91,10 @@ import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.thrift.TGetAllHadoopConfigsResponse;
import org.apache.impala.thrift.TWaitForHmsEventRequest;
import org.apache.impala.thrift.TWaitForHmsEventResponse;
import org.apache.impala.util.AuthorizationUtil;
import org.apache.impala.util.CatalogOpUtil;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.GlogAppender;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.NoOpEventSequence;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.TUniqueIdUtil;
import org.apache.impala.util.ThreadNameAnnotator;
@@ -107,7 +103,6 @@ import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TSimpleJSONProtocol;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,6 +111,11 @@ public class DebugUtils {
  // the write lock.
  public static final String RESET_METADATA_LOOP_LOCKED = "reset_metadata_loop_locked";

  // debug action label for introducing delay before collecting catalog results for
  // TWaitForHmsEventResponse.
  public static final String COLLECT_CATALOG_RESULTS_DELAY =
      "collect_catalog_results_delay";

  /**
   * Returns true if the label of action is set in the debugActions
   */
@@ -16,6 +16,7 @@
# under the License.
from __future__ import absolute_import, division, print_function
from subprocess import check_call
import logging
import pytest
import re
import time
@@ -31,7 +32,7 @@ from tests.metadata.test_event_processing_base import TestEventProcessingBase
from tests.util.event_processor_utils import EventProcessorUtils

PROCESSING_TIMEOUT_S = 10

LOG = logging.getLogger(__name__)

@SkipIfFS.hive
@SkipIfCatalogV2.hms_event_polling_disabled()
@@ -612,3 +613,64 @@ class TestEventSyncWaiting(ImpalaTestSuite):
    finally:
      self.run_stmt_in_hive(
          "drop database if exists {0} cascade".format(db))

  def test_hms_event_sync_on_deletion(self, vector, unique_name):
    """Regression test for IMPALA-13829: TWaitForHmsEventResponse not able to collect
    removed objects due to their items in deleteLog being GCed."""
    client = self.create_impala_client_from_vector(vector)
    # Set a sleep time so catalogd has time to GC the deleteLog.
    client.execute("set debug_action='collect_catalog_results_delay:SLEEP@1000'")
    db = unique_name + "_db"
    tbl = db + ".foo"
    self.execute_query("drop database if exists {} cascade".format(db))
    self.execute_query("drop database if exists {}_2 cascade".format(db))
    self.execute_query("create database {}".format(db))
    self.execute_query("create database {}_2".format(db))
    # Create HMS Thrift clients to drop db/tables in the fastest way
    hive_clients = []
    hive_transports = []
    for _ in range(2):
      c, t = ImpalaTestSuite.create_hive_client()
      hive_clients.append(c)
      hive_transports.append(t)

    try:
      # Drop 2 dbs concurrently. So their DROP_DATABASE events are processed together (in
      # the same event batch). We need more than one db to be dropped so one of them in
      # catalogd's deleteLog can be GCed since its version < latest catalog version.
      # Note that this is no longer the way catalogd GCs the deleteLog after IMPALA-13829,
      # but it can be used to trigger the issue before this fix.
      def drop_db_in_hive(i, db_name):
        hive_clients[i].drop_database(db_name, deleteData=True, cascade=True)
        LOG.info("Dropped database {} in Hive".format(db_name))
      ts = [threading.Thread(target=drop_db_in_hive, args=params)
            for params in [[0, db], [1, db + "_2"]]]
      for t in ts:
        t.start()
      for t in ts:
        t.join()
      client.execute("create database " + db)

      self.execute_query("create table {}(i int)".format(tbl))
      self.execute_query("create table {}_2(i int)".format(tbl))

      # Drop 2 tables concurrently. So their DROP_TABLE events are processed together (in
      # the same event batch). We need more than one table to be dropped so one of them in
      # catalogd's deleteLog can be GCed since its version < latest catalog version.
      # Note that this is no longer the way catalogd GCs the deleteLog after IMPALA-13829,
      # but it can be used to trigger the issue before this fix.
      def drop_table_in_hive(i, tbl_name):
        hive_clients[i].drop_table(db, tbl_name, deleteData=True)
        LOG.info("Dropped table {}.{} in Hive".format(db, tbl_name))
      ts = [threading.Thread(target=drop_table_in_hive, args=params)
            for params in [[0, "foo"], [1, "foo_2"]]]
      for t in ts:
        t.start()
      for t in ts:
        t.join()
      client.execute("create table {}(i int)".format(tbl))
    finally:
      for t in hive_transports:
        t.close()
      self.execute_query("drop database if exists {} cascade".format(db))
      self.execute_query("drop database if exists {}_2 cascade".format(db))