diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index f0f1d4feb..50ddee0a0 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -38,10 +38,6 @@ enum TTableType { HDFS_TABLE, HBASE_TABLE, VIEW, - // A table that does not contain all needed metadata. This can be either because - // of an error loading the metadata or because the table metadata has not yet - // been loaded. - INCOMPLETE_TABLE } enum THdfsFileFormat { diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CacheLoader.java b/fe/src/main/java/com/cloudera/impala/catalog/CacheLoader.java new file mode 100644 index 000000000..b24eb75a7 --- /dev/null +++ b/fe/src/main/java/com/cloudera/impala/catalog/CacheLoader.java @@ -0,0 +1,33 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.cloudera.impala.catalog; + +/** + * Interface that defines how new CatalogObjects are loaded into a cache. + */ +public abstract class CacheLoader { + /** + * Loads an element into the cache for the given key name. The cachedValue + * parameter is an existing cached entry that can be reused to help speed up + * value loading. If cachedValue is null it will be ignored. + */ + public abstract V load(K key, V cachedValue, long catalogVersion); + + /** + * Returns the next catalog version. This value is generally passed to + * load() so newly loaded items can get assigned a catalog version. + */ + public abstract long getNextCatalogVersion(); +} diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java index e9811bb4e..100d9efc2 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java @@ -16,10 +16,9 @@ package com.cloudera.impala.catalog; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -37,11 +36,16 @@ import com.google.common.collect.Lists; /** * Thread safe interface for reading and updating metadata stored in the Hive MetaStore. - * This class caches db-, table- and column-related metadata. Although this class is + * This class provides a storage API for caching CatalogObjects: databases, tables, + * and functions and the relevant metadata to go along with them. Although this class is * thread safe, it does not guarantee consistency with the MetaStore. It is important * to keep in mind that there may be external (potentially conflicting) concurrent * metastore updates occurring at any time. - * All reads and writes of catalog objects should be synchronized using the catalogLock_. + * The CatalogObject storage hierarchy is: + * Catalog -> Db -> Table + * -> Function + * Each level has its own synchronization, so the cache of Dbs is synchronized and each + * Db has a cache of tables which is synchronized independently. */ public abstract class Catalog { // Initial catalog version. @@ -51,16 +55,15 @@ public abstract class Catalog { private static final Logger LOG = Logger.getLogger(Catalog.class); - private final MetaStoreClientPool metaStoreClientPool_ = new MetaStoreClientPool(0); - private final CatalogInitStrategy initStrategy_; - private final AtomicInteger nextTableId_ = new AtomicInteger(0); + protected final MetaStoreClientPool metaStoreClientPool_ = new MetaStoreClientPool(0); + protected final CatalogInitStrategy initStrategy_; - // Cache of database metadata. - protected final HashMap dbCache_ = new HashMap(); - - // Fair lock used to synchronize catalog accesses and updates. - protected final ReentrantReadWriteLock catalogLock_ = - new ReentrantReadWriteLock(true); + // Thread safe cache of database metadata. Uses an AtomicReference so reset() + // operations can atomically swap dbCache_ references. + // TODO: Update this to use a CatalogObjectCache? + protected AtomicReference> dbCache_ = + new AtomicReference>( + new ConcurrentHashMap()); // Determines how the Catalog should be initialized. public enum CatalogInitStrategy { @@ -80,10 +83,14 @@ public abstract class Catalog { if (initStrategy != CatalogInitStrategy.EMPTY) { metaStoreClientPool_.addClients(META_STORE_CLIENT_POOL_SIZE); } - reset(); } - public Catalog() { this(CatalogInitStrategy.LAZY); } + /** + * Resets this catalog instance by clearing all cached metadata and potentially + * reloading the metadata. How the metadata is loaded is based on the + * CatalogInitStrategy that was set in the c'tor. + */ + public abstract void reset() throws CatalogException; /** * Adds a new database to the catalog, replacing any existing database with the same @@ -91,12 +98,7 @@ public abstract class Catalog { * previous database. */ public Db addDb(Db db) { - catalogLock_.writeLock().lock(); - try { - return dbCache_.put(db.getName().toLowerCase(), db); - } finally { - catalogLock_.writeLock().unlock(); - } + return dbCache_.get().put(db.getName().toLowerCase(), db); } /** @@ -106,12 +108,7 @@ public abstract class Catalog { public Db getDb(String dbName) { Preconditions.checkState(dbName != null && !dbName.isEmpty(), "Null or empty database name given as argument to Catalog.getDb"); - catalogLock_.readLock().lock(); - try { - return dbCache_.get(dbName.toLowerCase()); - } finally { - catalogLock_.readLock().unlock(); - } + return dbCache_.get().get(dbName.toLowerCase()); } /** @@ -120,12 +117,7 @@ public abstract class Catalog { * statements. */ public Db removeDb(String dbName) { - catalogLock_.writeLock().lock(); - try { - return dbCache_.remove(dbName.toLowerCase()); - } finally { - catalogLock_.writeLock().unlock(); - } + return dbCache_.get().remove(dbName.toLowerCase()); } /** @@ -135,12 +127,7 @@ public abstract class Catalog { * dbPattern may be null (and thus matches everything). */ public List getDbNames(String dbPattern) { - catalogLock_.readLock().lock(); - try { - return filterStringsByPattern(dbCache_.keySet(), dbPattern); - } finally { - catalogLock_.readLock().unlock(); - } + return filterStringsByPattern(dbCache_.get().keySet(), dbPattern); } /** @@ -149,16 +136,11 @@ public abstract class Catalog { */ public Table getTable(String dbName, String tableName) throws DatabaseNotFoundException { - catalogLock_.readLock().lock(); - try { - Db db = getDb(dbName); - if (db == null) { - throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); - } - return db.getTable(tableName); - } finally { - catalogLock_.readLock().unlock(); + Db db = getDb(dbName); + if (db == null) { + throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); } + return db.getTable(tableName); } /** @@ -166,15 +148,10 @@ public abstract class Catalog { * if the table/database does not exist. */ public Table removeTable(TTableName tableName) { - catalogLock_.writeLock().lock(); - try { - // Remove the old table name from the cache and add the new table. - Db db = getDb(tableName.getDb_name()); - if (db == null) return null; - return db.removeTable(tableName.getTable_name()); - } finally { - catalogLock_.writeLock().unlock(); - } + // Remove the old table name from the cache and add the new table. + Db db = getDb(tableName.getDb_name()); + if (db == null) return null; + return db.removeTable(tableName.getTable_name()); } /** @@ -189,26 +166,16 @@ public abstract class Catalog { public List getTableNames(String dbName, String tablePattern) throws DatabaseNotFoundException { Preconditions.checkNotNull(dbName); - catalogLock_.readLock().lock(); - try { - Db db = getDb(dbName); - if (db == null) { - throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); - } - return filterStringsByPattern(db.getAllTableNames(), tablePattern); - } finally { - catalogLock_.readLock().unlock(); + Db db = getDb(dbName); + if (db == null) { + throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); } + return filterStringsByPattern(db.getAllTableNames(), tablePattern); } public boolean containsTable(String dbName, String tableName) { - catalogLock_.readLock().lock(); - try { - Db db = getDb(dbName); - return (db == null) ? false : db.containsTable(tableName); - } finally { - catalogLock_.readLock().unlock(); - } + Db db = getDb(dbName); + return (db == null) ? false : db.containsTable(tableName); } /** @@ -221,14 +188,9 @@ public abstract class Catalog { * resolve first to db.fn(). */ public boolean addFunction(Function fn) { - catalogLock_.writeLock().lock(); - try { - Db db = getDb(fn.dbName()); - if (db == null) return false; - return db.addFunction(fn); - } finally { - catalogLock_.writeLock().unlock(); - } + Db db = getDb(fn.dbName()); + if (db == null) return false; + return db.addFunction(fn); } /** @@ -237,14 +199,9 @@ public abstract class Catalog { * in the catalog, it will return the function with the strictest matching mode. */ public Function getFunction(Function desc, Function.CompareMode mode) { - catalogLock_.readLock().lock(); - try { - Db db = getDb(desc.dbName()); - if (db == null) return null; - return db.getFunction(desc, mode); - } finally { - catalogLock_.readLock().unlock(); - } + Db db = getDb(desc.dbName()); + if (db == null) return null; + return db.getFunction(desc, mode); } /** @@ -253,14 +210,9 @@ public abstract class Catalog { * null. */ public Function removeFunction(Function desc) { - catalogLock_.writeLock().lock(); - try { - Db db = getDb(desc.dbName()); - if (db == null) return null; - return db.removeFunction(desc); - } finally { - catalogLock_.writeLock().unlock(); - } + Db db = getDb(desc.dbName()); + if (db == null) return null; + return db.removeFunction(desc); } /** @@ -268,16 +220,11 @@ public abstract class Catalog { */ public List getFunctionSignatures(TFunctionType type, String dbName, String pattern) throws DatabaseNotFoundException { - catalogLock_.readLock().lock(); - try { - Db db = getDb(dbName); - if (db == null) { - throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); - } - return filterStringsByPattern(db.getAllFunctionSignatures(type), pattern); - } finally { - catalogLock_.readLock().unlock(); + Db db = getDb(dbName); + if (db == null) { + throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); } + return filterStringsByPattern(db.getAllFunctionSignatures(type), pattern); } /** @@ -285,14 +232,9 @@ public abstract class Catalog { * are ignored. */ public boolean functionExists(FunctionName name) { - catalogLock_.readLock().lock(); - try { - Db db = getDb(name.getDb()); - if (db == null) return false; - return db.functionExists(name); - } finally { - catalogLock_.readLock().unlock(); - } + Db db = getDb(name.getDb()); + if (db == null) return false; + return db.functionExists(name); } /** @@ -301,68 +243,12 @@ public abstract class Catalog { */ public void close() { metaStoreClientPool_.close(); } - /** - * Gets the next table ID and increments the table ID counter. - */ - public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); } /** * Returns a managed meta store client from the client connection pool. */ public MetaStoreClient getMetaStoreClient() { return metaStoreClientPool_.getClient(); } - /** - * Resets this catalog instance by clearing all cached metadata and potentially - * reloading the metadata. How the metadata is loaded is based on the - * CatalogInitStrategy that was set in the c'tor. - */ - public long reset() { - catalogLock_.writeLock().lock(); - try { - return resetInternal(); - } finally { - catalogLock_.writeLock().unlock(); - } - } - - /** - * Executes the underlying reset logic. catalogLock_.writeLock() must - * be taken before calling this. - */ - protected long resetInternal() { - try { - nextTableId_.set(0); - dbCache_.clear(); - - if (initStrategy_ == CatalogInitStrategy.EMPTY) { - return CatalogServiceCatalog.getCatalogVersion(); - } - MetaStoreClient msClient = metaStoreClientPool_.getClient(); - - try { - for (String dbName: msClient.getHiveClient().getAllDatabases()) { - Db db = new Db(dbName, this); - db.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion()); - addDb(db); - for (final String tableName: msClient.getHiveClient().getAllTables(dbName)) { - Table incompleteTbl = - IncompleteTable.createUninitializedTable(getNextTableId(), db, tableName); - incompleteTbl.setCatalogVersion( - CatalogServiceCatalog.incrementAndGetCatalogVersion()); - db.addTable(incompleteTbl); - } - } - } finally { - msClient.release(); - } - return CatalogServiceCatalog.getCatalogVersion(); - } catch (Exception e) { - LOG.error(e); - LOG.error("Error initializing Catalog. Catalog may be empty."); - throw new IllegalStateException(e); - } - } - /** * Implement Hive's pattern-matching semantics for SHOW statements. The only * metacharacters are '*' which matches any string of characters, and '|' @@ -416,21 +302,16 @@ public abstract class Catalog { PartitionNotFoundException, TableNotFoundException, TableLoadingException { String partitionNotFoundMsg = "Partition not found: " + Joiner.on(", ").join(partitionSpec); - catalogLock_.readLock().lock(); - try { - Table table = getTable(dbName, tableName); - // This is not an Hdfs table, throw an error. - if (!(table instanceof HdfsTable)) { - throw new PartitionNotFoundException(partitionNotFoundMsg); - } - // Get the HdfsPartition object for the given partition spec. - HdfsPartition partition = - ((HdfsTable) table).getPartitionFromThriftPartitionSpec(partitionSpec); - if (partition == null) throw new PartitionNotFoundException(partitionNotFoundMsg); - return partition; - } finally { - catalogLock_.readLock().unlock(); + Table table = getTable(dbName, tableName); + // This is not an Hdfs table, throw an error. + if (!(table instanceof HdfsTable)) { + throw new PartitionNotFoundException(partitionNotFoundMsg); } + // Get the HdfsPartition object for the given partition spec. + HdfsPartition partition = + ((HdfsTable) table).getPartitionFromThriftPartitionSpec(partitionSpec); + if (partition == null) throw new PartitionNotFoundException(partitionNotFoundMsg); + return partition; } /** @@ -503,4 +384,4 @@ public abstract class Catalog { } return result; } -} \ No newline at end of file +} diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogDeltaLog.java index 4ee039c54..89f9ea5d0 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogDeltaLog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogDeltaLog.java @@ -71,6 +71,8 @@ public class CatalogDeltaLog { */ public synchronized boolean wasObjectRemovedAfter(TCatalogObject catalogObject) { Preconditions.checkNotNull(catalogObject); + if (removedCatalogObjects_.isEmpty()) return false; + // Get all the items that were removed after the catalog version of this object. SortedMap candidateObjects = removedCatalogObjects_.tailMap(catalogObject.getCatalog_version()); diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogObject.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogObject.java index c63fba0da..9846f5d78 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogObject.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogObject.java @@ -31,4 +31,7 @@ public interface CatalogObject { // Sets the version of this catalog object. public void setCatalogVersion(long newVersion); + + // Returns true if this CatalogObject has had its metadata loaded, false otherwise. + public boolean isLoaded(); } \ No newline at end of file diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogObjectCache.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogObjectCache.java index fec595628..211e08f92 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogObjectCache.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogObjectCache.java @@ -20,35 +20,33 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheLoader; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; /** * Lazily loads metadata on read (through getOrLoad()) and tracks the set of valid/known - * object names. This class is thread safe. + * object names. This class is thread safe. The CatalogObjectCache is created by passing + * a custom CacheLoader object which can implement its own load() logic. * - * If a catalog object has not yet been loaded successfully, getOrLoad() will attempt to - * load its metadata. It is only possible to load metadata for objects that have - * previously been created with a call to add() or addName(). The catalog cache supports - * parallel loading/gets of different keys. While a load is in progress, any calls to - * get the same key will block until the load completes at which point the loaded value - * will be returned. - * - * Metadata can be invalidated or reloaded. The CatalogObjectCache is initialized using - * a custom CacheLoader object which can implement its own load()/reload() logic, but - * in general the behavior is: - * - reload(name) will perform a synchronous incremental refresh of the object. - * Depending on its implementation, refresh might reuse some of the existing metadata - * which could result in a partially stale object but faster load time. - * - invalidate(name) will mark the item in the metadata cache as invalid - * and the next getOrLoad() will trigger a full metadata reload. + * Items are added to the cache by calling add(T catalogObject). New objects can be + * added in an initialized or uninitialized state (determined by the isLoaded() + * property of the CatalogObject). If a catalog object is uninitialized, getOrLoad() + * will attempt to load its metadata. Otherwise, getOrLoad() will return the cached value. + * Invalidation of cache entries is done by calling add() and passing in a CatalogObject + * that has isLoaded() == false. The next access to the object will trigger a + * metadata load. + * The catalog cache supports parallel loading/gets of different keys. While a load is in + * progress, any calls to get the same key will block until the load completes at which + * point the loaded value will be returned. + * Metadata can be reloaded. Reloading an object will perform a synchronous incremental + * refresh the object's metadata. Depending on the load() implementation in the + * CacheLoader, refresh might reuse some of the existing metadata which could result in + * a partially stale object but faster load time. */ public class CatalogObjectCache { private static final Logger LOG = Logger.getLogger(CatalogObjectCache.class); // Map of lower-case object name to CacheEntry objects. New CacheEntries are created - // by calling add() or addName(). If a CacheEntry does not exist "getOrLoad()" will + // by calling add(). If a CacheEntry does not exist "getOrLoad()" will // return null. private final ConcurrentHashMap> metadataCache_ = new ConcurrentHashMap>(); @@ -58,48 +56,36 @@ public class CatalogObjectCache { /** * Stores and lazily loads CatalogObjects upon read. Ensures the CatalogObject - * catalog versions are strictly increasing when updated. This class is thread safe. + * catalog versions are strictly increasing when updated with an initialized + * (loaded) value. This class is thread safe. */ private static class CacheEntry { - private final String key_; private final CacheLoader cacheLoader_; private T catalogObject_; - private CacheEntry(String key, CacheLoader cacheLoader) { - key_ = key; + private CacheEntry(T catalogObject, CacheLoader cacheLoader) { + catalogObject_ = catalogObject; cacheLoader_ = cacheLoader; } /** * Replaces the CatalogObject in this CacheEntry if it is newer than the - * existing value (the catalog version is greater). Returns true if the existing - * value was replaced or false if the existing value was preserved. + * existing value (the catalog version is greater) or if the existing value + * has not yet uninitialized (has not been loaded) and the new value is + * loaded. + * Returns true if the existing value was replaced or false if the existing + * value was preserved. */ - public synchronized boolean replaceIfNewer(T catalogObject) { - Preconditions.checkNotNull(catalogObject); - if (catalogObject_ == null || - catalogObject_.getCatalogVersion() < catalogObject.getCatalogVersion()) { - catalogObject_ = catalogObject; + public synchronized boolean replaceIfNewer(T newCatalogObject) { + Preconditions.checkNotNull(newCatalogObject); + if (catalogObject_.getCatalogVersion() < newCatalogObject.getCatalogVersion() + || !catalogObject_.isLoaded() && newCatalogObject.isLoaded()) { + catalogObject_ = newCatalogObject; return true; } return false; } - /** - * Invalidates the current value. The next call to getOrLoad() or reload() will - * trigger a metadata load. - */ - public void invalidate() { - T tmpCatalogObject = catalogObject_; - synchronized(this) { - // Only invalidate if the reference hasn't changed. This helps reduce the - // likely-hood that a newly loaded value gets immediately wiped out - // by a concurrent invalidate(). - // TODO: Consider investigating a more fair locking scheme. - if (tmpCatalogObject == catalogObject_) catalogObject_ = null; - } - } - /** * Returns the current CatalogObject value in this CacheEntry. */ @@ -107,50 +93,59 @@ public class CatalogObjectCache { /** * Gets the current catalog object for this CacheEntry, loading it if needed (if the - * existing catalog object is null). Throws a CatalogException on any error - * loading the metadata. + * existing catalog object is uninitialized). */ - public synchronized T getOrLoad() throws CatalogException { - if (catalogObject_ != null) return catalogObject_; - try { - T loadedObject = cacheLoader_.load(key_.toLowerCase()); + public T getOrLoad() { + // Get the catalog version to assign the to the loaded object. It's important to + // do this before locking, because getNextCatalogVersion() may require taking + // a top-level lock. + long targetCatalogVersion = cacheLoader_.getNextCatalogVersion(); + + synchronized (this) { + Preconditions.checkNotNull(catalogObject_); + if (catalogObject_.isLoaded()) return catalogObject_; + T loadedObject = cacheLoader_.load(catalogObject_.getName().toLowerCase(), null, + targetCatalogVersion); Preconditions.checkNotNull(loadedObject); + Preconditions.checkState(loadedObject.isLoaded()); replaceIfNewer(loadedObject); return catalogObject_; - } catch (Exception e) { - throw new CatalogException("Error loading metadata for: " + key_, e); } } /** - * Reloads the value for this cache entry and replaces the existing value if the - * new object's catalog version is greater. All exceptions are logged and - * swallowed and the existing value will not be modified. This is similar to - * getOrLoad(), but can reuse the existing cached value to speedup loading time. - * TODO: Instead of serializing reload() requests, concurrent reload()'s could - * block until the in-progress reload() completes. + * Reloads the value for this cache entry and replaces the existing value. This is + * similar to load(), but can reuse the existing cached value to speedup loading + * time. Will block if any existing reloads/loads are in progress, and return the + * value the value they loaded. */ - public synchronized T reload() { - try { - ListenableFuture result = cacheLoader_.reload(key_, catalogObject_); - Preconditions.checkNotNull(result); + public T reload() { + // Get the catalog version to assign the to the reloaded object. It's important to + // do this before locking, because getNextCatalogVersion() may require taking + // a top-level lock. + long targetCatalogVersion = cacheLoader_.getNextCatalogVersion(); - // Wait for the reload to complete. - T reloadedObject = result.get(); - Preconditions.checkNotNull(reloadedObject); - replaceIfNewer(reloadedObject); - } catch (Exception e) { - LOG.error(e); + T tmpCatalogObject = catalogObject_; + synchronized (this) { + // Only reload if the underlying catalog object has not changed since waiting + // on the lock OR if the catalog object is uninitialized and needs to be loaded. + if (tmpCatalogObject == catalogObject_ || !catalogObject_.isLoaded()) { + T loadedObject = cacheLoader_.load(catalogObject_.getName(), catalogObject_, + targetCatalogVersion); + Preconditions.checkNotNull(loadedObject); + Preconditions.checkState(loadedObject.isLoaded()); + replaceIfNewer(loadedObject); + } + return catalogObject_; } - return catalogObject_; } /** * Creates a new CacheEntry with the given key and CacheLoader. */ public static CacheEntry - create(String key, CacheLoader cacheLoader) { - return new CacheEntry(key.toLowerCase(), cacheLoader); + create(T catalogObject, CacheLoader cacheLoader) { + return new CacheEntry(catalogObject, cacheLoader); } } @@ -170,10 +165,9 @@ public class CatalogObjectCache { public boolean add(T catalogObject) { Preconditions.checkNotNull(catalogObject); CacheEntry cacheEntry = - CacheEntry.create(catalogObject.getName(), cacheLoader_); + CacheEntry.create(catalogObject, cacheLoader_); CacheEntry existingItem = metadataCache_.putIfAbsent( - catalogObject.getName().toLowerCase(), - cacheEntry); + catalogObject.getName().toLowerCase(), cacheEntry); // When existingItem != null it indicates there was already an existing entry // associated with the key, so apply the update to the existing entry. @@ -182,22 +176,6 @@ public class CatalogObjectCache { return cacheEntry.replaceIfNewer(catalogObject); } - /** - * Adds a new name to the cache, the next access to the object will trigger a - * metadata load. If an item with the same name already exists in the cache - * it will be invalidated. - * TODO: Should addName() require a catalog version associated with the operation? - */ - public void addName(String objectName) { - CacheEntry cacheEntry = CacheEntry.create(objectName, cacheLoader_); - CacheEntry existingItem = metadataCache_.putIfAbsent( - objectName.toLowerCase(), cacheEntry); - cacheEntry = existingItem != null ? existingItem : cacheEntry; - // This invalidate may be unnecessary if there was an existing item in the cache - // that didn't need invalidation, but it is still safe to do so. - cacheEntry.invalidate(); - } - /** * Removes an item from the metadata cache and returns the removed item, or null * if no item was removed. @@ -214,16 +192,6 @@ public class CatalogObjectCache { metadataCache_.clear(); } - /** - * Invalidates the CacheEntry's value for the given object name. The next access to - * this object will trigger a metadata load. Note that this does NOT remove the - * CacheEntry value or the key in the metadataCache_. - */ - public void invalidate(String name) { - CacheEntry cacheEntry = metadataCache_.get(name.toLowerCase()); - if (cacheEntry != null) cacheEntry.invalidate(); - } - /** * Reloads the metadata for the given object name (if the object already exists * in the cache) or loads the object if it does not exists in the metadata @@ -260,16 +228,21 @@ public class CatalogObjectCache { * It is important getOrLoad() not be synchronized to allow concurrent getOrLoad() * requests on different keys. */ - public T getOrLoad(final String name) { + public T getOrLoad(String name) { CacheEntry cacheEntry = metadataCache_.get(name.toLowerCase()); if (cacheEntry == null) return null; - try { - return cacheEntry.getOrLoad(); - } catch (CatalogException e) { - // TODO: Consider throwing a CatalogException rather than an unchecked - // exception. IllegalStateException isn't really the right exception type - // either. - throw new IllegalStateException(e.getMessage(), e); - } + return cacheEntry.getOrLoad(); + } + + /** + * Returns the catalog object corresponding to the supplied name if it exists in the + * cache, or null if there is no CacheEntry in the metadataCache_ associated with this + * key. Will not perform a load(), so may return objects that are not initialized + * (have isLoaded() == false). + */ + public T get(String name) { + CacheEntry cacheEntry = metadataCache_.get(name.toLowerCase()); + if (cacheEntry == null) return null; + return cacheEntry.value(); } } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java index 976ba8073..0e2bbb325 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java @@ -18,10 +18,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.log4j.Logger; +import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient; import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.Pair; import com.cloudera.impala.thrift.TCatalog; @@ -41,19 +44,35 @@ import com.google.common.collect.Lists; * will return the catalog version that the update will show up in. The client * can then wait until the statestore sends an update that contains that catalog * version. + * The CatalogServiceCatalog also manages a global "catalog version". The version + * is incremented and assigned to a CatalogObject whenever it is + * added/modified/removed from the catalog. This means each CatalogObject + * will have a unique version and assigned versions are strictly increasing. */ public class CatalogServiceCatalog extends Catalog { private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class); private final TUniqueId catalogServiceId_; + // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock + // protects catalogVersion_, it can be used to perform atomic bulk catalog operations + // since catalogVersion_ cannot change externally while the lock is being held. + // In addition to protecting catalogVersion_, it is currently used for the + // following bulk operations: + // * Building a delta update to send to the statestore in getAllCatalogObjects(), + // so a snapshot of the catalog can be taken without any version changes. + // * During a catalog invalidation (call to reset()), which re-reads all dbs and tables + // from the metastore. + // * During renameTable(), because a table must be removed and added to the catalog + // atomically (potentially in a different database). + private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true); + // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented - // with each update to the Catalog. Continued across the lifetime of the process. - // Atomic to ensure versions are always sequentially increasing, even when updated - // from different threads. - // TODO: This probably doesn't need to be atomic and can be updated while holding - // the catalogLock_. - private final static AtomicLong catalogVersion_ = - new AtomicLong(INITIAL_CATALOG_VERSION); + // with each update to the Catalog. Continued across the lifetime of the object. + // Protected by catalogLock_. + // TODO: Handle overflow of catalogVersion_ and nextTableId_. + private long catalogVersion_ = INITIAL_CATALOG_VERSION; + + protected final AtomicInteger nextTableId_ = new AtomicInteger(0); /** * Initialize the CatalogServiceCatalog, loading all table metadata @@ -103,7 +122,8 @@ public class CatalogServiceCatalog extends Catalog { for (String tblName: db.getAllTableNames()) { TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION); - Table tbl = db.getTable(tblName); + + Table tbl = db.getTableNoLoad(tblName); if (tbl == null) { LOG.error("Table: " + tblName + " was expected to be in the catalog " + "cache. Skipping table for this update."); @@ -146,12 +166,12 @@ public class CatalogServiceCatalog extends Catalog { // By setting the catalog version to the latest catalog version at this point, // it ensure impalads will always bump their versions, even in the case where // an object has been dropped. - catalog.setCatalog_version(CatalogServiceCatalog.getCatalogVersion()); + catalog.setCatalog_version(getCatalogVersion()); catalog.setCatalog(new TCatalog(catalogServiceId_)); resp.addToObjects(catalog); // The max version is the max catalog version of all items in the update. - resp.setMax_catalog_version(CatalogServiceCatalog.getCatalogVersion()); + resp.setMax_catalog_version(getCatalogVersion()); return resp; } finally { catalogLock_.readLock().unlock(); @@ -163,108 +183,128 @@ public class CatalogServiceCatalog extends Catalog { * Functions are not returned in a defined order. */ public List getFunctions(String dbName) throws DatabaseNotFoundException { - catalogLock_.readLock().lock(); - try { - Db db = getDb(dbName); - if (db == null) { - throw new DatabaseNotFoundException("Database does not exist: " + dbName); - } - - // Contains map of overloaded function names to all functions matching that name. - HashMap> dbFns = db.getAllFunctions(); - List fns = new ArrayList(dbFns.size()); - for (List fnOverloads: dbFns.values()) { - for (Function fn: fnOverloads) { - fns.add(fn); - } - } - return fns; - } finally { - catalogLock_.readLock().unlock(); + Db db = getDb(dbName); + if (db == null) { + throw new DatabaseNotFoundException("Database does not exist: " + dbName); } + + // Contains map of overloaded function names to all functions matching that name. + HashMap> dbFns = db.getAllFunctions(); + List fns = new ArrayList(dbFns.size()); + for (List fnOverloads: dbFns.values()) { + for (Function fn: fnOverloads) { + fns.add(fn); + } + } + return fns; } @Override - public long reset() { + public void reset() throws CatalogException { catalogLock_.writeLock().lock(); try { + resetInternal(); + } finally { + catalogLock_.writeLock().unlock(); + } + } + + /** + * Executes the underlying reset logic. catalogLock_.writeLock() must + * be taken before calling this. + */ + private void resetInternal() throws CatalogException { + try { + nextTableId_.set(0); + if (initStrategy_ == CatalogInitStrategy.EMPTY) { + dbCache_.get().clear(); + return; + } + // Since UDFs/UDAs are not persisted in the metastore, we won't clear // them across reset. To do this, we store all the functions before // clearing and restore them after. // TODO: Everything about this. Persist them. List>>> functions = Lists.newArrayList(); - for (Db db: dbCache_.values()) { + for (Db db: dbCache_.get().values()) { if (db.numFunctions() == 0) continue; functions.add(Pair.create(db.getName(), db.getAllFunctions())); } - // Reset the dbs. - resetInternal(); + // Build a new DB cache, populate it, and replace the existing cache in one + // step. + ConcurrentHashMap newDbCache = new ConcurrentHashMap(); + + MetaStoreClient msClient = metaStoreClientPool_.getClient(); + try { + for (String dbName: msClient.getHiveClient().getAllDatabases()) { + Db db = new Db(dbName, this); + db.setCatalogVersion(incrementAndGetCatalogVersion()); + newDbCache.put(db.getName().toLowerCase(), db); + + for (String tableName: msClient.getHiveClient().getAllTables(dbName)) { + Table incompleteTbl = IncompleteTable.createUninitializedTable( + getNextTableId(), db, tableName); + incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion()); + db.addTable(incompleteTbl); + } + } + } finally { + msClient.release(); + } // Restore UDFs/UDAs. for (Pair>> dbFns: functions) { Db db = null; try { - db = dbCache_.get(dbFns.first); + db = newDbCache.get(dbFns.first); } catch (Exception e) { continue; } if (db == null) { - // DB no longer exists. - // TODO: We could restore this DB and then add the functions back. + // DB no longer exists - it was probably dropped externally. + // TODO: We could restore this DB and then add the functions back? continue; } for (List fns: dbFns.second.values()) { for (Function fn: fns) { - fn.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion()); + fn.setCatalogVersion(incrementAndGetCatalogVersion()); db.addFunction(fn); } } } - return getCatalogVersion(); - } finally { - catalogLock_.writeLock().unlock(); + dbCache_.set(newDbCache); + } catch (Exception e) { + LOG.error(e); + throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e); } } /** * Adds a database name to the metadata cache and returns the database's - * Thrift representation. Used by CREATE DATABASE statements. + * new Db object. Used by CREATE DATABASE statements. */ - public TCatalogObject addDb(String dbName) throws ImpalaException { - TCatalogObject thriftDb = new TCatalogObject(TCatalogObjectType.DATABASE, - Catalog.INITIAL_CATALOG_VERSION); - catalogLock_.writeLock().lock(); - try { - Db newDb = new Db(dbName, this); - newDb.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion()); - addDb(newDb); - thriftDb.setCatalog_version(newDb.getCatalogVersion()); - thriftDb.setDb(newDb.toThrift()); - return thriftDb; - } finally { - catalogLock_.writeLock().unlock(); - } + public Db addDb(String dbName) throws ImpalaException { + Db newDb = new Db(dbName, this); + newDb.setCatalogVersion(incrementAndGetCatalogVersion()); + addDb(newDb); + return newDb; } /** - * Removes a database from the metadata cache. Used by DROP DATABASE statements. + * Removes a database from the metadata cache and returns the removed database, + * or null if the database did not exist in the cache. + * Used by DROP DATABASE statements. */ @Override public Db removeDb(String dbName) { - catalogLock_.writeLock().lock(); - try { - Db removedDb = super.removeDb(dbName); - if (removedDb != null) { - removedDb.setCatalogVersion( - CatalogServiceCatalog.incrementAndGetCatalogVersion()); - } - return removedDb; - } finally { - catalogLock_.writeLock().unlock(); + Db removedDb = super.removeDb(dbName); + if (removedDb != null) { + removedDb.setCatalogVersion(incrementAndGetCatalogVersion()); } + return removedDb; } /** @@ -273,54 +313,38 @@ public class CatalogServiceCatalog extends Catalog { * TODO: Should this add an IncompleteTable instead of loading the metadata? */ public Table addTable(String dbName, String tblName) throws TableNotFoundException { - Db db; - catalogLock_.writeLock().lock(); - try { - db = getDb(dbName); - if (db == null) return null; - db.addTableName(tblName); - } finally { - catalogLock_.writeLock().unlock(); - } + Db db = getDb(dbName); + if (db == null) return null; + Table incompleteTable = + IncompleteTable.createUninitializedTable(getNextTableId(), db, tblName); + db.addTable(incompleteTable); return db.getTable(tblName); } public Table removeTable(String dbName, String tblName) throws DatabaseNotFoundException { - catalogLock_.writeLock().lock(); - try { - Db parentDb = getDb(dbName); - if (parentDb == null) return null; + Db parentDb = getDb(dbName); + if (parentDb == null) return null; - Table removedTable = parentDb.removeTable(tblName); - if (removedTable != null) { - removedTable.setCatalogVersion( - CatalogServiceCatalog.incrementAndGetCatalogVersion()); - } - return removedTable; - } finally { - catalogLock_.writeLock().unlock(); + Table removedTable = parentDb.removeTable(tblName); + if (removedTable != null) { + removedTable.setCatalogVersion(incrementAndGetCatalogVersion()); } + return removedTable; } /** * Removes a function from the catalog. Increments the catalog version and returns - * the Function object that was removed if the function existed, otherwise returns - * null. + * the Function object that was removed. If the function did not exist, null will + * be returned. */ @Override public Function removeFunction(Function desc) { - catalogLock_.writeLock().lock(); - try { - Function removedFn = super.removeFunction(desc); - if (removedFn != null) { - removedFn.setCatalogVersion( - CatalogServiceCatalog.incrementAndGetCatalogVersion()); - } - return removedFn; - } finally { - catalogLock_.writeLock().unlock(); + Function removedFn = super.removeFunction(desc); + if (removedFn != null) { + removedFn.setCatalogVersion(incrementAndGetCatalogVersion()); } + return removedFn; } /** @@ -331,18 +355,13 @@ public class CatalogServiceCatalog extends Catalog { */ @Override public boolean addFunction(Function fn) { - catalogLock_.writeLock().lock(); - try { - Db db = getDb(fn.getFunctionName().getDb()); - if (db == null) return false; - if (db.addFunction(fn)) { - fn.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion()); - return true; - } - return false; - } finally { - catalogLock_.writeLock().unlock(); + Db db = getDb(fn.getFunctionName().getDb()); + if (db == null) return false; + if (db.addFunction(fn)) { + fn.setCatalogVersion(incrementAndGetCatalogVersion()); + return true; } + return false; } /** @@ -367,16 +386,11 @@ public class CatalogServiceCatalog extends Catalog { * (outside of Impala) modifications to the table. */ public void updateLastDdlTime(TTableName tblName, long ddlTime) { - catalogLock_.writeLock().lock(); - try { - Db db = getDb(tblName.getDb_name()); - if (db == null) return; - Table tbl = db.getTable(tblName.getTable_name()); - if (tbl == null) return; - tbl.updateLastDdlTime(ddlTime); - } finally { - catalogLock_.writeLock().unlock(); - } + Db db = getDb(tblName.getDb_name()); + if (db == null) return; + Table tbl = db.getTable(tblName.getTable_name()); + if (tbl == null) return; + tbl.updateLastDdlTime(ddlTime); } /** @@ -418,87 +432,51 @@ public class CatalogServiceCatalog extends Catalog { Table table = getTable(tableName.getDb_name(), tableName.getTable_name()); if (table == null) return null; LOG.debug("Refreshing table metadata: " + table.getFullName()); - table.getDb().reloadTable(table.getName()); - return getTable(tableName.getDb_name(), tableName.getTable_name()); + return table.getDb().reloadTable(table.getName()); } else { - catalogLock_.writeLock().lock(); - try { - Table existingTable = getTable(tableName.getDb_name(), - tableName.getTable_name()); - if (existingTable == null) return null; - LOG.debug("Invalidating table metadata: " + existingTable.getFullName()); - // Instead of calling invalidate() on the cache entry, which would mean the next - // access would trigger a metadata load, the existing table is replaced with - // an IncompleteTable. The IncompleteTable will be sent to all impalads in the - // cluster, triggering an invalidation on each node. - Table newTable = IncompleteTable.createUninitializedTable(getNextTableId(), - existingTable.getDb(), existingTable.getName()); - newTable.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion()); - existingTable.getDb().addTable(newTable); - return newTable; - } finally { - catalogLock_.writeLock().unlock(); - } - } - } + Table existingTable = getTable(tableName.getDb_name(), + tableName.getTable_name()); + if (existingTable == null) return null; + LOG.debug("Invalidating table metadata: " + existingTable.getFullName()); - /** - * Gets a table from the catalog. This method will load any uninitialized - * IncompleteTables into the table cache. Returns the Table object or null if no - * table existed in the cache with this name. - */ - public Table getOrReloadTable(String dbName, String tblName) - throws DatabaseNotFoundException { - Table tbl = getTable(dbName, tblName); - if (tbl == null) return null; - - // Check if this table needs to have its metadata loaded (is an uninitialized - // IncompleteTable). To avoid having the same table reloaded many times for - // concurrent requests to the same object, synchronize on the Table and then - // check if it is still incomplete while holding the lock. This allows concurrent - // loads for different tables while still protecting against a flood of reloads - // of the same table. - synchronized (tbl) { - // Read the table again while holding the lock. - Table reReadTbl = getTable(dbName, tblName); - if (reReadTbl == null) return null; - if (reReadTbl instanceof IncompleteTable - && ((IncompleteTable) reReadTbl).isUninitialized()) { - // Perform the reload - return reReadTbl.getDb().reloadTable(tblName); - } else { - return reReadTbl; - } + // The existing table is replaced with an uninitialized IncompleteTable, which + // effectively invalidates the existing cached value. The IncompleteTable + // will then be sent to all impalads in the cluster, triggering an invalidation on + // each node. + Table newTable = IncompleteTable.createUninitializedTable(getNextTableId(), + existingTable.getDb(), existingTable.getName()); + newTable.setCatalogVersion(incrementAndGetCatalogVersion()); + existingTable.getDb().addTable(newTable); + return newTable; } } - /** - * See comment in Catalog.java. - * This method will load any uninitialized IncompleteTables into the table cache. - */ - @Override - public TCatalogObject getTCatalogObject(TCatalogObject objectDesc) - throws CatalogException { - if (objectDesc.isSetTable()) { - Table tbl = getOrReloadTable(objectDesc.getTable().getDb_name(), - objectDesc.getTable().getTbl_name()); - if (tbl == null) { - throw new TableNotFoundException("Table not found: " + - objectDesc.getTable().getTbl_name()); - } - } - return super.getTCatalogObject(objectDesc); - } - /** * Increments the current Catalog version and returns the new value. */ - public static long incrementAndGetCatalogVersion() { - return catalogVersion_.incrementAndGet(); + public long incrementAndGetCatalogVersion() { + catalogLock_.writeLock().lock(); + try { + return ++catalogVersion_; + } finally { + catalogLock_.writeLock().unlock(); + } } /** * Returns the current Catalog version. */ - public static long getCatalogVersion() { return catalogVersion_.get(); } + public long getCatalogVersion() { + catalogLock_.readLock().lock(); + try { + return catalogVersion_; + } finally { + catalogLock_.readLock().unlock(); + } + } + + /** + * Gets the next table ID and increments the table ID counter. + */ + public TableId getNextTableId() { return new TableId(nextTableId_.getAndIncrement()); } } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Db.java b/fe/src/main/java/com/cloudera/impala/catalog/Db.java index 1a94dd7f0..059ecafa9 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Db.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Db.java @@ -50,7 +50,8 @@ public class Db implements CatalogObject { // All of the registered user functions. The key is the user facing name (e.g. "myUdf"), // and the values are all the overloaded variants (e.g. myUdf(double), myUdf(string)) - // This includes both UDFs and UDAs + // This includes both UDFs and UDAs. Updates are made thread safe by synchronizing + // on this map. private final HashMap> functions_; public Db(String name, Catalog catalog) { @@ -73,6 +74,16 @@ public class Db implements CatalogObject { return TCatalogObjectType.DATABASE; } + /** + * Adds a table to the table cache. + */ + public void addTable(Table table) { + tableCache_.add(table); + } + + /** + * Gets all table names in the table cache. + */ public List getAllTableNames() { return tableCache_.getAllNames(); } @@ -85,25 +96,17 @@ public class Db implements CatalogObject { * Returns the Table with the given name if present in the table cache or loads the * table if it does not already exist in the cache. Returns null if the table does not * exist in the cache or if there was an error loading the table metadata. - * TODO: Should we bubble this exception up? */ public Table getTable(String tblName) { return tableCache_.getOrLoad(tblName); } /** - * Adds a table to the table cache. + * Gets a table from the table cache, but does not perform a metadata load if the + * table is uninitialized. */ - public void addTable(Table table) { - tableCache_.add(table); - } - - /** - * Adds a given table name to the table cache. The next call to refreshTable() or - * getOrLoadTable() will trigger a metadata load. - */ - public void addTableName(String tableName) { - tableCache_.addName(tableName); + public Table getTableNoLoad(String tblName) { + return tableCache_.get(tblName); } /** @@ -113,14 +116,6 @@ public class Db implements CatalogObject { return tableCache_.reload(tableName); } - /** - * Invalidates the table's metadata, forcing a reload on the next access. Does - * not remove the table from table cache's name set. - */ - public void invalidateTable(String tableName) { - tableCache_.invalidate(tableName); - } - /** * Removes the table name and any cached metadata from the Table cache. */ @@ -263,4 +258,7 @@ public class Db implements CatalogObject { @Override public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; } public Catalog getParentCatalog() { return parentCatalog_; } + + @Override + public boolean isLoaded() { return true; } } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Function.java b/fe/src/main/java/com/cloudera/impala/catalog/Function.java index f64bb7683..938694943 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Function.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Function.java @@ -267,4 +267,7 @@ public class Function implements CatalogObject { function.setHasVarArgs(fn.isHas_var_args()); return function; } + + @Override + public boolean isLoaded() { return true; } } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java index 28e552e89..ad6d15077 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/ImpaladCatalog.java @@ -53,17 +53,18 @@ import com.google.common.base.Preconditions; * Thread safe Catalog for an Impalad. The Impalad Catalog provides an interface to * access Catalog objects that this Impalad knows about and authorizes access requests * to these objects. It also manages reading and updating the authorization policy file - * from HDFS. + * from HDFS. The Impalad Catalog provides APIs for checking whether a user is authorized + * to access a particular catalog object. Any catalog access that requires privilege + * checks should go through this class. * TODO: The CatalogService should also handle updating and disseminating the * authorization policy. * The Impalad catalog can be updated either via a StateStore heartbeat or by directly - * applying the result of a catalog operation to the CatalogCache. Updates applied using - * the updateCatalog() function which takes the catalogLock_.writeLock() for the duration - * of its execution to ensure all updates are applied atomically. + * applying the result of a catalog operation to the CatalogCache. All updates are + * applied using the updateCatalog() function. * Table metadata is loaded lazily. The CatalogServer initially broadcasts (via the * statestore) the known table names (as IncompleteTables). These table names are added * to the Impalad catalog cache and when one of the tables is accessed, the impalad will - * make an RPC to the CatalogServer to load the complete table metadata. + * make an RPC to the CatalogServer to request loading the complete table metadata. * In both cases, we need to ensure that work from one update is not "undone" by another * update. To handle this the ImpaladCatalog does the following: * - Tracks the overall catalog version last received in a state store heartbeat, this @@ -78,17 +79,23 @@ import com.google.common.base.Preconditions; * - Before dropping any catalog object, see if the object already exists in the catalog * cache. If it does, only drop the object if the version of the drop is > that * object's catalog version. - * Additionally, the Impalad Catalog provides interfaces for checking whether - * a user is authorized to access a particular object. Any catalog access that requires - * privilege checks should go through this class. * The CatalogServiceId is also tracked to detect if a different instance of the catalog * service has been started, in which case a full topic update is required. */ public class ImpaladCatalog extends Catalog { private static final Logger LOG = Logger.getLogger(ImpaladCatalog.class); private static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L); + + // The last known Catalog Service ID. If the ID changes, it indicates the CatalogServer + // has restarted. private TUniqueId catalogServiceId_ = INITIAL_CATALOG_SERVICE_ID; + // The catalog version received in the last StateStore heartbeat. It is guaranteed + // all objects in the catalog have at a minimum, this version. Because updates may + // be applied out of band of a StateStore heartbeat, it is possible the catalog + // contains some objects > than this version. + private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; + //TODO: Make the reload interval configurable. private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60; @@ -98,13 +105,9 @@ public class ImpaladCatalog extends Catalog { // Lock used to synchronize refreshing the AuthorizationChecker. private final ReentrantReadWriteLock authzCheckerLock_ = new ReentrantReadWriteLock(); private AuthorizationChecker authzChecker_; + // Flag to determine if the Catalog is ready to accept user requests. See isReady(). private final AtomicBoolean isReady_ = new AtomicBoolean(false); - // The catalog version received in the last StateStore heartbeat. It is guaranteed - // all objects in the catalog have at a minimum, this version. Because updates may - // be applied out of band of a StateStore heartbeat, it is possible the catalog - // contains some objects > than this version. - private long lastSyncedCatalogVersion_ = Catalog.INITIAL_CATALOG_VERSION; // Tracks modifications to this Impalad's catalog from direct updates to the cache. private final CatalogDeltaLog catalogDeltaLog_ = new CatalogDeltaLog(); @@ -138,13 +141,38 @@ public class ImpaladCatalog extends Catalog { // loaded (from the catalog server) on the next access. // TODO: Clean up how we bootstrap the FE tests. if (loadStrategy != CatalogInitStrategy.EMPTY) { + try { + reset(); + } catch (CatalogException e) { + // Re-throw as an illegal state exception. + throw new IllegalStateException(e); + } isReady_.set(true); - for (String dbName: getDbNames(null)) { - Db db = getDb(dbName); - for (String tblName: db.getAllTableNames()) { - db.invalidateTable(tblName); + } + } + + /** + * Used only for testing. Implementation of reset() that allows for bootstrapping the + * Impalad catalog without a running statestore daemon. + */ + @Override + public void reset() throws CatalogException { + MetaStoreClient msClient = metaStoreClientPool_.getClient(); + try { + for (String dbName: msClient.getHiveClient().getAllDatabases()) { + Db db = new Db(dbName, this); + dbCache_.get().put(db.getName().toLowerCase(), db); + + for (String tableName: msClient.getHiveClient().getAllTables(dbName)) { + Table incompleteTbl = IncompleteTable.createUninitializedTable( + TableId.createInvalidId(), db, tableName); + db.addTable(incompleteTbl); } } + } catch (Exception e) { + throw new CatalogException(e.getMessage(), e); + } finally { + msClient.release(); } } @@ -210,55 +238,50 @@ public class ImpaladCatalog extends Catalog { * * This method is called once per statestore heartbeat and is guaranteed the same * object will not be in both the "updated" list and the "removed" list (it is - * a detail handled by the statestore). This method takes the catalogLock_ writeLock - * for the duration of the method to ensure all updates are applied atomically. Since - * updates are sent from the statestore as deltas, this should generally not block - * execution for a significant amount of time. + * a detail handled by the statestore). * Catalog updates are ordered by the object type with the dependent objects coming * first. That is, database "foo" will always come before table "foo.bar". + * Synchronized because updateCatalog() can be called by during a statestore update or + * during a direct-DDL operation and catalogServiceId_ and lastSyncedCatalogVersion_ + * must be protected. */ - public TUpdateCatalogCacheResponse updateCatalog( - TUpdateCatalogCacheRequest req) throws CatalogException { - catalogLock_.writeLock().lock(); - try { - // Check for changes in the catalog service ID. - if (!catalogServiceId_.equals(req.getCatalog_service_id())) { - boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID); - catalogServiceId_ = req.getCatalog_service_id(); - if (!firstRun) { - // Throw an exception which will trigger a full topic update request. - throw new CatalogException("Detected catalog service ID change. Aborting " + - "updateCatalog()"); - } + public synchronized TUpdateCatalogCacheResponse updateCatalog( + TUpdateCatalogCacheRequest req) throws CatalogException { + // Check for changes in the catalog service ID. + if (!catalogServiceId_.equals(req.getCatalog_service_id())) { + boolean firstRun = catalogServiceId_.equals(INITIAL_CATALOG_SERVICE_ID); + catalogServiceId_ = req.getCatalog_service_id(); + if (!firstRun) { + // Throw an exception which will trigger a full topic update request. + throw new CatalogException("Detected catalog service ID change. Aborting " + + "updateCatalog()"); } - - // First process all updates - long newCatalogVersion = lastSyncedCatalogVersion_; - for (TCatalogObject catalogObject: req.getUpdated_objects()) { - if (catalogObject.getType() == TCatalogObjectType.CATALOG) { - newCatalogVersion = catalogObject.getCatalog_version(); - } else { - try { - addCatalogObject(catalogObject); - } catch (Exception e) { - LOG.error("Error adding catalog object: " + e.getMessage(), e); - } - } - } - - // Now remove all objects from the catalog. Removing a database before removing - // its child tables/functions is fine. If that happens, the removal of the child - // object will be a no-op. - for (TCatalogObject catalogObject: req.getRemoved_objects()) { - removeCatalogObject(catalogObject, newCatalogVersion); - } - lastSyncedCatalogVersion_ = newCatalogVersion; - // Cleanup old entries in the log. - catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_); - isReady_.set(true); - } finally { - catalogLock_.writeLock().unlock(); } + + // First process all updates + long newCatalogVersion = lastSyncedCatalogVersion_; + for (TCatalogObject catalogObject: req.getUpdated_objects()) { + if (catalogObject.getType() == TCatalogObjectType.CATALOG) { + newCatalogVersion = catalogObject.getCatalog_version(); + } else { + try { + addCatalogObject(catalogObject); + } catch (Exception e) { + LOG.error("Error adding catalog object: " + e.getMessage(), e); + } + } + } + + // Now remove all objects from the catalog. Removing a database before removing + // its child tables/functions is fine. If that happens, the removal of the child + // object will be a no-op. + for (TCatalogObject catalogObject: req.getRemoved_objects()) { + removeCatalogObject(catalogObject, newCatalogVersion); + } + lastSyncedCatalogVersion_ = newCatalogVersion; + // Cleanup old entries in the log. + catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_); + isReady_.set(true); return new TUpdateCatalogCacheResponse(catalogServiceId_); } @@ -318,16 +341,11 @@ public class ImpaladCatalog extends Catalog { checkAccess(user, new PrivilegeRequestBuilder() .allOf(privilege).onTable(dbName, tableName).toRequest()); - catalogLock_.readLock().lock(); - try { - Db db = getDb(dbName); - if (db == null) { - throw new DatabaseNotFoundException("Database not found: " + dbName); - } - return db.containsTable(tableName); - } finally { - catalogLock_.readLock().unlock(); + Db db = getDb(dbName); + if (db == null) { + throw new DatabaseNotFoundException("Database not found: " + dbName); } + return db.containsTable(tableName); } /** @@ -343,8 +361,8 @@ public class ImpaladCatalog extends Catalog { Table table = getTable(dbName, tableName); if (table instanceof IncompleteTable) { - // We should never get back an uninitialized IncompleteTable. - Preconditions.checkState(!((IncompleteTable) table).isUninitialized()); + // We should never get back an IncompleteTable that is uninitialized. + Preconditions.checkState(table.isLoaded()); // If there were problems loading this table's metadata, throw an exception // when it is accessed. @@ -534,15 +552,7 @@ public class ImpaladCatalog extends Catalog { Table newTable = Table.fromThrift(db, thriftTable); newTable.setCatalogVersion(catalogVersion); - - // If this is an uninitialized table, don't just add the table name to - // the metadata cache. The next access will trigger a metadata load. - if (newTable instanceof IncompleteTable - && ((IncompleteTable) newTable).isUninitialized()) { - db.addTableName(newTable.getName()); - } else { - db.addTable(newTable); - } + db.addTable(newTable); } private void addFunction(TFunction fn, long catalogVersion) { diff --git a/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java b/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java index bb11650a0..5d48d1c4a 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/IncompleteTable.java @@ -25,7 +25,6 @@ import com.cloudera.impala.thrift.TStatus; import com.cloudera.impala.thrift.TStatusCode; import com.cloudera.impala.thrift.TTable; import com.cloudera.impala.thrift.TTableDescriptor; -import com.cloudera.impala.thrift.TTableType; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -55,7 +54,8 @@ public class IncompleteTable extends Table { /** * See comment on cause_. */ - public boolean isUninitialized() { return cause_ == null; } + @Override + public boolean isLoaded() { return cause_ != null; } @Override public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; } @@ -82,7 +82,6 @@ public class IncompleteTable extends Table { public TTable toThrift() { TTable table = new TTable(db_.getName(), name_); table.setId(id_.asInt()); - table.setTable_type(TTableType.INCOMPLETE_TABLE); if (cause_ != null) { table.setLoad_status(new TStatus(TStatusCode.INTERNAL_ERROR, Lists.newArrayList(JniUtil.throwableToString(cause_), diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Table.java b/fe/src/main/java/com/cloudera/impala/catalog/Table.java index fdcedbfa2..14abe40c3 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Table.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Table.java @@ -28,8 +28,6 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.log4j.Logger; import com.cloudera.impala.analysis.ColumnType; -import com.cloudera.impala.common.JniUtil; - import com.cloudera.impala.thrift.TAccessLevel; import com.cloudera.impala.thrift.TCatalogObject; import com.cloudera.impala.thrift.TCatalogObjectType; @@ -37,7 +35,6 @@ import com.cloudera.impala.thrift.TColumn; import com.cloudera.impala.thrift.TTable; import com.cloudera.impala.thrift.TTableDescriptor; import com.cloudera.impala.thrift.TTableStats; -import com.cloudera.impala.thrift.TTableType; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -209,8 +206,7 @@ public abstract class Table implements CatalogObject { public static Table fromThrift(Db parentDb, TTable thriftTable) throws TableLoadingException { Table newTable; - if (thriftTable.getTable_type() != TTableType.INCOMPLETE_TABLE && - thriftTable.isSetMetastore_table()) { + if (!thriftTable.isSetLoad_status() && thriftTable.isSetMetastore_table()) { newTable = Table.fromMetastoreTable(new TableId(thriftTable.getId()), parentDb, thriftTable.getMetastore_table()); } else { @@ -375,4 +371,7 @@ public abstract class Table implements CatalogObject { public void setCatalogVersion(long catalogVersion) { catalogVersion_ = catalogVersion; } + + @Override + public boolean isLoaded() { return true; } } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java index 391a756f5..f6b05fd7c 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/TableLoader.java @@ -18,18 +18,16 @@ import java.util.EnumSet; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.cloudera.impala.catalog.MetaStoreClientPool.MetaStoreClient; -import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.InternalException; import com.cloudera.impala.service.FeSupport; import com.cloudera.impala.thrift.TCatalogObject; import com.cloudera.impala.thrift.TCatalogObjectType; import com.cloudera.impala.thrift.TTable; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheLoader; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; /** * A cache loader that defines how table metadata should be loaded. Impalads @@ -37,6 +35,8 @@ import com.google.common.util.concurrent.SettableFuture; * loads its table metadata by contacting the Hive Metastore / HDFS / etc. */ public abstract class TableLoader extends CacheLoader { + private final static Logger LOG = LoggerFactory.getLogger(TableLoader.class); + // Parent database of this table. protected final Db db_; @@ -59,41 +59,6 @@ public abstract class TableLoader extends CacheLoader { parentDb.getParentCatalog().getClass().getName()); } - /** - * Load the given table name. - */ - @Override - public Table load(String tblName) throws Exception { - return loadTable(tblName, null); - } - - /** - * Computes a replacement value for the Table based on an already-cached value. - * - * Returns (as a ListenableFuture) the future new value of the table. - * The returned Future should not be null. Using a Future allows for a - * synchronous or asynchronous implementation or reload(). - */ - @Override - public ListenableFuture reload(String tableName, Table cachedValue) - throws ImpalaException { - SettableFuture
newValue = SettableFuture.create(); - Table table = loadTable(tableName, cachedValue); - newValue.set(table); - return newValue; - } - - /** - * Implementation for how a table is loaded. Generally this is the - * only method that needs to be implemented. - * @param tblName - The name of the table. - * @param cachedEntry - An existing cached table that can be reused to speed up - * the loading process for a "reload" operation. - */ - protected abstract Table loadTable(String tableName, Table cachedValue) - throws TableNotFoundException; - - /** * TableLoader that loads table metadata from the Hive Metastore. Used by * the CatalogServer and updates the object's catalog version on successful @@ -116,18 +81,21 @@ public abstract class TableLoader extends CacheLoader { /** * Creates the Impala representation of Hive/HBase metadata for one table. * Calls load() on the appropriate instance of Table subclass. - * oldCacheEntry is the existing cache entry and might still contain valid info to - * help speed up metadata loading. oldCacheEntry is null if there is no existing + * cachedValue is the existing cache entry and might still contain valid info to + * help speed up metadata loading. cachedValue is null if there is no existing * cache entry (i.e. during fresh load). - * @return new instance of HdfsTable or HBaseTable - * null if the table does not exist - * @throws TableLoadingException if there was an error loading the table. - * @throws TableNotFoundException if the table was not found + * The catalogVersion parameter specifies what version will be assigned + * to the newly loaded object. + * Returns new instance of Table, or null if the table does not exist. If there + * were any errors loading the table metadata an IncompleteTable will be returned + * that contains details on the error. */ @Override - protected Table loadTable(String tblName, Table cacheEntry) - throws TableNotFoundException { - Catalog catalog = db_.getParentCatalog(); + public Table load(String tblName, Table cachedValue, long catalogVersion) { + String fullTblName = db_.getName() + "." + tblName; + LOG.info("Loading metadata for: " + fullTblName); + Preconditions.checkState(db_.getParentCatalog() instanceof CatalogServiceCatalog); + CatalogServiceCatalog catalog = (CatalogServiceCatalog) db_.getParentCatalog(); MetaStoreClient msClient = catalog.getMetaStoreClient(); Table table; // turn all exceptions into TableLoadingException @@ -141,22 +109,26 @@ public abstract class TableLoader extends CacheLoader { TableType tableType = TableType.valueOf(msTbl.getTableType()); if (!SUPPORTED_TABLE_TYPES.contains(tableType)) { throw new TableLoadingException(String.format( - "Unsupported table type '%s' for: %s.%s", - tableType, db_.getName(), tblName)); + "Unsupported table type '%s' for: %s", tableType, fullTblName)); } // Create a table of appropriate type and have it load itself table = Table.fromMetastoreTable(catalog.getNextTableId(), db_, msTbl); if (table == null) { throw new TableLoadingException( - "Unrecognized table type for table: " + msTbl.getTableName()); + "Unrecognized table type for table: " + fullTblName); } - table.load(cacheEntry, msClient.getHiveClient(), msTbl); + table.load(cachedValue, msClient.getHiveClient(), msTbl); } catch (TableLoadingException e) { table = IncompleteTable.createFailedMetadataLoadTable( catalog.getNextTableId(), db_, tblName, e); } catch (NoSuchObjectException e) { - throw new TableNotFoundException("Table not found: " + tblName, e); + TableLoadingException tableDoesNotExist = new TableLoadingException( + "Table " + fullTblName + " no longer exists in the Hive MetaStore. " + + "Run 'invalidate metadata " + fullTblName + "' to update the Impala " + + "catalog."); + table = IncompleteTable.createFailedMetadataLoadTable( + catalog.getNextTableId(), db_, tblName, tableDoesNotExist); } catch (Exception e) { table = IncompleteTable.createFailedMetadataLoadTable( catalog.getNextTableId(), db_, tblName, new TableLoadingException( @@ -165,32 +137,31 @@ public abstract class TableLoader extends CacheLoader { msClient.release(); } // Set the new catalog version for the table and return it. - table.setCatalogVersion(CatalogServiceCatalog.incrementAndGetCatalogVersion()); + table.setCatalogVersion(catalogVersion); return table; } + + @Override + public long getNextCatalogVersion() { + CatalogServiceCatalog catalog = (CatalogServiceCatalog) db_.getParentCatalog(); + return catalog.incrementAndGetCatalogVersion(); + } } /** * A TableLoader that loads metadata from the CatalogServer. */ private static class CatalogServiceTableLoader extends TableLoader { + private static final String FAILED_TBL_LOAD_MSG = + "Unexpected error loading table metadata. Please run 'invalidate metadata %s'"; + public CatalogServiceTableLoader(Db db) { super(db); } @Override - public ListenableFuture
reload(String tableName, Table cachedValue) - throws ImpalaException { - // To protect against a concurrency issue between add(CatalogObject) and - // reload(), reload() is not supported on a CatalogServiceTableLoader. See comment - // in the CatalogObjectCache for more details. - throw new IllegalStateException("Calling reload() on a CatalogServiceTableLoader" + - " is not supported."); - } - - @Override - protected Table loadTable(String tblName, Table cacheEntry) - throws TableNotFoundException { + public Table load(String tblName, Table cachedValue, long catalogVersion) { + LOG.info("Loading metadata for: " + db_.getName() + "." + tblName); TCatalogObject objectDesc = new TCatalogObject(); objectDesc.setType(TCatalogObjectType.TABLE); objectDesc.setTable(new TTable()); @@ -204,20 +175,36 @@ public abstract class TableLoader extends CacheLoader { TableId.createInvalidId(), db_, tblName, e); } + Table newTable; if (!catalogObject.isSetTable()) { - throw new TableNotFoundException( - String.format("Table not found: %s.%s", db_.getName(), tblName)); + newTable = IncompleteTable.createFailedMetadataLoadTable( + TableId.createInvalidId(), db_, tblName, + new TableLoadingException(FAILED_TBL_LOAD_MSG)); } - Table newTable; + try { newTable = Table.fromThrift(db_, catalogObject.getTable()); } catch (TableLoadingException e) { newTable = IncompleteTable.createFailedMetadataLoadTable( TableId.createInvalidId(), db_, tblName, e); } + + if (newTable == null || !newTable.isLoaded()) { + newTable = IncompleteTable.createFailedMetadataLoadTable( + TableId.createInvalidId(), db_, tblName, + new TableLoadingException(FAILED_TBL_LOAD_MSG)); + } + newTable.setCatalogVersion(catalogObject.getCatalog_version()); return newTable; } + + @Override + public long getNextCatalogVersion() { + // Tables should already have catalog versions assigned catalog + // by the CatalogServer. Always return zero here. + return 0; + } } -} \ No newline at end of file +} diff --git a/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java b/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java index 95ce351ed..1c086fb40 100644 --- a/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java +++ b/fe/src/main/java/com/cloudera/impala/service/DdlExecutor.java @@ -323,8 +323,7 @@ public class DdlExecutor { Preconditions.checkState(tableName != null && tableName.isFullyQualified()); LOG.info(String.format("Updating table stats for %s", tableName)); - Table table = catalog_.getOrReloadTable(tableName.getDb(), - tableName.getTbl()); + Table table = catalog_.getTable(tableName.getDb(), tableName.getTbl()); // Deep copy the msTbl to avoid updating our cache before successfully persisting // the results to the metastore. org.apache.hadoop.hive.metastore.api.Table msTbl = @@ -501,7 +500,7 @@ public class DdlExecutor { if (params.if_not_exists && catalog_.getDb(dbName) != null) { LOG.debug("Skipping database creation because " + dbName + " already exists and " + "IF NOT EXISTS was specified."); - resp.getResult().setVersion(CatalogServiceCatalog.getCatalogVersion()); + resp.getResult().setVersion(catalog_.getCatalogVersion()); return; } org.apache.hadoop.hive.metastore.api.Database db = @@ -527,7 +526,13 @@ public class DdlExecutor { } finally { msClient.release(); } - resp.result.setUpdated_catalog_object(catalog_.addDb(dbName)); + + Db newDb = catalog_.addDb(dbName); + TCatalogObject thriftDb = new TCatalogObject(TCatalogObjectType.DATABASE, + Catalog.INITIAL_CATALOG_VERSION); + thriftDb.setDb(newDb.toThrift()); + thriftDb.setCatalog_version(newDb.getCatalogVersion()); + resp.result.setUpdated_catalog_object(thriftDb); } resp.result.setVersion(resp.result.getUpdated_catalog_object().getCatalog_version()); } @@ -582,7 +587,7 @@ public class DdlExecutor { // If no db was removed as part of this operation just return the current catalog // version. if (removedDb == null) { - removedObject.setCatalog_version(CatalogServiceCatalog.getCatalogVersion()); + removedObject.setCatalog_version(catalog_.getCatalogVersion()); } else { removedObject.setCatalog_version(removedDb.getCatalogVersion()); } @@ -619,7 +624,7 @@ public class DdlExecutor { if (table != null) { resp.result.setVersion(table.getCatalogVersion()); } else { - resp.result.setVersion(CatalogServiceCatalog.getCatalogVersion()); + resp.result.setVersion(catalog_.getCatalogVersion()); } } removedObject.setType(TCatalogObjectType.TABLE); @@ -647,7 +652,7 @@ public class DdlExecutor { } // The user specified IF NOT EXISTS and the function didn't exist, just // return the current catalog version. - resp.result.setVersion(CatalogServiceCatalog.getCatalogVersion()); + resp.result.setVersion(catalog_.getCatalogVersion()); } else { TCatalogObject removedObject = new TCatalogObject(); removedObject.setType(TCatalogObjectType.FUNCTION); @@ -678,7 +683,7 @@ public class DdlExecutor { catalog_.containsTable(tableName.getDb(), tableName.getTbl())) { LOG.debug(String.format("Skipping table creation because %s already exists and " + "IF NOT EXISTS was specified.", tableName)); - response.getResult().setVersion(CatalogServiceCatalog.getCatalogVersion()); + response.getResult().setVersion(catalog_.getCatalogVersion()); return false; } org.apache.hadoop.hive.metastore.api.Table tbl = @@ -738,10 +743,10 @@ public class DdlExecutor { catalog_.containsTable(tblName.getDb(), tblName.getTbl())) { LOG.debug(String.format("Skipping table creation because %s already exists and " + "IF NOT EXISTS was specified.", tblName)); - response.getResult().setVersion(CatalogServiceCatalog.getCatalogVersion()); + response.getResult().setVersion(catalog_.getCatalogVersion()); return; } - Table srcTable = catalog_.getOrReloadTable(srcTblName.getDb(), srcTblName.getTbl()); + Table srcTable = catalog_.getTable(srcTblName.getDb(), srcTblName.getTbl()); org.apache.hadoop.hive.metastore.api.Table tbl = srcTable.getMetaStoreTable().deepCopy(); tbl.setDbName(tblName.getDb()); @@ -1216,7 +1221,7 @@ public class DdlExecutor { TableName tableName) throws DatabaseNotFoundException, TableNotFoundException, TableLoadingException { Preconditions.checkState(tableName != null && tableName.isFullyQualified()); - return catalog_.getOrReloadTable(tableName.getDb(), tableName.getTbl()) + return catalog_.getTable(tableName.getDb(), tableName.getTbl()) .getMetaStoreTable().deepCopy(); } @@ -1390,7 +1395,7 @@ public class DdlExecutor { throws ImpalaException { TUpdateCatalogResponse response = new TUpdateCatalogResponse(); // Only update metastore for Hdfs tables. - Table table = catalog_.getOrReloadTable(update.getDb_name(), + Table table = catalog_.getTable(update.getDb_name(), update.getTarget_table()); if (!(table instanceof HdfsTable)) { throw new InternalException("Unexpected table type: " + diff --git a/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java b/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java index ce4686380..03008680c 100644 --- a/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java +++ b/fe/src/main/java/com/cloudera/impala/service/JniCatalog.java @@ -24,6 +24,7 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.cloudera.impala.catalog.CatalogException; import com.cloudera.impala.catalog.CatalogServiceCatalog; import com.cloudera.impala.catalog.Function; import com.cloudera.impala.catalog.Table; @@ -77,6 +78,11 @@ public class JniCatalog { GlogAppender.Install(TLogLevel.values()[impalaLogLevel], TLogLevel.values()[otherLogLevel]); catalog_ = new CatalogServiceCatalog(getServiceId()); + try { + catalog_.reset(); + } catch (CatalogException e) { + LOG.error("Error initialializing Catalog. Please run 'invalidate metadata'", e); + } ddlExecutor_ = new DdlExecutor(catalog_); } @@ -127,7 +133,8 @@ public class JniCatalog { } else { // Invalidate the catalog if no table name is provided. Preconditions.checkArgument(!req.isIs_refresh()); - resp.result.setVersion(catalog_.reset()); + catalog_.reset(); + resp.result.setVersion(catalog_.getCatalogVersion()); } resp.getResult().setStatus( new TStatus(TStatusCode.OK, new ArrayList())); diff --git a/testdata/workloads/targeted-stress/queries/stress-with-invalidate-refresh.test b/testdata/workloads/targeted-stress/queries/stress-with-invalidate-refresh.test new file mode 100644 index 000000000..22839bc3e --- /dev/null +++ b/testdata/workloads/targeted-stress/queries/stress-with-invalidate-refresh.test @@ -0,0 +1,24 @@ +==== +---- QUERY : STRESS-Q1 +select count(*) from (select * from lineitem limit 10) p +---- RESULTS +10 +---- TYPES +bigint +==== +---- QUERY : STRESS-INVALIDATE_METADATA +invalidate metadata +==== +---- QUERY : STRESS-INVALIDATE_TABLE +invalidate metadata lineitem +==== +---- QUERY : REFRESH-TABLE +refresh lineitem +==== +---- QUERY : STRESS-Q2 +select count(*) from (select * from lineitem limit 10) p +---- RESULTS +10 +---- TYPES +bigint +==== diff --git a/testdata/workloads/targeted-stress/queries/stress.test b/testdata/workloads/targeted-stress/queries/stress.test index e748635fd..f019297ef 100644 --- a/testdata/workloads/targeted-stress/queries/stress.test +++ b/testdata/workloads/targeted-stress/queries/stress.test @@ -1,10 +1,8 @@ ==== ---- QUERY : STRESS-Q1 select count(*) from (select * from lineitem limit 10) p -# TODO: The second limit should not be needed, but getting wrong results due to product -# bug IMPALA-20. Once that is fixed the second limit can be removed. -#---- RESULTS -#10 +---- RESULTS +10 ---- TYPES bigint ==== diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 302d38649..f2720caf3 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -163,6 +163,11 @@ class ImpaladService(BaseImpalaService): client.connect() return client + def get_catalog_object_dump(self, object_type, object_name): + return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\ + (object_type, object_name)) + + # Allows for interacting with the StateStore service to perform operations such as # accessing the debug webpage. class StateStoredService(BaseImpalaService): @@ -180,3 +185,7 @@ class CatalogdService(BaseImpalaService): def __init__(self, hostname, webserver_port, service_port): super(CatalogdService, self).__init__(hostname, webserver_port) self.service_port = service_port + + def get_catalog_object_dump(self, object_type, object_name): + return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\ + (object_type, object_name)) diff --git a/tests/stress/test_mini_stress.py b/tests/stress/test_mini_stress.py index 1f172103c..5e1bb96a8 100644 --- a/tests/stress/test_mini_stress.py +++ b/tests/stress/test_mini_stress.py @@ -2,7 +2,9 @@ # Copyright (c) 2012 Cloudera, Inc. All rights reserved. # import pytest +import re from time import sleep +from tests.common.impala_cluster import ImpalaCluster from tests.common.test_vector import TestDimension from tests.common.impala_test_suite import ImpalaTestSuite from tests.util.test_file_parser import QueryTestSectionReader @@ -30,3 +32,39 @@ class TestMiniStress(ImpalaTestSuite): def test_mini_stress(self, vector): for i in xrange(NUM_ITERATIONS): self.run_test_case('stress', vector) + + @pytest.mark.stress + def test_run_invalidate_refresh(self, vector): + """Verifies that running concurrent invalidate table/catalog and refresh commands + don't cause failures with other running workloads and ensures catalog versions + are strictly increasing.""" + target_db = self.execute_scalar('select current_database()', vector=vector) + impala_cluster = ImpalaCluster() + impalad = impala_cluster.impalads[0].service + catalogd = impala_cluster.catalogd.service + + for i in xrange(NUM_ITERATIONS): + # Get the catalog versions for the table before running the workload + before_versions = dict() + before_versions['catalogd'] =\ + self.get_table_version(catalogd, target_db, 'lineitem') + before_versions['impalad'] = self.get_table_version(impalad, target_db, 'lineitem') + + self.run_test_case('stress-with-invalidate-refresh', vector) + + # Get the catalog versions for the table after running the workload + after_versions = dict() + after_versions['catalogd'] = self.get_table_version(catalogd, target_db, 'lineitem') + after_versions['impalad'] = self.get_table_version(impalad, target_db, 'lineitem') + + # Catalog versions should be strictly increasing + assert before_versions['impalad'] < after_versions['impalad'] + assert before_versions['catalogd'] < after_versions['catalogd'] + + def get_table_version(self, impala_service, db_name, tbl_name): + """Gets the given table's catalog version using the given impalad/catalogd service""" + obj_dump = impala_service.get_catalog_object_dump('table', db_name + '.' + tbl_name) + result = re.search(r'catalog_version \(i64\) = (\d+)', obj_dump) + assert result, 'Unable to find catalog version in object dump: ' + obj_dump + catalog_version = result.group(1) + return int(catalog_version)