Fix race when invalidating catalog metadata and loading a new table

There was race when the catalog was invalidated at the same time a table
was being loaded. This is because an uninitialized Table was being returned
unexpectedly to the impalad due to the concurrent invalidate.

This fixes the problem by updating the CatalogObjectCache to load when
a catalog object is uninitialized, rather than load when null. New items can
now be added in a initialized or uninitialized state; uninitialized objects
are loaded on access.

Also adds a stress test for invalidate metadata/invalidate metadata <table>/refresh

In addition, it cleans up the locking in the Catalog to make it more
straight forward. The top-level catalogLock_ is now only in CatalogServiceCatalog
and this lock is used to protect the catalogVersion_. Operations that need to
perform an atomic bulk catalog operation can use this lock (such as when the
CatalogServer needs to take a snapshot of the catalog to calculate what delta to send
to the statestore). Otherwise, the lock is not needed and objects are protected by the
synchronization at each level in the object heirarchy (Db->[Function/Table]). That is,
Dbs are synchronized by the Db cache, each Db has a Table Cache which is synchronized
independently.

Change-Id: I9e542cd39cdbef26ddf05499470c0d96bb888765
Reviewed-on: http://gerrit.ent.cloudera.com:8080/1355
Reviewed-by: Lenni Kuff <lskuff@cloudera.com>
Tested-by: jenkins
Reviewed-on: http://gerrit.ent.cloudera.com:8080/1418
This commit is contained in:
Lenni Kuff
2014-01-23 15:23:43 -08:00
committed by jenkins
parent 62338694e4
commit 7a6892dcbe
19 changed files with 630 additions and 687 deletions

View File

@@ -38,10 +38,6 @@ enum TTableType {
HDFS_TABLE,
HBASE_TABLE,
VIEW,
// A table that does not contain all needed metadata. This can be either because
// of an error loading the metadata or because the table metadata has not yet
// been loaded.
INCOMPLETE_TABLE
}
enum THdfsFileFormat {

View File

@@ -0,0 +1,33 @@
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.cloudera.impala.catalog;
/**
* Interface that defines how new CatalogObjects are loaded into a cache.
*/
public abstract class CacheLoader<K, V extends CatalogObject> {
/**
* Loads an element into the cache for the given key name. The cachedValue
* parameter is an existing cached entry that can be reused to help speed up
* value loading. If cachedValue is null it will be ignored.
*/
public abstract V load(K key, V cachedValue, long catalogVersion);
/**
* Returns the next catalog version. This value is generally passed to
* load() so newly loaded items can get assigned a catalog version.
*/
public abstract long getNextCatalogVersion();
}

View File

@@ -16,10 +16,9 @@ package com.cloudera.impala.catalog;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -37,11 +36,16 @@ import com.google.common.collect.Lists;
/**
* Thread safe interface for reading and updating metadata stored in the Hive MetaStore.
* This class caches db-, table- and column-related metadata. Although this class is
* This class provides a storage API for caching CatalogObjects: databases, tables,
* and functions and the relevant metadata to go along with them. Although this class is
* thread safe, it does not guarantee consistency with the MetaStore. It is important
* to keep in mind that there may be external (potentially conflicting) concurrent
* metastore updates occurring at any time.
* All reads and writes of catalog objects should be synchronized using the catalogLock_.
* The CatalogObject storage hierarchy is:
* Catalog -> Db -> Table
* -> Function
* Each level has its own synchronization, so the cache of Dbs is synchronized and each
* Db has a cache of tables which is synchronized independently.
*/
public abstract class Catalog {
// Initial catalog version.
@@ -51,16 +55,15 @@ public abstract class Catalog {
private static final Logger LOG = Logger.getLogger(Catalog.class);
private final MetaStoreClientPool metaStoreClientPool_ = new MetaStoreClientPool(0);
private final CatalogInitStrategy initStrategy_;
private final AtomicInteger nextTableId_ = new AtomicInteger(0);
protected final MetaStoreClientPool metaStoreClientPool_ = new MetaStoreClientPool(0);
protected final CatalogInitStrategy initStrategy_;
// Cache of database metadata.
protected final HashMap<String, Db> dbCache_ = new HashMap<String, Db>();
// Fair lock used to synchronize catalog accesses and updates.
protected final ReentrantReadWriteLock catalogLock_ =
new ReentrantReadWriteLock(true);
// Thread safe cache of database metadata. Uses an AtomicReference so reset()
// operations can atomically swap dbCache_ references.
// TODO: Update this to use a CatalogObjectCache?
protected AtomicReference<ConcurrentHashMap<String, Db>> dbCache_ =
new AtomicReference<ConcurrentHashMap<String, Db>>(
new ConcurrentHashMap<String, Db>());
// Determines how the Catalog should be initialized.
public enum CatalogInitStrategy {
@@ -80,10 +83,14 @@ public abstract class Catalog {
if (initStrategy != CatalogInitStrategy.EMPTY) {
metaStoreClientPool_.addClients(META_STORE_CLIENT_POOL_SIZE);
}
reset();
}
public Catalog() { this(CatalogInitStrategy.LAZY); }
/**
* Resets this catalog instance by clearing all cached metadata and potentially
* reloading the metadata. How the metadata is loaded is based on the
* CatalogInitStrategy that was set in the c'tor.
*/
public abstract void reset() throws CatalogException;
/**
* Adds a new database to the catalog, replacing any existing database with the same
@@ -91,12 +98,7 @@ public abstract class Catalog {
* previous database.
*/
public Db addDb(Db db) {
catalogLock_.writeLock().lock();
try {
return dbCache_.put(db.getName().toLowerCase(), db);
} finally {
catalogLock_.writeLock().unlock();
}
return dbCache_.get().put(db.getName().toLowerCase(), db);
}
/**
@@ -106,12 +108,7 @@ public abstract class Catalog {
public Db getDb(String dbName) {
Preconditions.checkState(dbName != null && !dbName.isEmpty(),
"Null or empty database name given as argument to Catalog.getDb");
catalogLock_.readLock().lock();
try {
return dbCache_.get(dbName.toLowerCase());
} finally {
catalogLock_.readLock().unlock();
}
return dbCache_.get().get(dbName.toLowerCase());
}
/**
@@ -120,12 +117,7 @@ public abstract class Catalog {
* statements.
*/
public Db removeDb(String dbName) {
catalogLock_.writeLock().lock();
try {
return dbCache_.remove(dbName.toLowerCase());
} finally {
catalogLock_.writeLock().unlock();
}
return dbCache_.get().remove(dbName.toLowerCase());
}
/**
@@ -135,12 +127,7 @@ public abstract class Catalog {
* dbPattern may be null (and thus matches everything).
*/
public List<String> getDbNames(String dbPattern) {
catalogLock_.readLock().lock();
try {
return filterStringsByPattern(dbCache_.keySet(), dbPattern);
} finally {
catalogLock_.readLock().unlock();
}
return filterStringsByPattern(dbCache_.get().keySet(), dbPattern);
}
/**
@@ -149,16 +136,11 @@ public abstract class Catalog {
*/
public Table getTable(String dbName, String tableName) throws
DatabaseNotFoundException {
catalogLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
}
return db.getTable(tableName);
} finally {
catalogLock_.readLock().unlock();
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
}
return db.getTable(tableName);
}
/**
@@ -166,15 +148,10 @@ public abstract class Catalog {
* if the table/database does not exist.
*/
public Table removeTable(TTableName tableName) {
catalogLock_.writeLock().lock();
try {
// Remove the old table name from the cache and add the new table.
Db db = getDb(tableName.getDb_name());
if (db == null) return null;
return db.removeTable(tableName.getTable_name());
} finally {
catalogLock_.writeLock().unlock();
}
// Remove the old table name from the cache and add the new table.
Db db = getDb(tableName.getDb_name());
if (db == null) return null;
return db.removeTable(tableName.getTable_name());
}
/**
@@ -189,26 +166,16 @@ public abstract class Catalog {
public List<String> getTableNames(String dbName, String tablePattern)
throws DatabaseNotFoundException {
Preconditions.checkNotNull(dbName);
catalogLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
}
return filterStringsByPattern(db.getAllTableNames(), tablePattern);
} finally {
catalogLock_.readLock().unlock();
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
}
return filterStringsByPattern(db.getAllTableNames(), tablePattern);
}
public boolean containsTable(String dbName, String tableName) {
catalogLock_.readLock().lock();
try {
Db db = getDb(dbName);
return (db == null) ? false : db.containsTable(tableName);
} finally {
catalogLock_.readLock().unlock();
}
Db db = getDb(dbName);
return (db == null) ? false : db.containsTable(tableName);
}
/**
@@ -221,14 +188,9 @@ public abstract class Catalog {
* resolve first to db.fn().
*/
public boolean addFunction(Function fn) {
catalogLock_.writeLock().lock();
try {
Db db = getDb(fn.dbName());
if (db == null) return false;
return db.addFunction(fn);
} finally {
catalogLock_.writeLock().unlock();
}
Db db = getDb(fn.dbName());
if (db == null) return false;
return db.addFunction(fn);
}
/**
@@ -237,14 +199,9 @@ public abstract class Catalog {
* in the catalog, it will return the function with the strictest matching mode.
*/
public Function getFunction(Function desc, Function.CompareMode mode) {
catalogLock_.readLock().lock();
try {
Db db = getDb(desc.dbName());
if (db == null) return null;
return db.getFunction(desc, mode);
} finally {
catalogLock_.readLock().unlock();
}
Db db = getDb(desc.dbName());
if (db == null) return null;
return db.getFunction(desc, mode);
}
/**
@@ -253,14 +210,9 @@ public abstract class Catalog {
* null.
*/
public Function removeFunction(Function desc) {
catalogLock_.writeLock().lock();
try {
Db db = getDb(desc.dbName());
if (db == null) return null;
return db.removeFunction(desc);
} finally {
catalogLock_.writeLock().unlock();
}
Db db = getDb(desc.dbName());
if (db == null) return null;
return db.removeFunction(desc);
}
/**
@@ -268,16 +220,11 @@ public abstract class Catalog {
*/
public List<String> getFunctionSignatures(TFunctionType type, String dbName,
String pattern) throws DatabaseNotFoundException {
catalogLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
}
return filterStringsByPattern(db.getAllFunctionSignatures(type), pattern);
} finally {
catalogLock_.readLock().unlock();
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
}
return filterStringsByPattern(db.getAllFunctionSignatures(type), pattern);
}
/**
@@ -285,14 +232,9 @@ public abstract class Catalog {
* are ignored.
*/
public boolean functionExists(FunctionName name) {
catalogLock_.readLock().lock();
try {
Db db = getDb(name.getDb());
if (db == null) return false;
return db.functionExists(name);
} finally {
catalogLock_.readLock().unlock();
}
Db db = getDb(name.getDb());
if (db == null) return false;
return db.functionExists(name);
}
/**
@@ -301,68 +243,12 @@ public abstract class Catalog {
*/
public void close() { metaStoreClientPool_.close(); }
/**
* Gets the next table ID and increments the table ID counter.
*/
public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); }
/**
* Returns a managed meta store client from the client connection pool.
*/
public MetaStoreClient getMetaStoreClient() { return metaStoreClientPool_.getClient(); }
/**
* Resets this catalog instance by clearing all cached metadata and potentially
* reloading the metadata. How the metadata is loaded is based on the
* CatalogInitStrategy that was set in the c'tor.
*/
public long reset() {
catalogLock_.writeLock().lock();
try {
return resetInternal();
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Executes the underlying reset logic. catalogLock_.writeLock() must
* be taken before calling this.
*/
protected long resetInternal() {
try {
nextTableId_.set(0);
dbCache_.clear();
if (initStrategy_ == CatalogInitStrategy.EMPTY) {
return CatalogServiceCatalog.getCatalogVersion();
}
MetaStoreClient msClient = metaStoreClientPool_.getClient();
try {
for (String dbName: msClient.getHiveClient().getAllDatabases()) {
Db db = new Db(dbName, this);
db.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion());
addDb(db);
for (final String tableName: msClient.getHiveClient().getAllTables(dbName)) {
Table incompleteTbl =
IncompleteTable.createUninitializedTable(getNextTableId(), db, tableName);
incompleteTbl.setCatalogVersion(
CatalogServiceCatalog.incrementAndGetCatalogVersion());
db.addTable(incompleteTbl);
}
}
} finally {
msClient.release();
}
return CatalogServiceCatalog.getCatalogVersion();
} catch (Exception e) {
LOG.error(e);
LOG.error("Error initializing Catalog. Catalog may be empty.");
throw new IllegalStateException(e);
}
}
/**
* Implement Hive's pattern-matching semantics for SHOW statements. The only
* metacharacters are '*' which matches any string of characters, and '|'
@@ -416,21 +302,16 @@ public abstract class Catalog {
PartitionNotFoundException, TableNotFoundException, TableLoadingException {
String partitionNotFoundMsg =
"Partition not found: " + Joiner.on(", ").join(partitionSpec);
catalogLock_.readLock().lock();
try {
Table table = getTable(dbName, tableName);
// This is not an Hdfs table, throw an error.
if (!(table instanceof HdfsTable)) {
throw new PartitionNotFoundException(partitionNotFoundMsg);
}
// Get the HdfsPartition object for the given partition spec.
HdfsPartition partition =
((HdfsTable) table).getPartitionFromThriftPartitionSpec(partitionSpec);
if (partition == null) throw new PartitionNotFoundException(partitionNotFoundMsg);
return partition;
} finally {
catalogLock_.readLock().unlock();
Table table = getTable(dbName, tableName);
// This is not an Hdfs table, throw an error.
if (!(table instanceof HdfsTable)) {
throw new PartitionNotFoundException(partitionNotFoundMsg);
}
// Get the HdfsPartition object for the given partition spec.
HdfsPartition partition =
((HdfsTable) table).getPartitionFromThriftPartitionSpec(partitionSpec);
if (partition == null) throw new PartitionNotFoundException(partitionNotFoundMsg);
return partition;
}
/**
@@ -503,4 +384,4 @@ public abstract class Catalog {
}
return result;
}
}
}

View File

@@ -71,6 +71,8 @@ public class CatalogDeltaLog {
*/
public synchronized boolean wasObjectRemovedAfter(TCatalogObject catalogObject) {
Preconditions.checkNotNull(catalogObject);
if (removedCatalogObjects_.isEmpty()) return false;
// Get all the items that were removed after the catalog version of this object.
SortedMap<Long, TCatalogObject> candidateObjects =
removedCatalogObjects_.tailMap(catalogObject.getCatalog_version());

View File

@@ -31,4 +31,7 @@ public interface CatalogObject {
// Sets the version of this catalog object.
public void setCatalogVersion(long newVersion);
// Returns true if this CatalogObject has had its metadata loaded, false otherwise.
public boolean isLoaded();
}

View File

@@ -20,35 +20,33 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Lazily loads metadata on read (through getOrLoad()) and tracks the set of valid/known
* object names. This class is thread safe.
* object names. This class is thread safe. The CatalogObjectCache is created by passing
* a custom CacheLoader object which can implement its own load() logic.
*
* If a catalog object has not yet been loaded successfully, getOrLoad() will attempt to
* load its metadata. It is only possible to load metadata for objects that have
* previously been created with a call to add() or addName(). The catalog cache supports
* parallel loading/gets of different keys. While a load is in progress, any calls to
* get the same key will block until the load completes at which point the loaded value
* will be returned.
*
* Metadata can be invalidated or reloaded. The CatalogObjectCache is initialized using
* a custom CacheLoader object which can implement its own load()/reload() logic, but
* in general the behavior is:
* - reload(name) will perform a synchronous incremental refresh of the object.
* Depending on its implementation, refresh might reuse some of the existing metadata
* which could result in a partially stale object but faster load time.
* - invalidate(name) will mark the item in the metadata cache as invalid
* and the next getOrLoad() will trigger a full metadata reload.
* Items are added to the cache by calling add(T catalogObject). New objects can be
* added in an initialized or uninitialized state (determined by the isLoaded()
* property of the CatalogObject). If a catalog object is uninitialized, getOrLoad()
* will attempt to load its metadata. Otherwise, getOrLoad() will return the cached value.
* Invalidation of cache entries is done by calling add() and passing in a CatalogObject
* that has isLoaded() == false. The next access to the object will trigger a
* metadata load.
* The catalog cache supports parallel loading/gets of different keys. While a load is in
* progress, any calls to get the same key will block until the load completes at which
* point the loaded value will be returned.
* Metadata can be reloaded. Reloading an object will perform a synchronous incremental
* refresh the object's metadata. Depending on the load() implementation in the
* CacheLoader, refresh might reuse some of the existing metadata which could result in
* a partially stale object but faster load time.
*/
public class CatalogObjectCache<T extends CatalogObject> {
private static final Logger LOG = Logger.getLogger(CatalogObjectCache.class);
// Map of lower-case object name to CacheEntry objects. New CacheEntries are created
// by calling add() or addName(). If a CacheEntry does not exist "getOrLoad()" will
// by calling add(). If a CacheEntry does not exist "getOrLoad()" will
// return null.
private final ConcurrentHashMap<String, CacheEntry<T>> metadataCache_ =
new ConcurrentHashMap<String, CacheEntry<T>>();
@@ -58,48 +56,36 @@ public class CatalogObjectCache<T extends CatalogObject> {
/**
* Stores and lazily loads CatalogObjects upon read. Ensures the CatalogObject
* catalog versions are strictly increasing when updated. This class is thread safe.
* catalog versions are strictly increasing when updated with an initialized
* (loaded) value. This class is thread safe.
*/
private static class CacheEntry<T extends CatalogObject> {
private final String key_;
private final CacheLoader<String, T> cacheLoader_;
private T catalogObject_;
private CacheEntry(String key, CacheLoader<String, T> cacheLoader) {
key_ = key;
private CacheEntry(T catalogObject, CacheLoader<String, T> cacheLoader) {
catalogObject_ = catalogObject;
cacheLoader_ = cacheLoader;
}
/**
* Replaces the CatalogObject in this CacheEntry if it is newer than the
* existing value (the catalog version is greater). Returns true if the existing
* value was replaced or false if the existing value was preserved.
* existing value (the catalog version is greater) or if the existing value
* has not yet uninitialized (has not been loaded) and the new value is
* loaded.
* Returns true if the existing value was replaced or false if the existing
* value was preserved.
*/
public synchronized boolean replaceIfNewer(T catalogObject) {
Preconditions.checkNotNull(catalogObject);
if (catalogObject_ == null ||
catalogObject_.getCatalogVersion() < catalogObject.getCatalogVersion()) {
catalogObject_ = catalogObject;
public synchronized boolean replaceIfNewer(T newCatalogObject) {
Preconditions.checkNotNull(newCatalogObject);
if (catalogObject_.getCatalogVersion() < newCatalogObject.getCatalogVersion()
|| !catalogObject_.isLoaded() && newCatalogObject.isLoaded()) {
catalogObject_ = newCatalogObject;
return true;
}
return false;
}
/**
* Invalidates the current value. The next call to getOrLoad() or reload() will
* trigger a metadata load.
*/
public void invalidate() {
T tmpCatalogObject = catalogObject_;
synchronized(this) {
// Only invalidate if the reference hasn't changed. This helps reduce the
// likely-hood that a newly loaded value gets immediately wiped out
// by a concurrent invalidate().
// TODO: Consider investigating a more fair locking scheme.
if (tmpCatalogObject == catalogObject_) catalogObject_ = null;
}
}
/**
* Returns the current CatalogObject value in this CacheEntry.
*/
@@ -107,50 +93,59 @@ public class CatalogObjectCache<T extends CatalogObject> {
/**
* Gets the current catalog object for this CacheEntry, loading it if needed (if the
* existing catalog object is null). Throws a CatalogException on any error
* loading the metadata.
* existing catalog object is uninitialized).
*/
public synchronized T getOrLoad() throws CatalogException {
if (catalogObject_ != null) return catalogObject_;
try {
T loadedObject = cacheLoader_.load(key_.toLowerCase());
public T getOrLoad() {
// Get the catalog version to assign the to the loaded object. It's important to
// do this before locking, because getNextCatalogVersion() may require taking
// a top-level lock.
long targetCatalogVersion = cacheLoader_.getNextCatalogVersion();
synchronized (this) {
Preconditions.checkNotNull(catalogObject_);
if (catalogObject_.isLoaded()) return catalogObject_;
T loadedObject = cacheLoader_.load(catalogObject_.getName().toLowerCase(), null,
targetCatalogVersion);
Preconditions.checkNotNull(loadedObject);
Preconditions.checkState(loadedObject.isLoaded());
replaceIfNewer(loadedObject);
return catalogObject_;
} catch (Exception e) {
throw new CatalogException("Error loading metadata for: " + key_, e);
}
}
/**
* Reloads the value for this cache entry and replaces the existing value if the
* new object's catalog version is greater. All exceptions are logged and
* swallowed and the existing value will not be modified. This is similar to
* getOrLoad(), but can reuse the existing cached value to speedup loading time.
* TODO: Instead of serializing reload() requests, concurrent reload()'s could
* block until the in-progress reload() completes.
* Reloads the value for this cache entry and replaces the existing value. This is
* similar to load(), but can reuse the existing cached value to speedup loading
* time. Will block if any existing reloads/loads are in progress, and return the
* value the value they loaded.
*/
public synchronized T reload() {
try {
ListenableFuture<T> result = cacheLoader_.reload(key_, catalogObject_);
Preconditions.checkNotNull(result);
public T reload() {
// Get the catalog version to assign the to the reloaded object. It's important to
// do this before locking, because getNextCatalogVersion() may require taking
// a top-level lock.
long targetCatalogVersion = cacheLoader_.getNextCatalogVersion();
// Wait for the reload to complete.
T reloadedObject = result.get();
Preconditions.checkNotNull(reloadedObject);
replaceIfNewer(reloadedObject);
} catch (Exception e) {
LOG.error(e);
T tmpCatalogObject = catalogObject_;
synchronized (this) {
// Only reload if the underlying catalog object has not changed since waiting
// on the lock OR if the catalog object is uninitialized and needs to be loaded.
if (tmpCatalogObject == catalogObject_ || !catalogObject_.isLoaded()) {
T loadedObject = cacheLoader_.load(catalogObject_.getName(), catalogObject_,
targetCatalogVersion);
Preconditions.checkNotNull(loadedObject);
Preconditions.checkState(loadedObject.isLoaded());
replaceIfNewer(loadedObject);
}
return catalogObject_;
}
return catalogObject_;
}
/**
* Creates a new CacheEntry with the given key and CacheLoader.
*/
public static <T extends CatalogObject> CacheEntry<T>
create(String key, CacheLoader<String, T> cacheLoader) {
return new CacheEntry<T>(key.toLowerCase(), cacheLoader);
create(T catalogObject, CacheLoader<String, T> cacheLoader) {
return new CacheEntry<T>(catalogObject, cacheLoader);
}
}
@@ -170,10 +165,9 @@ public class CatalogObjectCache<T extends CatalogObject> {
public boolean add(T catalogObject) {
Preconditions.checkNotNull(catalogObject);
CacheEntry<T> cacheEntry =
CacheEntry.create(catalogObject.getName(), cacheLoader_);
CacheEntry.create(catalogObject, cacheLoader_);
CacheEntry<T> existingItem = metadataCache_.putIfAbsent(
catalogObject.getName().toLowerCase(),
cacheEntry);
catalogObject.getName().toLowerCase(), cacheEntry);
// When existingItem != null it indicates there was already an existing entry
// associated with the key, so apply the update to the existing entry.
@@ -182,22 +176,6 @@ public class CatalogObjectCache<T extends CatalogObject> {
return cacheEntry.replaceIfNewer(catalogObject);
}
/**
* Adds a new name to the cache, the next access to the object will trigger a
* metadata load. If an item with the same name already exists in the cache
* it will be invalidated.
* TODO: Should addName() require a catalog version associated with the operation?
*/
public void addName(String objectName) {
CacheEntry<T> cacheEntry = CacheEntry.create(objectName, cacheLoader_);
CacheEntry<T> existingItem = metadataCache_.putIfAbsent(
objectName.toLowerCase(), cacheEntry);
cacheEntry = existingItem != null ? existingItem : cacheEntry;
// This invalidate may be unnecessary if there was an existing item in the cache
// that didn't need invalidation, but it is still safe to do so.
cacheEntry.invalidate();
}
/**
* Removes an item from the metadata cache and returns the removed item, or null
* if no item was removed.
@@ -214,16 +192,6 @@ public class CatalogObjectCache<T extends CatalogObject> {
metadataCache_.clear();
}
/**
* Invalidates the CacheEntry's value for the given object name. The next access to
* this object will trigger a metadata load. Note that this does NOT remove the
* CacheEntry value or the key in the metadataCache_.
*/
public void invalidate(String name) {
CacheEntry<T> cacheEntry = metadataCache_.get(name.toLowerCase());
if (cacheEntry != null) cacheEntry.invalidate();
}
/**
* Reloads the metadata for the given object name (if the object already exists
* in the cache) or loads the object if it does not exists in the metadata
@@ -260,16 +228,21 @@ public class CatalogObjectCache<T extends CatalogObject> {
* It is important getOrLoad() not be synchronized to allow concurrent getOrLoad()
* requests on different keys.
*/
public T getOrLoad(final String name) {
public T getOrLoad(String name) {
CacheEntry<T> cacheEntry = metadataCache_.get(name.toLowerCase());
if (cacheEntry == null) return null;
try {
return cacheEntry.getOrLoad();
} catch (CatalogException e) {
// TODO: Consider throwing a CatalogException rather than an unchecked
// exception. IllegalStateException isn't really the right exception type
// either.
throw new IllegalStateException(e.getMessage(), e);
}
return cacheEntry.getOrLoad();
}
/**
* Returns the catalog object corresponding to the supplied name if it exists in the
* cache, or null if there is no CacheEntry in the metadataCache_ associated with this
* key. Will not perform a load(), so may return objects that are not initialized
* (have isLoaded() == false).
*/
public T get(String name) {
CacheEntry<T> cacheEntry = metadataCache_.get(name.toLowerCase());
if (cacheEntry == null) return null;
return cacheEntry.value();
}
}

View File

@@ -18,10 +18,13 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.Pair;
import com.cloudera.impala.thrift.TCatalog;
@@ -41,19 +44,35 @@ import com.google.common.collect.Lists;
* will return the catalog version that the update will show up in. The client
* can then wait until the statestore sends an update that contains that catalog
* version.
* The CatalogServiceCatalog also manages a global "catalog version". The version
* is incremented and assigned to a CatalogObject whenever it is
* added/modified/removed from the catalog. This means each CatalogObject
* will have a unique version and assigned versions are strictly increasing.
*/
public class CatalogServiceCatalog extends Catalog {
private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
private final TUniqueId catalogServiceId_;
// Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
// protects catalogVersion_, it can be used to perform atomic bulk catalog operations
// since catalogVersion_ cannot change externally while the lock is being held.
// In addition to protecting catalogVersion_, it is currently used for the
// following bulk operations:
// * Building a delta update to send to the statestore in getAllCatalogObjects(),
// so a snapshot of the catalog can be taken without any version changes.
// * During a catalog invalidation (call to reset()), which re-reads all dbs and tables
// from the metastore.
// * During renameTable(), because a table must be removed and added to the catalog
// atomically (potentially in a different database).
private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
// Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
// with each update to the Catalog. Continued across the lifetime of the process.
// Atomic to ensure versions are always sequentially increasing, even when updated
// from different threads.
// TODO: This probably doesn't need to be atomic and can be updated while holding
// the catalogLock_.
private final static AtomicLong catalogVersion_ =
new AtomicLong(INITIAL_CATALOG_VERSION);
// with each update to the Catalog. Continued across the lifetime of the object.
// Protected by catalogLock_.
// TODO: Handle overflow of catalogVersion_ and nextTableId_.
private long catalogVersion_ = INITIAL_CATALOG_VERSION;
protected final AtomicInteger nextTableId_ = new AtomicInteger(0);
/**
* Initialize the CatalogServiceCatalog, loading all table metadata
@@ -103,7 +122,8 @@ public class CatalogServiceCatalog extends Catalog {
for (String tblName: db.getAllTableNames()) {
TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
Catalog.INITIAL_CATALOG_VERSION);
Table tbl = db.getTable(tblName);
Table tbl = db.getTableNoLoad(tblName);
if (tbl == null) {
LOG.error("Table: " + tblName + " was expected to be in the catalog " +
"cache. Skipping table for this update.");
@@ -146,12 +166,12 @@ public class CatalogServiceCatalog extends Catalog {
// By setting the catalog version to the latest catalog version at this point,
// it ensure impalads will always bump their versions, even in the case where
// an object has been dropped.
catalog.setCatalog_version(CatalogServiceCatalog.getCatalogVersion());
catalog.setCatalog_version(getCatalogVersion());
catalog.setCatalog(new TCatalog(catalogServiceId_));
resp.addToObjects(catalog);
// The max version is the max catalog version of all items in the update.
resp.setMax_catalog_version(CatalogServiceCatalog.getCatalogVersion());
resp.setMax_catalog_version(getCatalogVersion());
return resp;
} finally {
catalogLock_.readLock().unlock();
@@ -163,108 +183,128 @@ public class CatalogServiceCatalog extends Catalog {
* Functions are not returned in a defined order.
*/
public List<Function> getFunctions(String dbName) throws DatabaseNotFoundException {
catalogLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Contains map of overloaded function names to all functions matching that name.
HashMap<String, List<Function>> dbFns = db.getAllFunctions();
List<Function> fns = new ArrayList<Function>(dbFns.size());
for (List<Function> fnOverloads: dbFns.values()) {
for (Function fn: fnOverloads) {
fns.add(fn);
}
}
return fns;
} finally {
catalogLock_.readLock().unlock();
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Contains map of overloaded function names to all functions matching that name.
HashMap<String, List<Function>> dbFns = db.getAllFunctions();
List<Function> fns = new ArrayList<Function>(dbFns.size());
for (List<Function> fnOverloads: dbFns.values()) {
for (Function fn: fnOverloads) {
fns.add(fn);
}
}
return fns;
}
@Override
public long reset() {
public void reset() throws CatalogException {
catalogLock_.writeLock().lock();
try {
resetInternal();
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Executes the underlying reset logic. catalogLock_.writeLock() must
* be taken before calling this.
*/
private void resetInternal() throws CatalogException {
try {
nextTableId_.set(0);
if (initStrategy_ == CatalogInitStrategy.EMPTY) {
dbCache_.get().clear();
return;
}
// Since UDFs/UDAs are not persisted in the metastore, we won't clear
// them across reset. To do this, we store all the functions before
// clearing and restore them after.
// TODO: Everything about this. Persist them.
List<Pair<String, HashMap<String, List<Function>>>> functions =
Lists.newArrayList();
for (Db db: dbCache_.values()) {
for (Db db: dbCache_.get().values()) {
if (db.numFunctions() == 0) continue;
functions.add(Pair.create(db.getName(), db.getAllFunctions()));
}
// Reset the dbs.
resetInternal();
// Build a new DB cache, populate it, and replace the existing cache in one
// step.
ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
MetaStoreClient msClient = metaStoreClientPool_.getClient();
try {
for (String dbName: msClient.getHiveClient().getAllDatabases()) {
Db db = new Db(dbName, this);
db.setCatalogVersion(incrementAndGetCatalogVersion());
newDbCache.put(db.getName().toLowerCase(), db);
for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
Table incompleteTbl = IncompleteTable.createUninitializedTable(
getNextTableId(), db, tableName);
incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
db.addTable(incompleteTbl);
}
}
} finally {
msClient.release();
}
// Restore UDFs/UDAs.
for (Pair<String, HashMap<String, List<Function>>> dbFns: functions) {
Db db = null;
try {
db = dbCache_.get(dbFns.first);
db = newDbCache.get(dbFns.first);
} catch (Exception e) {
continue;
}
if (db == null) {
// DB no longer exists.
// TODO: We could restore this DB and then add the functions back.
// DB no longer exists - it was probably dropped externally.
// TODO: We could restore this DB and then add the functions back?
continue;
}
for (List<Function> fns: dbFns.second.values()) {
for (Function fn: fns) {
fn.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion());
fn.setCatalogVersion(incrementAndGetCatalogVersion());
db.addFunction(fn);
}
}
}
return getCatalogVersion();
} finally {
catalogLock_.writeLock().unlock();
dbCache_.set(newDbCache);
} catch (Exception e) {
LOG.error(e);
throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
}
}
/**
* Adds a database name to the metadata cache and returns the database's
* Thrift representation. Used by CREATE DATABASE statements.
* new Db object. Used by CREATE DATABASE statements.
*/
public TCatalogObject addDb(String dbName) throws ImpalaException {
TCatalogObject thriftDb = new TCatalogObject(TCatalogObjectType.DATABASE,
Catalog.INITIAL_CATALOG_VERSION);
catalogLock_.writeLock().lock();
try {
Db newDb = new Db(dbName, this);
newDb.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion());
addDb(newDb);
thriftDb.setCatalog_version(newDb.getCatalogVersion());
thriftDb.setDb(newDb.toThrift());
return thriftDb;
} finally {
catalogLock_.writeLock().unlock();
}
public Db addDb(String dbName) throws ImpalaException {
Db newDb = new Db(dbName, this);
newDb.setCatalogVersion(incrementAndGetCatalogVersion());
addDb(newDb);
return newDb;
}
/**
* Removes a database from the metadata cache. Used by DROP DATABASE statements.
* Removes a database from the metadata cache and returns the removed database,
* or null if the database did not exist in the cache.
* Used by DROP DATABASE statements.
*/
@Override
public Db removeDb(String dbName) {
catalogLock_.writeLock().lock();
try {
Db removedDb = super.removeDb(dbName);
if (removedDb != null) {
removedDb.setCatalogVersion(
CatalogServiceCatalog.incrementAndGetCatalogVersion());
}
return removedDb;
} finally {
catalogLock_.writeLock().unlock();
Db removedDb = super.removeDb(dbName);
if (removedDb != null) {
removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
}
return removedDb;
}
/**
@@ -273,54 +313,38 @@ public class CatalogServiceCatalog extends Catalog {
* TODO: Should this add an IncompleteTable instead of loading the metadata?
*/
public Table addTable(String dbName, String tblName) throws TableNotFoundException {
Db db;
catalogLock_.writeLock().lock();
try {
db = getDb(dbName);
if (db == null) return null;
db.addTableName(tblName);
} finally {
catalogLock_.writeLock().unlock();
}
Db db = getDb(dbName);
if (db == null) return null;
Table incompleteTable =
IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName);
db.addTable(incompleteTable);
return db.getTable(tblName);
}
public Table removeTable(String dbName, String tblName)
throws DatabaseNotFoundException {
catalogLock_.writeLock().lock();
try {
Db parentDb = getDb(dbName);
if (parentDb == null) return null;
Db parentDb = getDb(dbName);
if (parentDb == null) return null;
Table removedTable = parentDb.removeTable(tblName);
if (removedTable != null) {
removedTable.setCatalogVersion(
CatalogServiceCatalog.incrementAndGetCatalogVersion());
}
return removedTable;
} finally {
catalogLock_.writeLock().unlock();
Table removedTable = parentDb.removeTable(tblName);
if (removedTable != null) {
removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
}
return removedTable;
}
/**
* Removes a function from the catalog. Increments the catalog version and returns
* the Function object that was removed if the function existed, otherwise returns
* null.
* the Function object that was removed. If the function did not exist, null will
* be returned.
*/
@Override
public Function removeFunction(Function desc) {
catalogLock_.writeLock().lock();
try {
Function removedFn = super.removeFunction(desc);
if (removedFn != null) {
removedFn.setCatalogVersion(
CatalogServiceCatalog.incrementAndGetCatalogVersion());
}
return removedFn;
} finally {
catalogLock_.writeLock().unlock();
Function removedFn = super.removeFunction(desc);
if (removedFn != null) {
removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
}
return removedFn;
}
/**
@@ -331,18 +355,13 @@ public class CatalogServiceCatalog extends Catalog {
*/
@Override
public boolean addFunction(Function fn) {
catalogLock_.writeLock().lock();
try {
Db db = getDb(fn.getFunctionName().getDb());
if (db == null) return false;
if (db.addFunction(fn)) {
fn.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion());
return true;
}
return false;
} finally {
catalogLock_.writeLock().unlock();
Db db = getDb(fn.getFunctionName().getDb());
if (db == null) return false;
if (db.addFunction(fn)) {
fn.setCatalogVersion(incrementAndGetCatalogVersion());
return true;
}
return false;
}
/**
@@ -367,16 +386,11 @@ public class CatalogServiceCatalog extends Catalog {
* (outside of Impala) modifications to the table.
*/
public void updateLastDdlTime(TTableName tblName, long ddlTime) {
catalogLock_.writeLock().lock();
try {
Db db = getDb(tblName.getDb_name());
if (db == null) return;
Table tbl = db.getTable(tblName.getTable_name());
if (tbl == null) return;
tbl.updateLastDdlTime(ddlTime);
} finally {
catalogLock_.writeLock().unlock();
}
Db db = getDb(tblName.getDb_name());
if (db == null) return;
Table tbl = db.getTable(tblName.getTable_name());
if (tbl == null) return;
tbl.updateLastDdlTime(ddlTime);
}
/**
@@ -418,87 +432,51 @@ public class CatalogServiceCatalog extends Catalog {
Table table = getTable(tableName.getDb_name(), tableName.getTable_name());
if (table == null) return null;
LOG.debug("Refreshing table metadata: " + table.getFullName());
table.getDb().reloadTable(table.getName());
return getTable(tableName.getDb_name(), tableName.getTable_name());
return table.getDb().reloadTable(table.getName());
} else {
catalogLock_.writeLock().lock();
try {
Table existingTable = getTable(tableName.getDb_name(),
tableName.getTable_name());
if (existingTable == null) return null;
LOG.debug("Invalidating table metadata: " + existingTable.getFullName());
// Instead of calling invalidate() on the cache entry, which would mean the next
// access would trigger a metadata load, the existing table is replaced with
// an IncompleteTable. The IncompleteTable will be sent to all impalads in the
// cluster, triggering an invalidation on each node.
Table newTable = IncompleteTable.createUninitializedTable(getNextTableId(),
existingTable.getDb(), existingTable.getName());
newTable.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion());
existingTable.getDb().addTable(newTable);
return newTable;
} finally {
catalogLock_.writeLock().unlock();
}
}
}
Table existingTable = getTable(tableName.getDb_name(),
tableName.getTable_name());
if (existingTable == null) return null;
LOG.debug("Invalidating table metadata: " + existingTable.getFullName());
/**
* Gets a table from the catalog. This method will load any uninitialized
* IncompleteTables into the table cache. Returns the Table object or null if no
* table existed in the cache with this name.
*/
public Table getOrReloadTable(String dbName, String tblName)
throws DatabaseNotFoundException {
Table tbl = getTable(dbName, tblName);
if (tbl == null) return null;
// Check if this table needs to have its metadata loaded (is an uninitialized
// IncompleteTable). To avoid having the same table reloaded many times for
// concurrent requests to the same object, synchronize on the Table and then
// check if it is still incomplete while holding the lock. This allows concurrent
// loads for different tables while still protecting against a flood of reloads
// of the same table.
synchronized (tbl) {
// Read the table again while holding the lock.
Table reReadTbl = getTable(dbName, tblName);
if (reReadTbl == null) return null;
if (reReadTbl instanceof IncompleteTable
&& ((IncompleteTable) reReadTbl).isUninitialized()) {
// Perform the reload
return reReadTbl.getDb().reloadTable(tblName);
} else {
return reReadTbl;
}
// The existing table is replaced with an uninitialized IncompleteTable, which
// effectively invalidates the existing cached value. The IncompleteTable
// will then be sent to all impalads in the cluster, triggering an invalidation on
// each node.
Table newTable = IncompleteTable.createUninitializedTable(getNextTableId(),
existingTable.getDb(), existingTable.getName());
newTable.setCatalogVersion(incrementAndGetCatalogVersion());
existingTable.getDb().addTable(newTable);
return newTable;
}
}
/**
* See comment in Catalog.java.
* This method will load any uninitialized IncompleteTables into the table cache.
*/
@Override
public TCatalogObject getTCatalogObject(TCatalogObject objectDesc)
throws CatalogException {
if (objectDesc.isSetTable()) {
Table tbl = getOrReloadTable(objectDesc.getTable().getDb_name(),
objectDesc.getTable().getTbl_name());
if (tbl == null) {
throw new TableNotFoundException("Table not found: " +
objectDesc.getTable().getTbl_name());
}
}
return super.getTCatalogObject(objectDesc);
}
/**
* Increments the current Catalog version and returns the new value.
*/
public static long incrementAndGetCatalogVersion() {
return catalogVersion_.incrementAndGet();
public long incrementAndGetCatalogVersion() {
catalogLock_.writeLock().lock();
try {
return ++catalogVersion_;
} finally {
catalogLock_.writeLock().unlock();
}
}
/**
* Returns the current Catalog version.
*/
public static long getCatalogVersion() { return catalogVersion_.get(); }
public long getCatalogVersion() {
catalogLock_.readLock().lock();
try {
return catalogVersion_;
} finally {
catalogLock_.readLock().unlock();
}
}
/**
* Gets the next table ID and increments the table ID counter.
*/
public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); }
}

View File

@@ -50,7 +50,8 @@ public class Db implements CatalogObject {
// All of the registered user functions. The key is the user facing name (e.g. "myUdf"),
// and the values are all the overloaded variants (e.g. myUdf(double), myUdf(string))
// This includes both UDFs and UDAs
// This includes both UDFs and UDAs. Updates are made thread safe by synchronizing
// on this map.
private final HashMap<String, List<Function>> functions_;
public Db(String name, Catalog catalog) {
@@ -73,6 +74,16 @@ public class Db implements CatalogObject {
return TCatalogObjectType.DATABASE;
}
/**
* Adds a table to the table cache.
*/
public void addTable(Table table) {
tableCache_.add(table);
}
/**
* Gets all table names in the table cache.
*/
public List<String> getAllTableNames() {
return tableCache_.getAllNames();
}
@@ -85,25 +96,17 @@ public class Db implements CatalogObject {
* Returns the Table with the given name if present in the table cache or loads the
* table if it does not already exist in the cache. Returns null if the table does not
* exist in the cache or if there was an error loading the table metadata.
* TODO: Should we bubble this exception up?
*/
public Table getTable(String tblName) {
return tableCache_.getOrLoad(tblName);
}
/**
* Adds a table to the table cache.
* Gets a table from the table cache, but does not perform a metadata load if the
* table is uninitialized.
*/
public void addTable(Table table) {
tableCache_.add(table);
}
/**
* Adds a given table name to the table cache. The next call to refreshTable() or
* getOrLoadTable() will trigger a metadata load.
*/
public void addTableName(String tableName) {
tableCache_.addName(tableName);
public Table getTableNoLoad(String tblName) {
return tableCache_.get(tblName);
}
/**
@@ -113,14 +116,6 @@ public class Db implements CatalogObject {
return tableCache_.reload(tableName);
}
/**
* Invalidates the table's metadata, forcing a reload on the next access. Does
* not remove the table from table cache's name set.
*/
public void invalidateTable(String tableName) {
tableCache_.invalidate(tableName);
}
/**
* Removes the table name and any cached metadata from the Table cache.
*/
@@ -263,4 +258,7 @@ public class Db implements CatalogObject {
@Override
public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
public Catalog getParentCatalog() { return parentCatalog_; }
@Override
public boolean isLoaded() { return true; }
}

View File

@@ -267,4 +267,7 @@ public class Function implements CatalogObject {
function.setHasVarArgs(fn.isHas_var_args());
return function;
}
@Override
public boolean isLoaded() { return true; }
}

View File

@@ -53,17 +53,18 @@ import com.google.common.base.Preconditions;
* Thread safe Catalog for an Impalad. The Impalad Catalog provides an interface to
* access Catalog objects that this Impalad knows about and authorizes access requests
* to these objects. It also manages reading and updating the authorization policy file
* from HDFS.
* from HDFS. The Impalad Catalog provides APIs for checking whether a user is authorized
* to access a particular catalog object. Any catalog access that requires privilege
* checks should go through this class.
* TODO: The CatalogService should also handle updating and disseminating the
* authorization policy.
* The Impalad catalog can be updated either via a StateStore heartbeat or by directly
* applying the result of a catalog operation to the CatalogCache. Updates applied using
* the updateCatalog() function which takes the catalogLock_.writeLock() for the duration
* of its execution to ensure all updates are applied atomically.
* applying the result of a catalog operation to the CatalogCache. All updates are
* applied using the updateCatalog() function.
* Table metadata is loaded lazily. The CatalogServer initially broadcasts (via the
* statestore) the known table names (as IncompleteTables). These table names are added
* to the Impalad catalog cache and when one of the tables is accessed, the impalad will
* make an RPC to the CatalogServer to load the complete table metadata.
* make an RPC to the CatalogServer to request loading the complete table metadata.
* In both cases, we need to ensure that work from one update is not "undone" by another
* update. To handle this the ImpaladCatalog does the following:
* - Tracks the overall catalog version last received in a state store heartbeat, this
@@ -78,17 +79,23 @@ import com.google.common.base.Preconditions;
* - Before dropping any catalog object, see if the object already exists in the catalog
* cache. If it does, only drop the object if the version of the drop is > that
* object's catalog version.
* Additionally, the Impalad Catalog provides interfaces for checking whether
* a user is authorized to access a particular object. Any catalog access that requires
* privilege checks should go through this class.
* The CatalogServiceId is also tracked to detect if a different instance of the catalog
* service has been started, in which case a full topic update is required.
*/
public class ImpaladCatalog extends Catalog {
private static final Logger LOG = Logger.getLogger(ImpaladCatalog.class);
private static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);
// The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer
// has restarted.
private TUniqueId catalogServiceId_ = INITIAL_CATALOG_SERVICE_ID;
// The catalog version received in the last StateStore heartbeat. It is guaranteed
// all objects in the catalog have at a minimum, this version. Because updates may
// be applied out of band of a StateStore heartbeat, it is possible the catalog
// contains some objects > than this version.
private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
//TODO: Make the reload interval configurable.
private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;
@@ -98,13 +105,9 @@ public class ImpaladCatalog extends Catalog {
// Lock used to synchronize refreshing the AuthorizationChecker.
private final ReentrantReadWriteLock authzCheckerLock_ = new ReentrantReadWriteLock();
private AuthorizationChecker authzChecker_;
// Flag to determine if the Catalog is ready to accept user requests. See isReady().
private final AtomicBoolean isReady_ = new AtomicBoolean(false);
// The catalog version received in the last StateStore heartbeat. It is guaranteed
// all objects in the catalog have at a minimum, this version. Because updates may
// be applied out of band of a StateStore heartbeat, it is possible the catalog
// contains some objects > than this version.
private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
// Tracks modifications to this Impalad's catalog from direct updates to the cache.
private final CatalogDeltaLog catalogDeltaLog_ = new CatalogDeltaLog();
@@ -138,13 +141,38 @@ public class ImpaladCatalog extends Catalog {
// loaded (from the catalog server) on the next access.
// TODO: Clean up how we bootstrap the FE tests.
if (loadStrategy != CatalogInitStrategy.EMPTY) {
try {
reset();
} catch (CatalogException e) {
// Re-throw as an illegal state exception.
throw new IllegalStateException(e);
}
isReady_.set(true);
for (String dbName: getDbNames(null)) {
Db db = getDb(dbName);
for (String tblName: db.getAllTableNames()) {
db.invalidateTable(tblName);
}
}
/**
* Used only for testing. Implementation of reset() that allows for bootstrapping the
* Impalad catalog without a running statestore daemon.
*/
@Override
public void reset() throws CatalogException {
MetaStoreClient msClient = metaStoreClientPool_.getClient();
try {
for (String dbName: msClient.getHiveClient().getAllDatabases()) {
Db db = new Db(dbName, this);
dbCache_.get().put(db.getName().toLowerCase(), db);
for (String tableName: msClient.getHiveClient().getAllTables(dbName)) {
Table incompleteTbl = IncompleteTable.createUninitializedTable(
TableId.createInvalidId(), db, tableName);
db.addTable(incompleteTbl);
}
}
} catch (Exception e) {
throw new CatalogException(e.getMessage(), e);
} finally {
msClient.release();
}
}
@@ -210,55 +238,50 @@ public class ImpaladCatalog extends Catalog {
*
* This method is called once per statestore heartbeat and is guaranteed the same
* object will not be in both the "updated" list and the "removed" list (it is
* a detail handled by the statestore). This method takes the catalogLock_ writeLock
* for the duration of the method to ensure all updates are applied atomically. Since
* updates are sent from the statestore as deltas, this should generally not block
* execution for a significant amount of time.
* a detail handled by the statestore).
* Catalog updates are ordered by the object type with the dependent objects coming
* first. That is, database "foo" will always come before table "foo.bar".
* Synchronized because updateCatalog() can be called by during a statestore update or
* during a direct-DDL operation and catalogServiceId_ and lastSyncedCatalogVersion_
* must be protected.
*/
public TUpdateCatalogCacheResponse updateCatalog(
TUpdateCatalogCacheRequest req) throws CatalogException {
catalogLock_.writeLock().lock();
try {
// Check for changes in the catalog service ID.
if (!catalogServiceId_.equals(req.getCatalog_service_id())) {
boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
catalogServiceId_ = req.getCatalog_service_id();
if (!firstRun) {
// Throw an exception which will trigger a full topic update request.
throw new CatalogException("Detected catalog service ID change. Aborting " +
"updateCatalog()");
}
public synchronized TUpdateCatalogCacheResponse updateCatalog(
TUpdateCatalogCacheRequest req) throws CatalogException {
// Check for changes in the catalog service ID.
if (!catalogServiceId_.equals(req.getCatalog_service_id())) {
boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID);
catalogServiceId_ = req.getCatalog_service_id();
if (!firstRun) {
// Throw an exception which will trigger a full topic update request.
throw new CatalogException("Detected catalog service ID change. Aborting " +
"updateCatalog()");
}
// First process all updates
long newCatalogVersion = lastSyncedCatalogVersion_;
for (TCatalogObject catalogObject: req.getUpdated_objects()) {
if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
newCatalogVersion = catalogObject.getCatalog_version();
} else {
try {
addCatalogObject(catalogObject);
} catch (Exception e) {
LOG.error("Error adding catalog object: " + e.getMessage(), e);
}
}
}
// Now remove all objects from the catalog. Removing a database before removing
// its child tables/functions is fine. If that happens, the removal of the child
// object will be a no-op.
for (TCatalogObject catalogObject: req.getRemoved_objects()) {
removeCatalogObject(catalogObject, newCatalogVersion);
}
lastSyncedCatalogVersion_ = newCatalogVersion;
// Cleanup old entries in the log.
catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
isReady_.set(true);
} finally {
catalogLock_.writeLock().unlock();
}
// First process all updates
long newCatalogVersion = lastSyncedCatalogVersion_;
for (TCatalogObject catalogObject: req.getUpdated_objects()) {
if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
newCatalogVersion = catalogObject.getCatalog_version();
} else {
try {
addCatalogObject(catalogObject);
} catch (Exception e) {
LOG.error("Error adding catalog object: " + e.getMessage(), e);
}
}
}
// Now remove all objects from the catalog. Removing a database before removing
// its child tables/functions is fine. If that happens, the removal of the child
// object will be a no-op.
for (TCatalogObject catalogObject: req.getRemoved_objects()) {
removeCatalogObject(catalogObject, newCatalogVersion);
}
lastSyncedCatalogVersion_ = newCatalogVersion;
// Cleanup old entries in the log.
catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
isReady_.set(true);
return new TUpdateCatalogCacheResponse(catalogServiceId_);
}
@@ -318,16 +341,11 @@ public class ImpaladCatalog extends Catalog {
checkAccess(user, new PrivilegeRequestBuilder()
.allOf(privilege).onTable(dbName, tableName).toRequest());
catalogLock_.readLock().lock();
try {
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database not found: " + dbName);
}
return db.containsTable(tableName);
} finally {
catalogLock_.readLock().unlock();
Db db = getDb(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database not found: " + dbName);
}
return db.containsTable(tableName);
}
/**
@@ -343,8 +361,8 @@ public class ImpaladCatalog extends Catalog {
Table table = getTable(dbName, tableName);
if (table instanceof IncompleteTable) {
// We should never get back an uninitialized IncompleteTable.
Preconditions.checkState(!((IncompleteTable) table).isUninitialized());
// We should never get back an IncompleteTable that is uninitialized.
Preconditions.checkState(table.isLoaded());
// If there were problems loading this table's metadata, throw an exception
// when it is accessed.
@@ -534,15 +552,7 @@ public class ImpaladCatalog extends Catalog {
Table newTable = Table.fromThrift(db, thriftTable);
newTable.setCatalogVersion(catalogVersion);
// If this is an uninitialized table, don't just add the table name to
// the metadata cache. The next access will trigger a metadata load.
if (newTable instanceof IncompleteTable
&& ((IncompleteTable) newTable).isUninitialized()) {
db.addTableName(newTable.getName());
} else {
db.addTable(newTable);
}
db.addTable(newTable);
}
private void addFunction(TFunction fn, long catalogVersion) {

View File

@@ -25,7 +25,6 @@ import com.cloudera.impala.thrift.TStatus;
import com.cloudera.impala.thrift.TStatusCode;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TTableDescriptor;
import com.cloudera.impala.thrift.TTableType;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@@ -55,7 +54,8 @@ public class IncompleteTable extends Table {
/**
* See comment on cause_.
*/
public boolean isUninitialized() { return cause_ == null; }
@Override
public boolean isLoaded() { return cause_ != null; }
@Override
public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
@@ -82,7 +82,6 @@ public class IncompleteTable extends Table {
public TTable toThrift() {
TTable table = new TTable(db_.getName(), name_);
table.setId(id_.asInt());
table.setTable_type(TTableType.INCOMPLETE_TABLE);
if (cause_ != null) {
table.setLoad_status(new TStatus(TStatusCode.INTERNAL_ERROR,
Lists.newArrayList(JniUtil.throwableToString(cause_),

View File

@@ -28,8 +28,6 @@ import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.log4j.Logger;
import com.cloudera.impala.analysis.ColumnType;
import com.cloudera.impala.common.JniUtil;
import com.cloudera.impala.thrift.TAccessLevel;
import com.cloudera.impala.thrift.TCatalogObject;
import com.cloudera.impala.thrift.TCatalogObjectType;
@@ -37,7 +35,6 @@ import com.cloudera.impala.thrift.TColumn;
import com.cloudera.impala.thrift.TTable;
import com.cloudera.impala.thrift.TTableDescriptor;
import com.cloudera.impala.thrift.TTableStats;
import com.cloudera.impala.thrift.TTableType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -209,8 +206,7 @@ public abstract class Table implements CatalogObject {
public static Table fromThrift(Db parentDb, TTable thriftTable)
throws TableLoadingException {
Table newTable;
if (thriftTable.getTable_type() != TTableType.INCOMPLETE_TABLE &&
thriftTable.isSetMetastore_table()) {
if (!thriftTable.isSetLoad_status() && thriftTable.isSetMetastore_table()) {
newTable = Table.fromMetastoreTable(new TableId(thriftTable.getId()),
parentDb, thriftTable.getMetastore_table());
} else {
@@ -375,4 +371,7 @@ public abstract class Table implements CatalogObject {
public void setCatalogVersion(long catalogVersion) {
catalogVersion_ = catalogVersion;
}
@Override
public boolean isLoaded() { return true; }
}

View File

@@ -18,18 +18,16 @@ import java.util.EnumSet;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import com.cloudera.impala.common.ImpalaException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.service.FeSupport;
import com.cloudera.impala.thrift.TCatalogObject;
import com.cloudera.impala.thrift.TCatalogObjectType;
import com.cloudera.impala.thrift.TTable;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
/**
* A cache loader that defines how table metadata should be loaded. Impalads
@@ -37,6 +35,8 @@ import com.google.common.util.concurrent.SettableFuture;
* loads its table metadata by contacting the Hive Metastore / HDFS / etc.
*/
public abstract class TableLoader extends CacheLoader<String, Table> {
private final static Logger LOG = LoggerFactory.getLogger(TableLoader.class);
// Parent database of this table.
protected final Db db_;
@@ -59,41 +59,6 @@ public abstract class TableLoader extends CacheLoader<String, Table> {
parentDb.getParentCatalog().getClass().getName());
}
/**
* Load the given table name.
*/
@Override
public Table load(String tblName) throws Exception {
return loadTable(tblName, null);
}
/**
* Computes a replacement value for the Table based on an already-cached value.
*
* Returns (as a ListenableFuture) the future new value of the table.
* The returned Future should not be null. Using a Future allows for a
* synchronous or asynchronous implementation or reload().
*/
@Override
public ListenableFuture<Table> reload(String tableName, Table cachedValue)
throws ImpalaException {
SettableFuture<Table> newValue = SettableFuture.create();
Table table = loadTable(tableName, cachedValue);
newValue.set(table);
return newValue;
}
/**
* Implementation for how a table is loaded. Generally this is the
* only method that needs to be implemented.
* @param tblName - The name of the table.
* @param cachedEntry - An existing cached table that can be reused to speed up
* the loading process for a "reload" operation.
*/
protected abstract Table loadTable(String tableName, Table cachedValue)
throws TableNotFoundException;
/**
* TableLoader that loads table metadata from the Hive Metastore. Used by
* the CatalogServer and updates the object's catalog version on successful
@@ -116,18 +81,21 @@ public abstract class TableLoader extends CacheLoader<String, Table> {
/**
* Creates the Impala representation of Hive/HBase metadata for one table.
* Calls load() on the appropriate instance of Table subclass.
* oldCacheEntry is the existing cache entry and might still contain valid info to
* help speed up metadata loading. oldCacheEntry is null if there is no existing
* cachedValue is the existing cache entry and might still contain valid info to
* help speed up metadata loading. cachedValue is null if there is no existing
* cache entry (i.e. during fresh load).
* @return new instance of HdfsTable or HBaseTable
* null if the table does not exist
* @throws TableLoadingException if there was an error loading the table.
* @throws TableNotFoundException if the table was not found
* The catalogVersion parameter specifies what version will be assigned
* to the newly loaded object.
* Returns new instance of Table, or null if the table does not exist. If there
* were any errors loading the table metadata an IncompleteTable will be returned
* that contains details on the error.
*/
@Override
protected Table loadTable(String tblName, Table cacheEntry)
throws TableNotFoundException {
Catalog catalog = db_.getParentCatalog();
public Table load(String tblName, Table cachedValue, long catalogVersion) {
String fullTblName = db_.getName() + "." + tblName;
LOG.info("Loading metadata for: " + fullTblName);
Preconditions.checkState(db_.getParentCatalog() instanceof CatalogServiceCatalog);
CatalogServiceCatalog catalog = (CatalogServiceCatalog) db_.getParentCatalog();
MetaStoreClient msClient = catalog.getMetaStoreClient();
Table table;
// turn all exceptions into TableLoadingException
@@ -141,22 +109,26 @@ public abstract class TableLoader extends CacheLoader<String, Table> {
TableType tableType = TableType.valueOf(msTbl.getTableType());
if (!SUPPORTED_TABLE_TYPES.contains(tableType)) {
throw new TableLoadingException(String.format(
"Unsupported table type '%s' for: %s.%s",
tableType, db_.getName(), tblName));
"Unsupported table type '%s' for: %s", tableType, fullTblName));
}
// Create a table of appropriate type and have it load itself
table = Table.fromMetastoreTable(catalog.getNextTableId(), db_, msTbl);
if (table == null) {
throw new TableLoadingException(
"Unrecognized table type for table: " + msTbl.getTableName());
"Unrecognized table type for table: " + fullTblName);
}
table.load(cacheEntry, msClient.getHiveClient(), msTbl);
table.load(cachedValue, msClient.getHiveClient(), msTbl);
} catch (TableLoadingException e) {
table = IncompleteTable.createFailedMetadataLoadTable(
catalog.getNextTableId(), db_, tblName, e);
} catch (NoSuchObjectException e) {
throw new TableNotFoundException("Table not found: " + tblName, e);
TableLoadingException tableDoesNotExist = new TableLoadingException(
"Table " + fullTblName + " no longer exists in the Hive MetaStore. " +
"Run 'invalidate metadata " + fullTblName + "' to update the Impala " +
"catalog.");
table = IncompleteTable.createFailedMetadataLoadTable(
catalog.getNextTableId(), db_, tblName, tableDoesNotExist);
} catch (Exception e) {
table = IncompleteTable.createFailedMetadataLoadTable(
catalog.getNextTableId(), db_, tblName, new TableLoadingException(
@@ -165,32 +137,31 @@ public abstract class TableLoader extends CacheLoader<String, Table> {
msClient.release();
}
// Set the new catalog version for the table and return it.
table.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion());
table.setCatalogVersion(catalogVersion);
return table;
}
@Override
public long getNextCatalogVersion() {
CatalogServiceCatalog catalog = (CatalogServiceCatalog) db_.getParentCatalog();
return catalog.incrementAndGetCatalogVersion();
}
}
/**
* A TableLoader that loads metadata from the CatalogServer.
*/
private static class CatalogServiceTableLoader extends TableLoader {
private static final String FAILED_TBL_LOAD_MSG =
"Unexpected error loading table metadata. Please run 'invalidate metadata %s'";
public CatalogServiceTableLoader(Db db) {
super(db);
}
@Override
public ListenableFuture<Table> reload(String tableName, Table cachedValue)
throws ImpalaException {
// To protect against a concurrency issue between add(CatalogObject) and
// reload(), reload() is not supported on a CatalogServiceTableLoader. See comment
// in the CatalogObjectCache for more details.
throw new IllegalStateException("Calling reload() on a CatalogServiceTableLoader" +
" is not supported.");
}
@Override
protected Table loadTable(String tblName, Table cacheEntry)
throws TableNotFoundException {
public Table load(String tblName, Table cachedValue, long catalogVersion) {
LOG.info("Loading metadata for: " + db_.getName() + "." + tblName);
TCatalogObject objectDesc = new TCatalogObject();
objectDesc.setType(TCatalogObjectType.TABLE);
objectDesc.setTable(new TTable());
@@ -204,20 +175,36 @@ public abstract class TableLoader extends CacheLoader<String, Table> {
TableId.createInvalidId(), db_, tblName, e);
}
Table newTable;
if (!catalogObject.isSetTable()) {
throw new TableNotFoundException(
String.format("Table not found: %s.%s", db_.getName(), tblName));
newTable = IncompleteTable.createFailedMetadataLoadTable(
TableId.createInvalidId(), db_, tblName,
new TableLoadingException(FAILED_TBL_LOAD_MSG));
}
Table newTable;
try {
newTable = Table.fromThrift(db_, catalogObject.getTable());
} catch (TableLoadingException e) {
newTable = IncompleteTable.createFailedMetadataLoadTable(
TableId.createInvalidId(), db_, tblName, e);
}
if (newTable == null || !newTable.isLoaded()) {
newTable = IncompleteTable.createFailedMetadataLoadTable(
TableId.createInvalidId(), db_, tblName,
new TableLoadingException(FAILED_TBL_LOAD_MSG));
}
newTable.setCatalogVersion(catalogObject.getCatalog_version());
return newTable;
}
@Override
public long getNextCatalogVersion() {
// Tables should already have catalog versions assigned catalog
// by the CatalogServer. Always return zero here.
return 0;
}
}
}
}

View File

@@ -323,8 +323,7 @@ public class DdlExecutor {
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
LOG.info(String.format("Updating table stats for %s", tableName));
Table table = catalog_.getOrReloadTable(tableName.getDb(),
tableName.getTbl());
Table table = catalog_.getTable(tableName.getDb(), tableName.getTbl());
// Deep copy the msTbl to avoid updating our cache before successfully persisting
// the results to the metastore.
org.apache.hadoop.hive.metastore.api.Table msTbl =
@@ -501,7 +500,7 @@ public class DdlExecutor {
if (params.if_not_exists && catalog_.getDb(dbName) != null) {
LOG.debug("Skipping database creation because " + dbName + " already exists and " +
"IF NOT EXISTS was specified.");
resp.getResult().setVersion(CatalogServiceCatalog.getCatalogVersion());
resp.getResult().setVersion(catalog_.getCatalogVersion());
return;
}
org.apache.hadoop.hive.metastore.api.Database db =
@@ -527,7 +526,13 @@ public class DdlExecutor {
} finally {
msClient.release();
}
resp.result.setUpdated_catalog_object(catalog_.addDb(dbName));
Db newDb = catalog_.addDb(dbName);
TCatalogObject thriftDb = new TCatalogObject(TCatalogObjectType.DATABASE,
Catalog.INITIAL_CATALOG_VERSION);
thriftDb.setDb(newDb.toThrift());
thriftDb.setCatalog_version(newDb.getCatalogVersion());
resp.result.setUpdated_catalog_object(thriftDb);
}
resp.result.setVersion(resp.result.getUpdated_catalog_object().getCatalog_version());
}
@@ -582,7 +587,7 @@ public class DdlExecutor {
// If no db was removed as part of this operation just return the current catalog
// version.
if (removedDb == null) {
removedObject.setCatalog_version(CatalogServiceCatalog.getCatalogVersion());
removedObject.setCatalog_version(catalog_.getCatalogVersion());
} else {
removedObject.setCatalog_version(removedDb.getCatalogVersion());
}
@@ -619,7 +624,7 @@ public class DdlExecutor {
if (table != null) {
resp.result.setVersion(table.getCatalogVersion());
} else {
resp.result.setVersion(CatalogServiceCatalog.getCatalogVersion());
resp.result.setVersion(catalog_.getCatalogVersion());
}
}
removedObject.setType(TCatalogObjectType.TABLE);
@@ -647,7 +652,7 @@ public class DdlExecutor {
}
// The user specified IF NOT EXISTS and the function didn't exist, just
// return the current catalog version.
resp.result.setVersion(CatalogServiceCatalog.getCatalogVersion());
resp.result.setVersion(catalog_.getCatalogVersion());
} else {
TCatalogObject removedObject = new TCatalogObject();
removedObject.setType(TCatalogObjectType.FUNCTION);
@@ -678,7 +683,7 @@ public class DdlExecutor {
catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
LOG.debug(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tableName));
response.getResult().setVersion(CatalogServiceCatalog.getCatalogVersion());
response.getResult().setVersion(catalog_.getCatalogVersion());
return false;
}
org.apache.hadoop.hive.metastore.api.Table tbl =
@@ -738,10 +743,10 @@ public class DdlExecutor {
catalog_.containsTable(tblName.getDb(), tblName.getTbl())) {
LOG.debug(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tblName));
response.getResult().setVersion(CatalogServiceCatalog.getCatalogVersion());
response.getResult().setVersion(catalog_.getCatalogVersion());
return;
}
Table srcTable = catalog_.getOrReloadTable(srcTblName.getDb(), srcTblName.getTbl());
Table srcTable = catalog_.getTable(srcTblName.getDb(), srcTblName.getTbl());
org.apache.hadoop.hive.metastore.api.Table tbl =
srcTable.getMetaStoreTable().deepCopy();
tbl.setDbName(tblName.getDb());
@@ -1216,7 +1221,7 @@ public class DdlExecutor {
TableName tableName) throws DatabaseNotFoundException, TableNotFoundException,
TableLoadingException {
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
return catalog_.getOrReloadTable(tableName.getDb(), tableName.getTbl())
return catalog_.getTable(tableName.getDb(), tableName.getTbl())
.getMetaStoreTable().deepCopy();
}
@@ -1390,7 +1395,7 @@ public class DdlExecutor {
throws ImpalaException {
TUpdateCatalogResponse response = new TUpdateCatalogResponse();
// Only update metastore for Hdfs tables.
Table table = catalog_.getOrReloadTable(update.getDb_name(),
Table table = catalog_.getTable(update.getDb_name(),
update.getTarget_table());
if (!(table instanceof HdfsTable)) {
throw new InternalException("Unexpected table type: " +

View File

@@ -24,6 +24,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.impala.catalog.CatalogException;
import com.cloudera.impala.catalog.CatalogServiceCatalog;
import com.cloudera.impala.catalog.Function;
import com.cloudera.impala.catalog.Table;
@@ -77,6 +78,11 @@ public class JniCatalog {
GlogAppender.Install(TLogLevel.values()[impalaLogLevel],
TLogLevel.values()[otherLogLevel]);
catalog_ = new CatalogServiceCatalog(getServiceId());
try {
catalog_.reset();
} catch (CatalogException e) {
LOG.error("Error initialializing Catalog. Please run 'invalidate metadata'", e);
}
ddlExecutor_ = new DdlExecutor(catalog_);
}
@@ -127,7 +133,8 @@ public class JniCatalog {
} else {
// Invalidate the catalog if no table name is provided.
Preconditions.checkArgument(!req.isIs_refresh());
resp.result.setVersion(catalog_.reset());
catalog_.reset();
resp.result.setVersion(catalog_.getCatalogVersion());
}
resp.getResult().setStatus(
new TStatus(TStatusCode.OK, new ArrayList<String>()));

View File

@@ -0,0 +1,24 @@
====
---- QUERY : STRESS-Q1
select count(*) from (select * from lineitem limit 10) p
---- RESULTS
10
---- TYPES
bigint
====
---- QUERY : STRESS-INVALIDATE_METADATA
invalidate metadata
====
---- QUERY : STRESS-INVALIDATE_TABLE
invalidate metadata lineitem
====
---- QUERY : REFRESH-TABLE
refresh lineitem
====
---- QUERY : STRESS-Q2
select count(*) from (select * from lineitem limit 10) p
---- RESULTS
10
---- TYPES
bigint
====

View File

@@ -1,10 +1,8 @@
====
---- QUERY : STRESS-Q1
select count(*) from (select * from lineitem limit 10) p
# TODO: The second limit should not be needed, but getting wrong results due to product
# bug IMPALA-20. Once that is fixed the second limit can be removed.
#---- RESULTS
#10
---- RESULTS
10
---- TYPES
bigint
====

View File

@@ -163,6 +163,11 @@ class ImpaladService(BaseImpalaService):
client.connect()
return client
def get_catalog_object_dump(self, object_type, object_name):
return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
(object_type, object_name))
# Allows for interacting with the StateStore service to perform operations such as
# accessing the debug webpage.
class StateStoredService(BaseImpalaService):
@@ -180,3 +185,7 @@ class CatalogdService(BaseImpalaService):
def __init__(self, hostname, webserver_port, service_port):
super(CatalogdService, self).__init__(hostname, webserver_port)
self.service_port = service_port
def get_catalog_object_dump(self, object_type, object_name):
return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
(object_type, object_name))

View File

@@ -2,7 +2,9 @@
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#
import pytest
import re
from time import sleep
from tests.common.impala_cluster import ImpalaCluster
from tests.common.test_vector import TestDimension
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.test_file_parser import QueryTestSectionReader
@@ -30,3 +32,39 @@ class TestMiniStress(ImpalaTestSuite):
def test_mini_stress(self, vector):
for i in xrange(NUM_ITERATIONS):
self.run_test_case('stress', vector)
@pytest.mark.stress
def test_run_invalidate_refresh(self, vector):
"""Verifies that running concurrent invalidate table/catalog and refresh commands
don't cause failures with other running workloads and ensures catalog versions
are strictly increasing."""
target_db = self.execute_scalar('select current_database()', vector=vector)
impala_cluster = ImpalaCluster()
impalad = impala_cluster.impalads[0].service
catalogd = impala_cluster.catalogd.service
for i in xrange(NUM_ITERATIONS):
# Get the catalog versions for the table before running the workload
before_versions = dict()
before_versions['catalogd'] =\
self.get_table_version(catalogd, target_db, 'lineitem')
before_versions['impalad'] = self.get_table_version(impalad, target_db, 'lineitem')
self.run_test_case('stress-with-invalidate-refresh', vector)
# Get the catalog versions for the table after running the workload
after_versions = dict()
after_versions['catalogd'] = self.get_table_version(catalogd, target_db, 'lineitem')
after_versions['impalad'] = self.get_table_version(impalad, target_db, 'lineitem')
# Catalog versions should be strictly increasing
assert before_versions['impalad'] < after_versions['impalad']
assert before_versions['catalogd'] < after_versions['catalogd']
def get_table_version(self, impala_service, db_name, tbl_name):
"""Gets the given table's catalog version using the given impalad/catalogd service"""
obj_dump = impala_service.get_catalog_object_dump('table', db_name + '.' + tbl_name)
result = re.search(r'catalog_version \(i64\) = (\d+)', obj_dump)
assert result, 'Unable to find catalog version in object dump: ' + obj_dump
catalog_version = result.group(1)
return int(catalog_version)