IMPALA-7448: Invalidate recently unused tables from catalogd

This patch implements an automatic invalidation mechanism in catalogd.
There are two invalidation strategies:
1. Periodically the HDFS tables that are not used in a configured
   period "invalidate_tables_timeout_s" is invalidated from catalogd.
2. If the old GC generation is almost full, a certain percentage of LRU
   tables are invalidated. This can be enabled by backend flag
   "invalidate_tables_on_memory_pressure".

The table usage is reported by impalad to catalogd when the tables are
used during planning.
Tests on time-based invalidation are added. It is manually verified that
the GC callback is called if strings are randomly stuffed into catalogd.

Change-Id: Ib549717abefcffb14d9a3814ee8cf0de8bd49e89
Reviewed-on: http://gerrit.cloudera.org:8080/11224
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Tianyi Wang <twang@cloudera.com>
This commit is contained in:
Tianyi Wang
2018-08-13 14:15:43 -07:00
committed by Tianyi Wang
parent eb3108c461
commit 49095c7e8b
27 changed files with 829 additions and 25 deletions

View File

@@ -88,7 +88,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
}
// Executes a TDdlExecRequest and returns details on the result of the operation.
virtual void ExecDdl(TDdlExecResponse& resp, const TDdlExecRequest& req) {
void ExecDdl(TDdlExecResponse& resp, const TDdlExecRequest& req) override {
VLOG_RPC << "ExecDdl(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->ExecDdl(req, &resp);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -99,8 +99,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
}
// Executes a TResetMetadataRequest and returns details on the result of the operation.
virtual void ResetMetadata(TResetMetadataResponse& resp,
const TResetMetadataRequest& req) {
void ResetMetadata(TResetMetadataResponse& resp, const TResetMetadataRequest& req)
override {
VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -112,8 +112,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
// Executes a TUpdateCatalogRequest and returns details on the result of the
// operation.
virtual void UpdateCatalog(TUpdateCatalogResponse& resp,
const TUpdateCatalogRequest& req) {
void UpdateCatalog(TUpdateCatalogResponse& resp, const TUpdateCatalogRequest& req)
override {
VLOG_RPC << "UpdateCatalog(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->UpdateCatalog(req, &resp);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -125,8 +125,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
// Gets functions in the Catalog based on the parameters of the
// TGetFunctionsRequest.
virtual void GetFunctions(TGetFunctionsResponse& resp,
const TGetFunctionsRequest& req) {
void GetFunctions(TGetFunctionsResponse& resp, const TGetFunctionsRequest& req)
override {
VLOG_RPC << "GetFunctions(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetFunctions(req, &resp);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -137,8 +137,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
}
// Gets a TCatalogObject based on the parameters of the TGetCatalogObjectRequest.
virtual void GetCatalogObject(TGetCatalogObjectResponse& resp,
const TGetCatalogObjectRequest& req) {
void GetCatalogObject(TGetCatalogObjectResponse& resp,
const TGetCatalogObjectRequest& req) override {
VLOG_RPC << "GetCatalogObject(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetCatalogObject(req.object_desc,
&resp.catalog_object);
@@ -146,8 +146,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
VLOG_RPC << "GetCatalogObject(): response=" << ThriftDebugString(resp);
}
virtual void GetPartialCatalogObject(TGetPartialCatalogObjectResponse& resp,
const TGetPartialCatalogObjectRequest& req) {
void GetPartialCatalogObject(TGetPartialCatalogObjectResponse& resp,
const TGetPartialCatalogObjectRequest& req) override {
// TODO(todd): capture detailed metrics on the types of inbound requests, lock
// wait times, etc.
// TODO(todd): add some kind of limit on the number of concurrent requests here
@@ -164,8 +164,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
VLOG_RPC << "GetPartialCatalogObject(): response=" << ThriftDebugString(resp);
}
virtual void GetPartitionStats(
TGetPartitionStatsResponse& resp, const TGetPartitionStatsRequest& req) {
void GetPartitionStats(TGetPartitionStatsResponse& resp,
const TGetPartitionStatsRequest& req) override {
VLOG_RPC << "GetPartitionStats(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->GetPartitionStats(req, &resp);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -178,8 +178,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
// Prioritizes the loading of metadata for one or more catalog objects. Currently only
// used for loading tables/views because they are the only type of object that is loaded
// lazily.
virtual void PrioritizeLoad(TPrioritizeLoadResponse& resp,
const TPrioritizeLoadRequest& req) {
void PrioritizeLoad(TPrioritizeLoadResponse& resp, const TPrioritizeLoadRequest& req)
override {
VLOG_RPC << "PrioritizeLoad(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->PrioritizeLoad(req);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -189,8 +189,8 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
VLOG_RPC << "PrioritizeLoad(): response=" << ThriftDebugString(resp);
}
virtual void SentryAdminCheck(TSentryAdminCheckResponse& resp,
const TSentryAdminCheckRequest& req) {
void SentryAdminCheck(TSentryAdminCheckResponse& resp,
const TSentryAdminCheckRequest& req) override {
VLOG_RPC << "SentryAdminCheck(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->SentryAdminCheck(req);
if (!status.ok()) LOG(ERROR) << status.GetDetail();
@@ -200,6 +200,13 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
VLOG_RPC << "SentryAdminCheck(): response=" << ThriftDebugString(resp);
}
void UpdateTableUsage(TUpdateTableUsageResponse& resp,
const TUpdateTableUsageRequest& req) override {
VLOG_RPC << "UpdateTableUsage(): request=" << ThriftDebugString(req);
Status status = catalog_server_->catalog()->UpdateTableUsage(req);
if (!status.ok()) LOG(WARNING) << status.GetDetail();
}
private:
CatalogServer* catalog_server_;
};

View File

@@ -111,6 +111,13 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
recv_SentryAdminCheck(_return);
}
void UpdateTableUsage(TUpdateTableUsageResponse& _return,
const TUpdateTableUsageRequest& req, bool* send_done) {
DCHECK(!*send_done);
send_UpdateTableUsage(req);
*send_done = true;
recv_UpdateTableUsage(_return);
}
#pragma clang diagnostic pop
};

View File

@@ -68,7 +68,9 @@ Catalog::Catalog() {
{"getCatalogUsage", "()[B", &get_catalog_usage_id_},
{"getCatalogVersion", "()J", &get_catalog_version_id_},
{"prioritizeLoad", "([B)V", &prioritize_load_id_},
{"getPartitionStats", "([B)[B", &get_partition_stats_id_}};
{"getPartitionStats", "([B)[B", &get_partition_stats_id_},
{"updateTableUsage", "([B)V", &update_table_usage_id_},
};
JNIEnv* jni_env = getJNIEnv();
// Create an instance of the java class JniCatalog
@@ -173,3 +175,7 @@ Status Catalog::GetPartitionStats(
Status Catalog::SentryAdminCheck(const TSentryAdminCheckRequest& req) {
return JniUtil::CallJniMethod(catalog_, sentry_admin_check_id_, req);
}
Status Catalog::UpdateTableUsage(const TUpdateTableUsageRequest& req) {
return JniUtil::CallJniMethod(catalog_, update_table_usage_id_, req);
}

View File

@@ -122,6 +122,10 @@ class Catalog {
/// was an error executing the request.
Status SentryAdminCheck(const TSentryAdminCheckRequest& req);
/// Update recently used table names and their use counts in an impalad since the last
/// report.
Status UpdateTableUsage(const TUpdateTableUsageRequest& req);
private:
/// Descriptor of Java Catalog class itself, used to create a new instance.
jclass catalog_class_;
@@ -143,6 +147,7 @@ class Catalog {
jmethodID prioritize_load_id_; // JniCatalog.prioritizeLoad()
jmethodID sentry_admin_check_id_; // JniCatalog.checkUserSentryAdmin()
jmethodID catalog_ctor_;
jmethodID update_table_usage_id_;
};
}

View File

@@ -216,6 +216,29 @@ DEFINE_bool(pull_incremental_statistics, false,
"coordinators. If used, the flag must be set on both catalogd and all impalad "
"coordinators.");
DEFINE_int32(invalidate_tables_timeout_s, 0, "If a table has not been referenced in a "
"SQL statement for more than the configured amount of time, the catalog server will "
"automatically evict its cached metadata about this table. This has the same effect "
"as a user-initiated \"INVALIDATE METADATA\" statement on the table. Configuring "
"this to 0 disables time-based automatic invalidation of tables. This is independent "
"from memory-based invalidation configured by invalidate_tables_on_memory_pressure. "
"To enable this feature, a non-zero flag must be applied to both catalogd and "
"impalad.");
DEFINE_bool(invalidate_tables_on_memory_pressure, false, "Configure catalogd to "
"invalidate recently unused tables when the old GC generation is almost full. This "
"is independent from time-based invalidation configured by "
"invalidate_table_timeout_s. To enable this feature, a true flag must be applied to "
"both catalogd and impalad.");
DEFINE_double_hidden(invalidate_tables_gc_old_gen_full_threshold, 0.6, "The threshold "
"above which CatalogdTableInvalidator would consider the old generation to be almost "
"full and trigger an invalidation on recently unused tables");
DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
"The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
"old GC generation to be almost full.");
// ++========================++
// || Startup flag graveyard ||
// ++========================++

View File

@@ -335,3 +335,15 @@ Status CatalogOpExecutor::SentryAdminCheck(const TSentryAdminCheckRequest& req)
client.DoRpc(&CatalogServiceClientWrapper::SentryAdminCheck, req, &resp));
return Status(resp.status);
}
Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
TUpdateTableUsageResponse* resp) {
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
Status cnxn_status;
CatalogServiceConnection client(env_->catalogd_client_cache(), address, &cnxn_status);
RETURN_IF_ERROR(cnxn_status);
RETURN_IF_ERROR(
client.DoRpc(&CatalogServiceClientWrapper::UpdateTableUsage, req, resp));
return Status::OK();
}

View File

@@ -74,6 +74,11 @@ class CatalogOpExecutor {
Status GetPartitionStats(
const TGetPartitionStatsRequest& req, TGetPartitionStatsResponse* result);
/// Makes an RPC to the catalog server to report recently used tables and their use
/// counts in this impalad since the last report.
Status UpdateTableUsage(const TUpdateTableUsageRequest& req,
TUpdateTableUsageResponse* resp);
/// Makes an RPC to the CatalogServer to verify whether the specified user has privileges
/// to access the Sentry Policy Service. Returns OK if the user has privileges or
/// a bad status if the user does not have privileges (or if there was an error).

View File

@@ -524,6 +524,31 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad(
return result_bytes;
}
// Calls in to the catalog server to report recently used table names and the number of
// their usages in this impalad.
extern "C"
JNIEXPORT jbyteArray JNICALL
Java_org_apache_impala_service_FeSupport_NativeUpdateTableUsage(
JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
TUpdateTableUsageRequest request;
THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
JniUtil::internal_exc_class(), nullptr);
CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
TUpdateTableUsageResponse result;
Status status = catalog_op_executor.UpdateTableUsage(request, &result);
if (!status.ok()) {
LOG(ERROR) << status.GetDetail();
status.AddDetail("Error making an RPC call to Catalog server.");
status.ToThrift(&result.status);
}
jbyteArray result_bytes = nullptr;
THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
JniUtil::internal_exc_class(), result_bytes);
return result_bytes;
}
// Calls in to the catalog server to request partial information about a
// catalog object.
extern "C"
@@ -629,9 +654,13 @@ static JNINativeMethod native_methods[] = {
(void*)::Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject
},
{
const_cast<char*>("NativeGetPartitionStats"),
const_cast<char*>("NativeGetPartitionStats"), const_cast<char*>("([B)[B"),
(void*) ::Java_org_apache_impala_service_FeSupport_NativeGetPartitionStats
},
{
const_cast<char*>("NativeUpdateTableUsage"),
const_cast<char*>("([B)[B"),
(void*)::Java_org_apache_impala_service_FeSupport_NativeGetPartitionStats
(void*)::Java_org_apache_impala_service_FeSupport_NativeUpdateTableUsage
},
{
const_cast<char*>("NativeParseQueryOptions"),

View File

@@ -58,7 +58,10 @@ DECLARE_double(max_filter_error_rate);
DECLARE_int64(min_buffer_size);
DECLARE_bool(disable_catalog_data_ops_debug_only);
DECLARE_bool(pull_incremental_statistics);
DECLARE_int32(invalidate_tables_timeout_s);
DECLARE_bool(invalidate_tables_on_memory_pressure);
DECLARE_double(invalidate_tables_gc_old_gen_full_threshold);
DECLARE_double(invalidate_tables_fraction_on_memory_pressure);
namespace impala {
Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
@@ -107,6 +110,13 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
FLAGS_disable_catalog_data_ops_debug_only);
cfg.__set_pull_incremental_statistics(FLAGS_pull_incremental_statistics);
cfg.__set_catalog_topic_mode(FLAGS_catalog_topic_mode);
cfg.__set_invalidate_tables_timeout_s(FLAGS_invalidate_tables_timeout_s);
cfg.__set_invalidate_tables_on_memory_pressure(
FLAGS_invalidate_tables_on_memory_pressure);
cfg.__set_invalidate_tables_gc_old_gen_full_threshold(
FLAGS_invalidate_tables_gc_old_gen_full_threshold);
cfg.__set_invalidate_tables_fraction_on_memory_pressure(
FLAGS_invalidate_tables_fraction_on_memory_pressure);
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
return Status::OK();
}

View File

@@ -89,4 +89,12 @@ struct TBackendGflags {
31: required bool pull_incremental_statistics
32: required string catalog_topic_mode
33: required i32 invalidate_tables_timeout_s
34: required bool invalidate_tables_on_memory_pressure
35: required double invalidate_tables_gc_old_gen_full_threshold
36: required double invalidate_tables_fraction_on_memory_pressure
}

View File

@@ -461,6 +461,21 @@ struct TSentryAdminCheckResponse {
1: optional Status.TStatus status
}
struct TTableUsage {
1: required CatalogObjects.TTableName table_name
// count of usages since the last report
2: required i32 num_usages
}
struct TUpdateTableUsageRequest {
1: required list<TTableUsage> usages
}
struct TUpdateTableUsageResponse {
// The operation may fail if the catalogd is in a bad state or if there is a bug.
1: optional Status.TStatus status
}
// The CatalogService API
service CatalogService {
// Executes a DDL request and returns details on the result of the operation.
@@ -498,4 +513,8 @@ service CatalogService {
// Fetch partial information about some object in the catalog.
TGetPartialCatalogObjectResponse GetPartialCatalogObject(
1: TGetPartialCatalogObjectRequest req);
// Update recently used tables and their usage counts in an impalad since the last
// report.
TUpdateTableUsageResponse UpdateTableUsage(1: TUpdateTableUsageRequest req);
}

View File

@@ -145,6 +145,7 @@ public class StmtMetadataLoader {
timeline_.markEvent(
String.format("Metadata of all %d tables cached", loadedTbls_.size()));
}
fe_.getImpaladTableUsageTracker().recordTableUsage(loadedTbls_.keySet());
return new StmtTableCache(catalog, dbs_, loadedTbls_);
}
@@ -227,7 +228,7 @@ public class StmtMetadataLoader {
requestedTbls.size(), loadedTbls_.size(), numLoadRequestsSent_,
numCatalogUpdatesReceived_));
}
fe_.getImpaladTableUsageTracker().recordTableUsage(loadedTbls_.keySet());
return new StmtTableCache(catalog, dbs_, loadedTbls_);
}

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -66,9 +67,11 @@ import org.apache.impala.thrift.TPrincipalType;
import org.apache.impala.thrift.TPrivilege;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableUsage;
import org.apache.impala.thrift.TTableUsageMetrics;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.SentryProxy;
import org.apache.log4j.Logger;
@@ -83,6 +86,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Specialized Catalog that implements the CatalogService specific Catalog
* APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
@@ -217,6 +221,8 @@ public class CatalogServiceCatalog extends Catalog {
private final String localLibraryPath_;
private CatalogdTableInvalidator catalogdTableInvalidator_;
/**
* See the gflag definition in be/.../catalog-server.cc for details on these modes.
*/
@@ -260,6 +266,8 @@ public class CatalogServiceCatalog extends Catalog {
deleteLog_ = new CatalogDeltaLog();
topicMode_ = TopicMode.valueOf(
BackendConfig.INSTANCE.getBackendCfg().catalog_topic_mode.toUpperCase());
catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
BackendConfig.INSTANCE);
}
// Timeout for acquiring a table lock
@@ -633,7 +641,7 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Get a snapshot view of all the databases in the catalog.
*/
private List<Db> getAllDbs() {
List<Db> getAllDbs() {
versionLock_.readLock().lock();
try {
return ImmutableList.copyOf(dbCache_.get().values());
@@ -715,7 +723,7 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Get a snapshot view of all the tables in a database.
*/
private List<Table> getAllTables(Db db) {
List<Table> getAllTables(Db db) {
Preconditions.checkNotNull(db);
versionLock_.readLock().lock();
try {
@@ -2144,4 +2152,29 @@ public class CatalogServiceCatalog extends Catalog {
// TODO(todd) implement data sources and other global information.
return resp;
}
/**
* Set the last used time of specified tables to now.
* TODO: Make use of TTableUsage.num_usages.
*/
public void updateTableUsage(TUpdateTableUsageRequest req) {
for (TTableUsage usage : req.usages) {
Table table = null;
try {
table = getTable(usage.table_name.db_name, usage.table_name.table_name);
} catch (DatabaseNotFoundException e) {
// do nothing
}
if (table != null) table.refreshLastUsedTime();
}
}
CatalogdTableInvalidator getCatalogdTableInvalidator() {
return catalogdTableInvalidator_;
}
@VisibleForTesting
void setCatalogdTableInvalidator(CatalogdTableInvalidator cleaner) {
catalogdTableInvalidator_ = cleaner;
}
}

View File

@@ -0,0 +1,300 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License
package org.apache.impala.catalog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.Uninterruptibles;
import com.sun.management.GarbageCollectorMXBean;
import com.sun.management.GcInfo;
import org.apache.impala.common.Reference;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TTableName;
import org.apache.log4j.Logger;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Automatically invalidates recently unused tables. There are currently 2 rules
* implemented:
* 1. Invalidate a certain percentage of the least recently used tables after a GC with an
* almost full old generation. The fullness of the GC generation depends on the maximum
* heap size.
* 2. If invalidate_tables_timeout_s is set in the backend, unused tables older than the
* threshold are invalidated periodically.
*/
public class CatalogdTableInvalidator {
public static final Logger LOG = Logger.getLogger(CatalogdTableInvalidator.class);
/**
* Plugable time source for tests. Defined as static to avoid passing
* CatalogdTableInvalidator everywhere the clock is used.
*/
@VisibleForTesting
static Ticker TIME_SOURCE = Ticker.systemTicker();
private static long DAEMON_MAXIMUM_SLEEP_NANO = TimeUnit.MINUTES.toNanos(5);
final private long unusedTableTtlNano_;
final private boolean invalidateTableOnMemoryPressure_;
final private CatalogServiceCatalog catalog_;
/**
* A thread waking up periodically to check if eviction is needed.
*/
final private Thread daemonThread_;
/**
* The threshold above which the old gen is considered almost full.
*/
final private double oldGenFullThreshold_;
/**
* The ratio of tables to invalidate when the old gen is almost full.
*/
final private double gcInvalidationFraction_;
/**
* The number of times the daemon thread wakes up and scans the tables for invalidation.
* It's useful for tests to ensure that a scan happened.
*/
@VisibleForTesting
AtomicLong scanCount_ = new AtomicLong();
private GarbageCollectorMXBean oldGenGcBean_;
/**
* The name of the old gen memory pool.
*/
private String oldGcGenName_;
/**
* The value of oldGenGcBean_.getCollectionCount() when the last memory-based
* invalidation was executed.
*/
private long lastObservedGcCount_;
private boolean stopped_ = false;
/**
* Last time an time-based invalidation is executed in nanoseconds.
*/
private long lastInvalidationTime_;
CatalogdTableInvalidator(CatalogServiceCatalog catalog, final long unusedTableTtlSec,
boolean invalidateTableOnMemoryPressure, double oldGenFullThreshold,
double gcInvalidationFraction) {
catalog_ = catalog;
unusedTableTtlNano_ = TimeUnit.SECONDS.toNanos(unusedTableTtlSec);
oldGenFullThreshold_ = oldGenFullThreshold;
gcInvalidationFraction_ = gcInvalidationFraction;
lastInvalidationTime_ = TIME_SOURCE.read();
invalidateTableOnMemoryPressure_ =
invalidateTableOnMemoryPressure && tryInstallGcListener();
daemonThread_ = new Thread(new DaemonThread());
daemonThread_.setDaemon(true);
daemonThread_.setName("CatalogTableInvalidator timer");
daemonThread_.start();
}
public static CatalogdTableInvalidator create(CatalogServiceCatalog catalog,
BackendConfig config) {
final boolean invalidateTableOnMemoryPressure =
config.invalidateTablesOnMemoryPressure();
final int timeoutSec = config.getInvalidateTablesTimeoutS();
final double gcOldGenFullThreshold =
config.getInvalidateTablesGcOldGenFullThreshold();
final double fractionOnMemoryPressure =
config.getInvalidateTablesFractionOnMemoryPressure();
Preconditions.checkArgument(timeoutSec >= 0,
"invalidate_tables_timeout_s must be a non-negative integer.");
Preconditions.checkArgument(gcOldGenFullThreshold >= 0 && gcOldGenFullThreshold <= 1,
"invalidate_tables_gc_old_gen_full_threshold must be in [0, 1].");
Preconditions
.checkArgument(fractionOnMemoryPressure >= 0 && fractionOnMemoryPressure <= 1,
"invalidate_tables_fraction_on_memory_pressure must be in [0, 1].");
if (timeoutSec > 0 || invalidateTableOnMemoryPressure) {
return new CatalogdTableInvalidator(catalog, timeoutSec,
invalidateTableOnMemoryPressure, gcOldGenFullThreshold,
fractionOnMemoryPressure);
} else {
return null;
}
}
static long nanoTime() {
return TIME_SOURCE.read();
}
/**
* Try to find the old generation memory pool in the GC beans and listen to the GC bean
* which the old gen memory pool belongs to. Return whether the GC beans are supported.
* If the return value is false, the listener is not installed.
*/
private boolean tryInstallGcListener() {
String commonErrMsg = "Continuing without GC-triggered invalidation of tables.";
List<GarbageCollectorMXBean> gcbeans = java.lang.management.ManagementFactory
.getPlatformMXBeans(GarbageCollectorMXBean.class);
GcNotificationListener gcNotificationListener = new GcNotificationListener();
boolean foundOldPool = false;
for (GarbageCollectorMXBean gcbean : gcbeans) {
for (String poolName : gcbean.getMemoryPoolNames()) {
if (!poolName.contains("Old")) continue;
if (!(gcbean instanceof NotificationListener)) {
LOG.warn("GCBean " + gcbean.getClass().getName() + " is not supported " +
"because it does not implement NotificationListener. " + commonErrMsg);
return false;
}
oldGenGcBean_ = gcbean;
oldGcGenName_ = poolName;
lastObservedGcCount_ = gcbean.getCollectionCount();
foundOldPool = true;
((NotificationEmitter) gcbean)
.addNotificationListener(gcNotificationListener, null, null);
break;
}
}
if (!foundOldPool) {
LOG.warn("Cannot find old generation memory pool in the GC beans. " + commonErrMsg);
}
return foundOldPool;
}
/**
* Detect whether a GC happened since the last observation and the old generation is
* loaded more than the configured threshold. If so it returns true indicating that
* tables should be evicted because of memory pressure.
*/
private boolean shouldEvictFromFullHeapAfterGc() {
if (!invalidateTableOnMemoryPressure_) return false;
long gcCount = oldGenGcBean_.getCollectionCount();
if (gcCount > lastObservedGcCount_) {
lastObservedGcCount_ = gcCount;
GcInfo lastGcInfo = oldGenGcBean_.getLastGcInfo();
if (lastGcInfo == null) {
LOG.warn("gcBean.getLastGcInfo() returned null. Table invalidation based on " +
"memory pressure was skipped.");
return false;
}
MemoryUsage tenuredGenUsage = lastGcInfo.getMemoryUsageAfterGc().get(oldGcGenName_);
Preconditions.checkState(tenuredGenUsage != null);
return tenuredGenUsage.getMax() * oldGenFullThreshold_ < tenuredGenUsage.getUsed();
}
return false;
}
private void invalidateSome(double invalidationFraction) {
List<Table> tables = new ArrayList<>();
for (Db db : catalog_.getAllDbs()) {
for (Table table : db.getTables()) {
if (table.isLoaded()) tables.add(table);
}
}
// TODO: use quick select
Collections.sort(tables, new Comparator<Table>() {
@Override
public int compare(Table o1, Table o2) {
return Long.compare(o1.getLastUsedTime(), o2.getLastUsedTime());
}
});
for (int i = 0; i < tables.size() * invalidationFraction; ++i) {
TTableName tTableName = tables.get(i).getTableName().toThrift();
Reference<Boolean> tblWasRemoved = new Reference<>();
Reference<Boolean> dbWasAdded = new Reference<>();
catalog_.invalidateTable(tTableName, tblWasRemoved, dbWasAdded);
LOG.info("Table " + tables.get(i).getFullName() + " invalidated due to memory " +
"pressure.");
}
}
private void invalidateOlderThan(long retireAgeNano) {
long now = TIME_SOURCE.read();
for (Db db : catalog_.getAllDbs()) {
for (Table table : catalog_.getAllTables(db)) {
if (!table.isLoaded()) continue;
long inactivityTime = now - table.getLastUsedTime();
if (inactivityTime <= retireAgeNano) continue;
Reference<Boolean> tblWasRemoved = new Reference<>();
Reference<Boolean> dbWasAdded = new Reference<>();
TTableName tTableName = table.getTableName().toThrift();
catalog_.invalidateTable(tTableName, tblWasRemoved, dbWasAdded);
LOG.info(
"Invalidated " + table.getFullName() + " due to inactivity for " +
TimeUnit.NANOSECONDS.toSeconds(inactivityTime) + " seconds.");
}
}
}
void stop() {
synchronized (this) {
stopped_ = true;
notify();
}
try {
daemonThread_.join();
} catch (InterruptedException e) {
LOG.warn("stop() is interrupted", e);
}
}
@VisibleForTesting
synchronized void wakeUpForTests() {
notify();
}
private class DaemonThread implements Runnable {
@Override
public void run() {
long sleepTimeNano = Math.min(unusedTableTtlNano_ / 10, DAEMON_MAXIMUM_SLEEP_NANO);
while (true) {
try {
synchronized (CatalogdTableInvalidator.this) {
if (stopped_) return;
if (shouldEvictFromFullHeapAfterGc()) {
invalidateSome(gcInvalidationFraction_);
scanCount_.incrementAndGet();
}
long now = nanoTime();
// Wait for a fraction of unusedTableTtlNano_ if time-based invalidation is
// enabled
if (unusedTableTtlNano_ > 0 && now >= lastInvalidationTime_ + sleepTimeNano) {
invalidateOlderThan(unusedTableTtlNano_);
lastInvalidationTime_ = now;
scanCount_.incrementAndGet();
}
// Wait unusedTableTtlSec if it is configured. Otherwise wait
// indefinitely.
TimeUnit.NANOSECONDS.timedWait(CatalogdTableInvalidator.this, sleepTimeNano);
}
} catch (Exception e) {
LOG.warn("Unexpected exception thrown while attempting to automatically " +
"invalidate tables. Will retry in 5 seconds.", e);
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
}
}
}
}
private class GcNotificationListener implements NotificationListener {
@Override
public void handleNotification(Notification notification, Object handback) {
synchronized (CatalogdTableInvalidator.this) {
CatalogdTableInvalidator.this.notify();
}
}
}
}

View File

@@ -190,6 +190,7 @@ public class DataSourceTable extends Table implements FeDataSourceTable {
// Set table stats.
setTableStats(msTable_);
refreshLastUsedTime();
} catch (Exception e) {
throw new TableLoadingException("Failed to load metadata for data source table: " +
name_, e);

View File

@@ -115,6 +115,7 @@ public class HBaseTable extends Table implements FeHBaseTable {
// single clustering col
numClusteringCols_ = 1;
loadAllColumnStats(client);
refreshLastUsedTime();
} catch (Exception e) {
throw new TableLoadingException("Failed to load metadata for HBase table: " + name_,
e);

View File

@@ -1229,6 +1229,7 @@ public class HdfsTable extends Table implements FeFsTable {
if (loadTableSchema) setAvroSchema(client, msTbl);
setTableStats(msTbl);
fileMetadataStats_.unset();
refreshLastUsedTime();
} catch (TableLoadingException e) {
throw e;
} catch (Exception e) {

View File

@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.impala.analysis.TableName;
import org.apache.impala.common.JniUtil;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableUsage;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.thrift.TUpdateTableUsageResponse;
import org.apache.log4j.Logger;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Random;
/**
* Track the names and the number of usages of the recently used tables and report the
* data to catalogd asynchronously in order to invalidate the recently unused tables.
*/
public class ImpaladTableUsageTracker {
private static final Logger LOG = Logger.getLogger(ImpaladTableUsageTracker.class);
private final static long REPORT_INTERVAL_MS = 10000;
private HashMap<TTableName, TTableUsage> unreportedUsages;
private Thread reportThread_;
private ImpaladTableUsageTracker(boolean enabled) {
if (!enabled) return;
unreportedUsages = new HashMap<>();
reportThread_ = new Thread(new Runnable() {
@Override
public void run() {
report();
}
});
reportThread_.setDaemon(true);
reportThread_.setName("ImpaladTableUsageTracker daemon thread");
reportThread_.start();
}
public static ImpaladTableUsageTracker createFromConfig(BackendConfig config) {
final boolean invalidateTableOnMemoryPressure =
config.invalidateTablesOnMemoryPressure();
final int unusedTableTtlSec = config.getInvalidateTablesTimeoutS();
Preconditions.checkArgument(unusedTableTtlSec >= 0,
"unused_table_ttl_sec flag must be a non-negative integer.");
return new ImpaladTableUsageTracker(
unusedTableTtlSec > 0 || invalidateTableOnMemoryPressure);
}
/**
* Report used table names asynchronously. This might be called even if automatic
* invalidation is disabled, but in that case, it will be a no-op.
*/
public synchronized void recordTableUsage(Collection<TableName> tableNames) {
if (reportThread_ == null) return;
for (TableName tableName : tableNames) {
TTableName tTableName = tableName.toThrift();
if (unreportedUsages.containsKey(tTableName)) {
unreportedUsages.get(tTableName).num_usages++;
} else {
unreportedUsages.put(tTableName, new TTableUsage(tTableName, 1));
}
}
notify();
}
private void report() {
Random random = new Random();
String updateFailureMessage =
"Unable to report table usage information to catalog server. ";
while (true) {
try {
// Randomly sleep for [0.5, 1.5) * REPORT_INTERVAL_MS, to avoid flooding catalogd.
Thread.sleep((long) (REPORT_INTERVAL_MS * (0.5 + random.nextDouble())));
TUpdateTableUsageRequest reqToSend;
synchronized (this) {
if (unreportedUsages.isEmpty()) continue;
reqToSend =
new TUpdateTableUsageRequest(new ArrayList<>(unreportedUsages.values()));
unreportedUsages.clear();
}
byte[] byteResp =
FeSupport.NativeUpdateTableUsage(new TSerializer().serialize(reqToSend));
TUpdateTableUsageResponse resp = new TUpdateTableUsageResponse();
JniUtil.deserializeThrift(new TBinaryProtocol.Factory(), resp, byteResp);
if (resp.status.isSetStatus_code() &&
!resp.status.status_code.equals(TErrorCode.OK)) {
LOG.warn(
updateFailureMessage + Joiner.on("\n").join(resp.status.getError_msgs()));
}
} catch (Exception e) {
LOG.warn(updateFailureMessage, e);
}
}
}
}

View File

@@ -195,7 +195,7 @@ public class KuduTable extends Table implements FeKuduTable {
throw new TableLoadingException("Error loading metadata for Kudu table " +
kuduTableName_, e);
}
refreshLastUsedTime();
// Avoid updating HMS if the schema didn't change.
if (msTable_.equals(msTbl)) return;

View File

@@ -105,6 +105,11 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
// True if this object is stored in an Impalad catalog cache.
protected boolean storedInImpaladCatalogCache_ = false;
// Last used time of this table in nanoseconds as returned by
// CatalogdTableInvalidator.nanoTime(). This is only set in catalogd and not used by
// impalad.
protected long lastUsedTime_;
// Table metrics. These metrics are applicable to all table types. Each subclass of
// Table can define additional metrics specific to that table type.
public static final String REFRESH_DURATION_METRIC = "refresh-duration";
@@ -167,6 +172,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
return storedInImpaladCatalogCache_ || RuntimeEnv.INSTANCE.isTestEnv();
}
public long getLastUsedTime() {
Preconditions.checkState(lastUsedTime_ != 0 &&
!isStoredInImpaladCatalogCache());
return lastUsedTime_;
}
/**
* Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse
* valid existing metadata.
@@ -572,4 +583,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
org.apache.hadoop.hive.metastore.api.Table msTbl, String propertyKey) {
msTbl.putToParameters(propertyKey, Long.toString(System.currentTimeMillis() / 1000));
}
public void refreshLastUsedTime() {
lastUsedTime_ = CatalogdTableInvalidator.nanoTime();
}
}

View File

@@ -100,6 +100,7 @@ public class View extends Table implements FeView {
tableStats_ = new TTableStats(-1);
tableStats_.setTotal_file_bytes(-1);
queryStmt_ = parseViewDef(this);
refreshLastUsedTime();
} catch (TableLoadingException e) {
throw e;
} catch (Exception e) {

View File

@@ -95,6 +95,22 @@ public class BackendConfig {
return backendCfg_.pull_incremental_statistics;
}
public int getInvalidateTablesTimeoutS() {
return backendCfg_.invalidate_tables_timeout_s;
}
public boolean invalidateTablesOnMemoryPressure() {
return backendCfg_.invalidate_tables_on_memory_pressure;
}
public double getInvalidateTablesGcOldGenFullThreshold() {
return backendCfg_.invalidate_tables_gc_old_gen_full_threshold;
}
public double getInvalidateTablesFractionOnMemoryPressure() {
return backendCfg_.invalidate_tables_fraction_on_memory_pressure;
}
// Inits the auth_to_local configuration in the static KerberosName class.
private static void initAuthToLocal() {
// If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

View File

@@ -114,6 +114,8 @@ public class FeSupport {
// Does an RPC to the Catalog Server to fetch specified table partition statistics.
public native static byte[] NativeGetPartitionStats(byte[] thriftReq);
public native static byte[] NativeUpdateTableUsage(byte[] thriftReq);
// Parses a string of comma-separated key=value query options ('csvQueryOptions'),
// updates the existing query options ('queryOptions') with them and returns the
// resulting serialized TQueryOptions object.

View File

@@ -86,6 +86,7 @@ import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.catalog.ImpaladTableUsageTracker;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
@@ -179,6 +180,8 @@ public class Frontend {
private final ScheduledExecutorService policyReader_ =
Executors.newScheduledThreadPool(1);
private final ImpaladTableUsageTracker impaladTableUsageTracker_;
public Frontend(AuthorizationConfig authorizationConfig) {
this(authorizationConfig, FeCatalogManager.createFromBackendConfig());
}
@@ -214,6 +217,8 @@ public class Frontend {
policyReader_.scheduleAtFixedRate(policyReaderTask,
delay, AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS, TimeUnit.SECONDS);
}
impaladTableUsageTracker_ = ImpaladTableUsageTracker.createFromConfig(
BackendConfig.INSTANCE);
}
/**
@@ -242,6 +247,10 @@ public class Frontend {
public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }
public ImpaladTableUsageTracker getImpaladTableUsageTracker() {
return impaladTableUsageTracker_;
}
public TUpdateCatalogCacheResponse updateCatalogCache(
TUpdateCatalogCacheRequest req) throws CatalogException, TException {
TUpdateCatalogCacheResponse resp = catalogManager_.updateCatalogCache(req);

View File

@@ -65,6 +65,7 @@ import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.GlogAppender;
import org.apache.impala.util.PatternMatcher;
import org.apache.thrift.TException;
@@ -313,4 +314,10 @@ public class JniCatalog {
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(catalog_.getCatalogUsage());
}
public void updateTableUsage(byte[] req) throws ImpalaException {
TUpdateTableUsageRequest thriftReq = new TUpdateTableUsageRequest();
JniUtil.deserializeThrift(protocolFactory_, thriftReq, req);
catalog_.updateTableUsage(thriftReq);
}
}

View File

@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import com.google.common.base.Ticker;
import org.apache.impala.common.Reference;
import org.apache.impala.testutil.CatalogServiceTestCatalog;
import org.apache.impala.thrift.TTableName;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
public class CatalogdTableInvalidatorTest {
private CatalogServiceCatalog catalog_ = CatalogServiceTestCatalog.create();
private long waitForTrigger(long previousTriggerCount) throws InterruptedException {
long triggerCount;
do {
sleep(5);
triggerCount = catalog_.getCatalogdTableInvalidator().scanCount_.get();
} while (triggerCount == previousTriggerCount);
return triggerCount;
}
/**
* Test time-based invalidation in CatalogdTableInvalidator.
*/
@Test
public void testCatalogdTableInvalidator()
throws CatalogException, InterruptedException {
Reference<Boolean> tblWasRemoved = new Reference<>();
Reference<Boolean> dbWasAdded = new Reference<>();
String dbName = "functional";
String tblName = "alltypes";
catalog_.invalidateTable(new TTableName(dbName, tblName), tblWasRemoved, dbWasAdded);
MockTicker ticker = new MockTicker();
CatalogdTableInvalidator.TIME_SOURCE = ticker;
catalog_.setCatalogdTableInvalidator(
new CatalogdTableInvalidator(catalog_, /*unusedTableTtlSec=*/
2, /*invalidateTablesOnMemoryPressure=*/false, /*oldGenFullThreshold=*/
0.6, /*gcInvalidationFraction=*/0.1));
Assert.assertFalse(catalog_.getDb(dbName).getTable(tblName).isLoaded());
Table table = catalog_.getOrLoadTable(dbName, tblName);
Assert.assertTrue(table.isLoaded());
Assert.assertEquals(ticker.now_, table.getLastUsedTime());
long previousTriggerCount = catalog_.getCatalogdTableInvalidator().scanCount_.get();
ticker.set(TimeUnit.SECONDS.toNanos(1));
table.refreshLastUsedTime();
ticker.set(TimeUnit.SECONDS.toNanos(3));
previousTriggerCount = waitForTrigger(previousTriggerCount);
// The last used time is refreshed so the table won't be invalidated
Assert.assertTrue(catalog_.getTable(dbName, tblName).isLoaded());
ticker.set(TimeUnit.SECONDS.toNanos(6));
waitForTrigger(previousTriggerCount);
// The table is now invalidated
Assert.assertFalse(catalog_.getTable(dbName, tblName).isLoaded());
}
@After
public void cleanUp() {
catalog_.getCatalogdTableInvalidator().stop();
catalog_.setCatalogdTableInvalidator(null);
CatalogdTableInvalidator.TIME_SOURCE = Ticker.systemTicker();
}
class MockTicker extends Ticker {
long now_ = 1000;
@Override
synchronized public long read() {
return now_;
}
void set(long nanoSec) {
synchronized (this) {
now_ = nanoSec;
}
catalog_.getCatalogdTableInvalidator().wakeUpForTests();
}
}
}

View File

@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestAutomaticCatalogInvalidation(CustomClusterTestSuite):
""" Test that tables are cached in the catalogd after usage for the configured time
and invalidated afterwards."""
query = "select count(*) from functional.alltypes"
# The following columns string presents in the catalog object iff the table loaded.
metadata_cache_string = "columns (list) = list&lt;struct&gt;"
url = "http://localhost:25020/catalog_object?object_type=TABLE&" \
"object_name=functional.alltypes"
@classmethod
def get_workload(cls):
return 'functional-query'
def _get_catalog_object(self):
""" Return the catalog object of functional.alltypes serialized to string. """
return self.cluster.catalogd.service.read_debug_webpage(
"catalog_object?object_type=TABLE&object_name=functional.alltypes")
def _run_test(self, cursor):
cursor.execute(self.query)
cursor.fetchall()
# The table is cached after usage.
assert self.metadata_cache_string in self._get_catalog_object()
timeout = time.time() + 20
while True:
time.sleep(1)
# The table is eventually evicted.
if self.metadata_cache_string not in self._get_catalog_object():
return
assert time.time() < timeout
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(catalogd_args="--invalidate_tables_timeout_s=5",
impalad_args="--invalidate_tables_timeout_s=5")
def test_v1_catalog(self, cursor):
self._run_test(cursor)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args="--invalidate_tables_timeout_s=5 --catalog_topic_mode=minimal",
impalad_args="--invalidate_tables_timeout_s=5 --use_local_catalog")
def test_local_catalog(self, cursor):
self._run_test(cursor)