IMPALA-14016: Add multi-catalog support for local catalog mode

This patch adds a new MetaProvider called MultiMetaProvider, which is
capable of handling multiple MetaProviders at once, prioritizing one
primary provider over multiple secondary providers. The primary
provider handles some methods exclusively for deterministic behavior.
In database listings, if a database name occurs in multiple providers, the
tables they contain are merged under that database name; if two such
databases contain a table with the same name, query analysis fails with an
error.
This change also modifies the local catalog implementation's
initialization. If catalogd is deployed, a CatalogdMetaProvider is
instantiated. If the catalog configuration directory backend flag is set,
every configuration file in that directory is loaded and an
IcebergMetaProvider is instantiated from each config. If an instantiation
fails, the error is logged, but startup is not interrupted.
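
For reference, a catalog configuration file in that directory looks like the
test configs added by this patch:

  connector.name=iceberg
  iceberg.catalog.type=rest
  iceberg.rest-catalog.uri=http://localhost:9084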

Tests:
 - E2E tests for multi-catalog behavior
 - Unit test for ConfigLoader

Change-Id: Ifbdd0f7085345e7954d9f6f264202699182dd1e1
Reviewed-on: http://gerrit.cloudera.org:8080/22878
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Author: Peter Rozsa
Date: 2025-04-28 09:36:26 +02:00
Parent: d217b9ecc6
Commit: b0f1d49042
28 changed files with 1306 additions and 377 deletions


@@ -20,7 +20,7 @@ package org.apache.impala.authorization;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeCatalogManager;
import org.apache.impala.service.catalogmanager.FeCatalogManager;
import java.util.function.Supplier;


@@ -24,7 +24,7 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeCatalogManager;
import org.apache.impala.service.catalogmanager.FeCatalogManager;
import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TCreateDropRoleParams;
import org.apache.impala.thrift.TDdlExecResponse;


@@ -27,7 +27,7 @@ import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeCatalogManager;
import org.apache.impala.service.catalogmanager.FeCatalogManager;
import java.util.function.Supplier;


@@ -44,6 +44,7 @@ import org.apache.impala.catalog.local.LocalIcebergTable;
import org.apache.impala.catalog.local.LocalKuduTable;
import org.apache.impala.catalog.local.LocalView;
import org.apache.impala.catalog.local.MetaProvider;
import org.apache.impala.catalog.local.MultiMetaProvider;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.service.BackendConfig;
@@ -396,6 +397,9 @@ public abstract class FeCatalogUtils {
if (!BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) return;
Preconditions.checkState(catalog instanceof LocalCatalog);
MetaProvider provider = ((LocalCatalog) catalog).getMetaProvider();
if (provider instanceof MultiMetaProvider) {
provider = ((MultiMetaProvider) provider).getPrimaryProvider();
}
if (!(provider instanceof CatalogdMetaProvider)) return;
CacheStats stats = ((CatalogdMetaProvider) provider).getCacheStats();


@@ -40,18 +40,9 @@ import org.apache.impala.util.IcebergUtil;
public class IcebergRESTCatalog implements IcebergCatalog {
private final String REST_URI;
private static IcebergRESTCatalog instance_;
private final RESTCatalog restCatalog_;
public synchronized static IcebergRESTCatalog getInstance(
Properties properties) {
if (instance_ == null) {
instance_ = new IcebergRESTCatalog(properties);
}
return instance_;
}
private IcebergRESTCatalog(Properties properties) {
public IcebergRESTCatalog(Properties properties) {
setContextClassLoader();
RESTCatalogProperties restConfig = new RESTCatalogProperties(properties);


@@ -106,17 +106,13 @@ public class IcebergMetaProvider implements MetaProvider {
public IcebergMetaProvider(Properties properties) {
properties_ = properties;
iceCatalog_ = initCatalog();
iceCatalog_ = new IcebergRESTCatalog(properties);
}
public String getURI() {
return "Iceberg REST (" + iceCatalog_.getUri() + ")";
}
private IcebergRESTCatalog initCatalog() {
return IcebergRESTCatalog.getInstance(properties_);
}
public void setAuthzChecker(
AtomicReference<? extends AuthorizationChecker> authzChecker) {
authzChecker_ = authzChecker;


@@ -0,0 +1,328 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.local;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.DataSource;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HdfsCachePool;
import org.apache.impala.catalog.SqlConstraints;
import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TBriefTableMeta;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartialTableInfo;
import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.ListMap;
import org.apache.thrift.TException;
/**
* MetaProvider implementation that proxies calls through a chain of MetaProviders.
* The primary provider is visited first; if it throws an exception, the secondary
* providers are visited one by one. Some methods are delegated exclusively to the
* primary provider to keep the behavior deterministic; for example, authorization
* policy checks, the readiness probe, and null partition key-value fetching are
* directed to the primary provider.
*/
public class MultiMetaProvider implements MetaProvider {
private final MetaProvider primaryProvider_;
private final List<MetaProvider> secondaryProviders_;
public MultiMetaProvider(MetaProvider primaryProvider,
List<MetaProvider> secondaryProviders) {
primaryProvider_ = primaryProvider;
secondaryProviders_ = secondaryProviders;
}
public MetaProvider getPrimaryProvider() {
return primaryProvider_;
}
@Override
public String getURI() {
StringJoiner joiner = new StringJoiner(", ");
joiner.add(primaryProvider_.getURI());
for (MetaProvider provider : secondaryProviders_) {
joiner.add(provider.getURI());
}
return joiner.toString();
}
@Override
public AuthorizationPolicy getAuthPolicy() {
return primaryProvider_.getAuthPolicy();
}
@Override
public boolean isReady() {
return primaryProvider_.isReady();
}
@Override
public void waitForIsReady(long timeoutMs) {
primaryProvider_.waitForIsReady(timeoutMs);
}
@Override
public ImmutableList<String> loadDbList() throws TException {
return collectFromAllProviders(
unchecked(MetaProvider::loadDbList)).stream().flatMap(
Collection::stream).distinct().collect(ImmutableList.toImmutableList());
}
@Override
public Database loadDb(String dbName) throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadDb(dbName)));
}
@Override
public ImmutableCollection<TBriefTableMeta> loadTableList(String dbName)
throws TException {
ImmutableList<TBriefTableMeta> combinedTableList = collectFromAllProviders(
unchecked(provider -> provider.loadTableList(dbName))).stream().flatMap(
Collection::stream).collect(ImmutableList.toImmutableList());
Optional<Entry<String, Integer>> firstDuplicate = combinedTableList.stream()
.map(tableMeta -> tableMeta.name)
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)).entrySet().stream()
.filter(stringIntegerEntry -> stringIntegerEntry.getValue() > 1).findFirst();
if (firstDuplicate.isPresent()) {
throw new TException("Ambiguous table name: " + firstDuplicate.get().getKey());
}
return combinedTableList;
}
@Override
public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadTable(dbName, tableName)));
}
@Override
public Pair<Table, TableMetaRef> getTableIfPresent(String dbName, String tableName) {
try {
return tryAllProviders(
unchecked(provider -> provider.getTableIfPresent(dbName, tableName)));
} catch (TException e) {
return null;
}
}
@Override
public String loadNullPartitionKeyValue() throws TException {
return primaryProvider_.loadNullPartitionKeyValue();
}
@Override
public List<PartitionRef> loadPartitionList(TableMetaRef table)
throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadPartitionList(table)));
}
@Override
public SqlConstraints loadConstraints(TableMetaRef table, Table msTbl)
throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadConstraints(table, msTbl)));
}
@Override
public List<String> loadFunctionNames(String dbName) throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadFunctionNames(dbName)));
}
@Override
public ImmutableList<Function> loadFunction(String dbName, String functionName)
throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadFunction(dbName, functionName)));
}
@Override
public ImmutableList<DataSource> loadDataSources() throws TException {
return tryAllProviders(unchecked(MetaProvider::loadDataSources));
}
@Override
public DataSource loadDataSource(String dsName) throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadDataSource(dsName)));
}
@Override
public Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
List<String> partitionColumnNames, ListMap<TNetworkAddress> hostIndex,
List<PartitionRef> partitionRefs)
throws TException, CatalogException {
return tryAllProviders(unchecked(
provider -> provider.loadPartitionsByRefs(table, partitionColumnNames,
hostIndex, partitionRefs)));
}
@Override
public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
List<String> colNames) throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadTableColumnStatistics(table, colNames)));
}
@Override
public TPartialTableInfo loadIcebergTable(TableMetaRef table) throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadIcebergTable(table)));
}
@Override
public org.apache.iceberg.Table loadIcebergApiTable(TableMetaRef table,
TableParams param, Table msTable) throws TException {
return tryAllProviders(
unchecked(provider -> provider.loadIcebergApiTable(table, param, msTable)));
}
@Override
public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) {
try {
return tryAllProviders(unchecked(
provider -> provider.getValidWriteIdList(ref)));
} catch (TException e) {
return null;
}
}
@Override
public Iterable<HdfsCachePool> getHdfsCachePools() {
try {
return tryAllProviders(unchecked(MetaProvider::getHdfsCachePools));
} catch (TException e) {
return null;
}
}
private ImmutableList<MetaProvider> getAllProviders() {
return ImmutableList.<MetaProvider>builder()
.add(primaryProvider_)
.addAll(secondaryProviders_).build();
}
private <R> R tryAllProviders(java.util.function.Function<MetaProvider, R> function)
throws TException {
Map<String, Exception> exceptions = new HashMap<>();
ImmutableList<MetaProvider> providers = getAllProviders();
for (MetaProvider provider : providers) {
try {
return function.apply(provider);
} catch (MetaProviderException e) {
exceptions.put(provider.getURI(), e);
}
}
handleExceptions(exceptions);
return null;
}
private <R> Collection<R> collectFromAllProviders(
java.util.function.Function<MetaProvider, R> function)
throws TException {
Map<String, Exception> exceptions = new HashMap<>();
ImmutableList<MetaProvider> providers = getAllProviders();
Collection<R> results = new ArrayList<>();
for (MetaProvider provider : providers) {
try {
results.add(function.apply(provider));
} catch (MetaProviderException e) {
exceptions.put(provider.getURI(), e);
}
}
if (!results.isEmpty()) {
return results;
}
handleExceptions(exceptions);
return Collections.emptyList();
}
private void handleExceptions(Map<String, Exception> exceptions) throws TException {
if (!exceptions.isEmpty()) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
for (Entry<String, Exception> e : exceptions.entrySet()) {
printWriter.print(String.format("%s: ", e.getKey()));
e.getValue().printStackTrace(printWriter);
}
String message = String.format(
"Every MetaProvider failed with the following exceptions: %s",
stringWriter);
throw new TException(message);
}
}
private static <T, R> java.util.function.Function<T, R> unchecked(
ThrowingFunction<T, R> tf) {
return t -> {
try {
return tf.apply(t);
} catch (Exception e) {
throw new MetaProviderException(e);
}
};
}
/**
* Exception class to make exception wrapping/unwrapping clear in
* 'collectFromAllProviders' and 'tryAllProviders'.
*/
public static class MetaProviderException extends RuntimeException {
MetaProviderException(Throwable cause) {
super(cause);
}
}
@FunctionalInterface
public interface ThrowingFunction<T, R> {
R apply(T t) throws Exception;
}
}


@@ -1,269 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.catalog.local.CatalogdMetaProvider;
import org.apache.impala.catalog.local.IcebergMetaProvider;
import org.apache.impala.catalog.local.LocalCatalog;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.apache.thrift.TException;
/**
* Manages the Catalog implementation used by the frontend.
*
* This class abstracts away the different lifecycles used by the LocalCatalog
* and the ImpaladCatalog. The former creates a new instance for each request or
* query, whereas the latter only creates a new instance upon receiving a full update
* from the catalogd via the statestore.
*/
public abstract class FeCatalogManager {
protected AtomicReference<? extends AuthorizationChecker> authzChecker_;
/**
* @return the appropriate implementation based on the current backend
* configuration.
*/
public static FeCatalogManager createFromBackendConfig() {
TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
if (cfg.use_local_catalog) {
if (!cfg.catalogd_deployed) {
// Currently Iceberg REST Catalog is the only implementation.
return new IcebergRestCatalogImpl();
} else {
return new LocalImpl();
}
} else {
return new CatalogdImpl();
}
}
/**
* Create a manager which always returns the same instance and does not permit
* updates from the statestore.
*/
public static FeCatalogManager createForTests(FeCatalog testCatalog) {
return new TestImpl(testCatalog);
}
public void setAuthzChecker(
AtomicReference<? extends AuthorizationChecker> authzChecker) {
authzChecker_ = Preconditions.checkNotNull(authzChecker);
}
/**
* @return a Catalog instance to be used for a request or query. Depending
* on the catalog implementation this may either be a reused instance or a
* fresh one for each query.
*/
public abstract FeCatalog getOrCreateCatalog();
/**
* Update the Catalog based on an update from the state store.
*
* This can be called either in response to a DDL statement (in which case the update
* may include just the changed objects related to that DDL) or due to data being
* published by the state store.
*
* In the case of the DDL-triggered update, the return value is ignored. In the case
* of the statestore update, the return value is passed back to the C++ code to
* indicate the last applied catalog update and used to implement SYNC_DDL.
*/
abstract TUpdateCatalogCacheResponse updateCatalogCache(
TUpdateCatalogCacheRequest req) throws CatalogException, TException;
/**
* Implementation which creates ImpaladCatalog instances and expects to receive
* updates via the statestore. New instances are created only when full updates
* are received.
*/
private static class CatalogdImpl extends FeCatalogManager {
private final AtomicReference<ImpaladCatalog> catalog_ =
new AtomicReference<>();
private CatalogdImpl() {
catalog_.set(createNewCatalog());
}
@Override
public FeCatalog getOrCreateCatalog() {
return catalog_.get();
}
@Override
TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req)
throws CatalogException, TException {
ImpaladCatalog catalog = catalog_.get();
if (req.is_delta) return catalog.updateCatalog(req);
// If this is not a delta, this update should replace the current
// Catalog contents so create a new catalog and populate it.
ImpaladCatalog oldCatalog = catalog;
catalog = createNewCatalog();
TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
// Now that the catalog has been updated, replace the reference to
// catalog_. This ensures that clients don't see the catalog
// disappear. The catalog is guaranteed to be ready since updateCatalog() has a
// postcondition of isReady() == true.
catalog_.set(catalog);
if (oldCatalog != null) oldCatalog.release();
return response;
}
private ImpaladCatalog createNewCatalog() {
return new ImpaladCatalog(authzChecker_);
}
}
/**
* Implementation which creates LocalCatalog instances. A new instance is
* created for each request or query.
*/
private static class LocalImpl extends FeCatalogManager {
private static CatalogdMetaProvider PROVIDER = new CatalogdMetaProvider(
BackendConfig.INSTANCE.getBackendCfg());
@Override
public FeCatalog getOrCreateCatalog() {
PROVIDER.setAuthzChecker(authzChecker_);
return new LocalCatalog(PROVIDER);
}
@Override
TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
return PROVIDER.updateCatalogCache(req);
}
}
/**
* Implementation which creates LocalCatalog instances and uses an Iceberg REST
* Catalog.
* TODO(boroknagyz): merge with LocalImpl
*/
private static class IcebergRestCatalogImpl extends FeCatalogManager {
private static IcebergMetaProvider PROVIDER;
@Override
public synchronized FeCatalog getOrCreateCatalog() {
if (PROVIDER == null) {
try {
PROVIDER = initProvider();
} catch (IOException e) {
throw new IllegalStateException("Create IcebergMetaProvider failed", e);
}
}
return new LocalCatalog(PROVIDER);
}
IcebergMetaProvider initProvider() throws IOException {
TBackendGflags flags = BackendConfig.INSTANCE.getBackendCfg();
String catalogConfigDir = flags.catalog_config_dir;
Preconditions.checkState(catalogConfigDir != null &&
!catalogConfigDir.isEmpty());
List<String> files = listFiles(catalogConfigDir);
Preconditions.checkState(files.size() == 1,
String.format("Expected number of files in directory %s is one, found %d files",
catalogConfigDir, files.size()));
String configFile = catalogConfigDir + Path.SEPARATOR + files.get(0);
Properties props = readPropertiesFile(configFile);
// In the future we can expect different catalog types, but currently we only
// support Iceberg REST Catalogs.
checkPropertyValue(configFile, props, "connector.name", "iceberg");
checkPropertyValue(configFile, props, "iceberg.catalog.type", "rest");
return new IcebergMetaProvider(props);
}
private List<String> listFiles(String dirPath) {
File dir = new File(dirPath);
Preconditions.checkState(dir.exists() && dir.isDirectory());
return Stream.of(dir.listFiles())
.filter(file -> !file.isDirectory())
.map(File::getName)
.collect(Collectors.toList());
}
private Properties readPropertiesFile(String file) throws IOException {
Properties props = new Properties();
props.load(new FileInputStream(file));
return props;
}
private void checkPropertyValue(String configFile, Properties props, String key,
String expectedValue) {
if (!props.containsKey(key)) {
throw new IllegalStateException(String.format(
"Expected property %s was not specified in config file %s.", key,
configFile));
}
String actualValue = props.getProperty(key);
if (!Objects.equals(actualValue, expectedValue)) {
throw new IllegalStateException(String.format(
"Expected value of '%s' is '%s', but found '%s' in config file %s",
key, expectedValue, actualValue, configFile));
}
}
@Override
TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
return null;
}
}
/**
* Implementation which returns a provided catalog instance, used by tests.
* No updates from the statestore are permitted.
*/
private static class TestImpl extends FeCatalogManager {
private final FeCatalog catalog_;
TestImpl(FeCatalog catalog) {
catalog_ = catalog;
}
@Override
public FeCatalog getOrCreateCatalog() {
return catalog_;
}
@Override
TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
throw new IllegalStateException(
"Unexpected call to updateCatalogCache() with a test catalog instance");
}
}
}


@@ -142,7 +142,6 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.SystemTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
@@ -170,7 +169,7 @@ import org.apache.impala.planner.HdfsScanNode;
import org.apache.impala.planner.PlanFragment;
import org.apache.impala.planner.Planner;
import org.apache.impala.planner.ScanNode;
import org.apache.impala.service.Frontend;
import org.apache.impala.service.catalogmanager.FeCatalogManager;
import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.TAlterDbParams;
import org.apache.impala.thrift.TBackendGflags;


@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service.catalogmanager;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.apache.thrift.TException;
/**
* Implementation which creates ImpaladCatalog instances and expects to receive updates
* via the statestore. New instances are created only when full updates are received.
*/
class CatalogdImpl extends FeCatalogManager {
private final AtomicReference<ImpaladCatalog> catalog_ =
new AtomicReference<>();
CatalogdImpl() {
catalog_.set(createNewCatalog());
}
@Override
public FeCatalog getOrCreateCatalog() {
return catalog_.get();
}
@Override
public TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req)
throws CatalogException, TException {
ImpaladCatalog catalog = catalog_.get();
if (req.is_delta) return catalog.updateCatalog(req);
// If this is not a delta, this update should replace the current
// Catalog contents so create a new catalog and populate it.
ImpaladCatalog oldCatalog = catalog;
catalog = createNewCatalog();
TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
// Now that the catalog has been updated, replace the reference to
// catalog_. This ensures that clients don't see the catalog
// disappear. The catalog is guaranteed to be ready since updateCatalog() has a
// postcondition of isReady() == true.
catalog_.set(catalog);
if (oldCatalog != null) oldCatalog.release();
return response;
}
private ImpaladCatalog createNewCatalog() {
return new ImpaladCatalog(authzChecker_);
}
}


@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service.catalogmanager;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.impala.common.ImpalaRuntimeException;
public class ConfigLoader {
private final File configFolder_;
public ConfigLoader(File configFolder) {
this.configFolder_ = configFolder;
}
public List<Properties> loadConfigs() throws ImpalaRuntimeException {
List<File> files = listFiles(configFolder_);
List<Properties> propertiesList = new ArrayList<>();
for (File configFile : files) {
String fileName = configFile.getName();
try {
Properties props = readPropertiesFile(configFile);
checkPropertyValue(fileName, props, "connector.name", "iceberg");
checkPropertyValue(fileName, props, "iceberg.catalog.type", "rest");
propertiesList.add(props);
} catch (IOException e) {
throw new ImpalaRuntimeException(
String.format("Unable to read file %s from configuration directory: %s",
fileName, configFolder_.getAbsolutePath()), e);
}
}
return propertiesList;
}
List<File> listFiles(File configDirectory) {
Preconditions.checkState(configDirectory.exists() && configDirectory.isDirectory());
return Stream.of(configDirectory.listFiles())
.filter(file -> !file.isDirectory())
.collect(Collectors.toList());
}
Properties readPropertiesFile(File file) throws IOException {
Properties props = new Properties();
try (InputStream in = Files.newInputStream(file.toPath())) {
props.load(in);
}
return props;
}
private void checkPropertyValue(String configFile, Properties props, String key,
String expectedValue) {
if (!props.containsKey(key)) {
throw new IllegalStateException(String.format(
"Expected property %s was not specified in config file %s.", key,
configFile));
}
String actualValue = props.getProperty(key);
if (!Objects.equals(actualValue, expectedValue)) {
throw new IllegalStateException(String.format(
"Expected value of '%s' is '%s', but found '%s' in config file %s",
key, expectedValue, actualValue, configFile));
}
}
}
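
A minimal usage sketch of the loader above, assuming a hypothetical config
directory path:

  ConfigLoader loader = new ConfigLoader(new File("/path/to/catalog_configs"));
  // Each returned Properties object has been validated to contain
  // connector.name=iceberg and iceberg.catalog.type=rest.
  List<Properties> configs = loader.loadConfigs();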


@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service.catalogmanager;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.apache.thrift.TException;
/**
* Manages the Catalog implementation used by the frontend.
*
* This class abstracts away the different lifecycles used by the LocalCatalog
* and the ImpaladCatalog. The former creates a new instance for each request or
* query, whereas the latter only creates a new instance upon receiving a full update
* from the catalogd via the statestore.
*/
public abstract class FeCatalogManager {
protected AtomicReference<? extends AuthorizationChecker> authzChecker_;
/**
* @return the appropriate implementation based on the current backend
* configuration.
*/
public static FeCatalogManager createFromBackendConfig() throws ImpalaRuntimeException {
TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
if (cfg.use_local_catalog) {
return new LocalImpl();
} else {
return new CatalogdImpl();
}
}
/**
* Create a manager which always returns the same instance and does not permit
* updates from the statestore.
*/
public static FeCatalogManager createForTests(FeCatalog testCatalog) {
return new TestImpl(testCatalog);
}
public void setAuthzChecker(
AtomicReference<? extends AuthorizationChecker> authzChecker) {
authzChecker_ = Preconditions.checkNotNull(authzChecker);
}
/**
* @return a Catalog instance to be used for a request or query. Depending
* on the catalog implementation this may either be a reused instance or a
* fresh one for each query.
*/
public abstract FeCatalog getOrCreateCatalog();
/**
* Update the Catalog based on an update from the state store.
*
* This can be called either in response to a DDL statement (in which case the update
* may include just the changed objects related to that DDL) or due to data being
* published by the state store.
*
* In the case of the DDL-triggered update, the return value is ignored. In the case
* of the statestore update, the return value is passed back to the C++ code to
* indicate the last applied catalog update and used to implement SYNC_DDL.
*/
public abstract TUpdateCatalogCacheResponse updateCatalogCache(
TUpdateCatalogCacheRequest req) throws CatalogException, TException;
}
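
A minimal sketch of how a frontend call site is expected to use this manager
(the AtomicReference holding the AuthorizationChecker is a hypothetical
placeholder):

  FeCatalogManager manager = FeCatalogManager.createFromBackendConfig();
  manager.setAuthzChecker(authzCheckerRef);
  // Depending on the implementation this returns a reused instance
  // (CatalogdImpl) or a fresh LocalCatalog per query (LocalImpl).
  FeCatalog catalog = manager.getOrCreateCatalog();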


@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service.catalogmanager;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.catalog.local.CatalogdMetaProvider;
import org.apache.impala.catalog.local.IcebergMetaProvider;
import org.apache.impala.catalog.local.LocalCatalog;
import org.apache.impala.catalog.local.MetaProvider;
import org.apache.impala.catalog.local.MultiMetaProvider;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation which creates LocalCatalog instances. A new instance is created for each
* request or query.
*/
class LocalImpl extends FeCatalogManager {
private static final Logger LOG = LoggerFactory.getLogger(LocalImpl.class);
private final MetaProvider provider_;
public LocalImpl() throws ImpalaRuntimeException {
provider_ = getMetaProvider();
}
private MetaProvider getMetaProvider() throws ImpalaRuntimeException {
TBackendGflags backendCfg = BackendConfig.INSTANCE.getBackendCfg();
String catalogConfigDir = backendCfg.catalog_config_dir;
List<MetaProvider> providers = new ArrayList<>();
if (backendCfg.catalogd_deployed) {
providers.add(new CatalogdMetaProvider(backendCfg));
}
if (catalogConfigDir != null && !catalogConfigDir.isEmpty()) {
File configDir = new File(catalogConfigDir);
try {
LOG.info("Loading catalog config from {}", configDir);
List<MetaProvider> secondaryProviders = getSecondaryProviders(configDir);
providers.addAll(secondaryProviders);
} catch (ImpalaRuntimeException e) {
LOG.warn("Unable to load secondary providers from catalog config file", e);
}
}
if (providers.isEmpty()) {
throw new ImpalaRuntimeException("No metadata providers available");
}
if (providers.size() == 1) {
return providers.get(0);
}
return new MultiMetaProvider(providers.get(0),
providers.subList(1, providers.size()));
}
@Override
public FeCatalog getOrCreateCatalog() {
if (provider_ instanceof CatalogdMetaProvider) {
((CatalogdMetaProvider) provider_).setAuthzChecker(authzChecker_);
}
return new LocalCatalog(provider_);
}
@Override
public TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
if (provider_ instanceof CatalogdMetaProvider) {
return ((CatalogdMetaProvider) provider_).updateCatalogCache(req);
}
if (provider_ instanceof MultiMetaProvider) {
MetaProvider primaryProvider = ((MultiMetaProvider) provider_).getPrimaryProvider();
if (primaryProvider instanceof CatalogdMetaProvider) {
return ((CatalogdMetaProvider) primaryProvider).updateCatalogCache(req);
}
}
return null;
}
private static List<MetaProvider> getSecondaryProviders(File catalogConfigDir)
throws ImpalaRuntimeException {
ConfigLoader loader = new ConfigLoader(catalogConfigDir);
List<MetaProvider> list = new ArrayList<>();
for (Properties properties : loader.loadConfigs()) {
try {
IcebergMetaProvider icebergMetaProvider = new IcebergMetaProvider(properties);
list.add(icebergMetaProvider);
} catch (RESTException e) {
LOG.error(String.format(
"Unable to instantiate IcebergMetaProvider from the following "
+ "properties: %s", properties), e);
}
}
return list;
}
}


@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service.catalogmanager;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
/**
* Implementation which returns a provided catalog instance, used by tests. No updates
* from the statestore are permitted.
*/
class TestImpl extends FeCatalogManager {
private final FeCatalog catalog_;
TestImpl(FeCatalog catalog) {
catalog_ = catalog;
}
@Override
public FeCatalog getOrCreateCatalog() {
return catalog_;
}
@Override
public TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
throw new IllegalStateException(
"Unexpected call to updateCatalogCache() with a test catalog instance");
}
}


@@ -59,7 +59,7 @@ import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
import org.apache.impala.service.CompilerFactory;
import org.apache.impala.service.CompilerFactoryImpl;
import org.apache.impala.service.FeCatalogManager;
import org.apache.impala.service.catalogmanager.FeCatalogManager;
import org.apache.impala.service.Frontend;
import org.apache.impala.service.FrontendProfile;
import org.apache.impala.testutil.ImpaladTestCatalog;


@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service.catalogmanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.impala.common.ImpalaRuntimeException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class ConfigLoaderTest {
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
private File tempDir;
@Before
public void setUp() throws Exception {
tempDir = tempFolder.newFolder("config");
}
private File createConfigFile(String fileName, String content) throws IOException {
File configFile = new File(tempDir, fileName);
try (FileWriter writer = new FileWriter(configFile)) {
writer.write(content);
}
return configFile;
}
@Test
public void testLoadValidConfigs() throws Exception {
createConfigFile("valid1.properties",
"connector.name=iceberg\niceberg.catalog.type=rest\n");
createConfigFile("valid2.properties",
"connector.name=iceberg\niceberg.catalog.type=rest\n");
ConfigLoader loader = new ConfigLoader(tempDir);
List<Properties> configs = loader.loadConfigs();
assertEquals(2, configs.size());
assertEquals("iceberg", configs.get(0).getProperty("connector.name"));
assertEquals("rest", configs.get(0).getProperty("iceberg.catalog.type"));
}
@Test
public void testMissingConnectorNameThrows() throws IOException {
createConfigFile("bad.properties", "iceberg.catalog.type=rest\n");
ConfigLoader loader = new ConfigLoader(tempDir);
try {
loader.loadConfigs();
fail("Expected IllegalStateException");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("connector.name"));
} catch (ImpalaRuntimeException e) {
fail("Expected IllegalStateException");
}
}
@Test
public void testIncorrectCatalogTypeThrows() throws IOException {
createConfigFile("bad.properties",
"connector.name=iceberg\niceberg.catalog.type=hive\n");
ConfigLoader loader = new ConfigLoader(tempDir);
try {
loader.loadConfigs();
fail("Expected IllegalStateException");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("Expected value of 'iceberg.catalog.type'"));
} catch (ImpalaRuntimeException e) {
fail("Expected IllegalStateException");
}
}
@Test
public void testUnreadableFileThrows() throws IOException {
File unreadableFile = createConfigFile("unreadable.properties",
"connector.name=iceberg\niceberg.catalog.type=rest\n");
// Make the file unreadable (on Unix-like systems)
assertTrue(unreadableFile.setReadable(false));
ConfigLoader loader = new ConfigLoader(tempDir);
try {
loader.loadConfigs();
fail("Expected ImpalaRuntimeException");
} catch (ImpalaRuntimeException e) {
assertTrue(e.getMessage().contains("Unable to read file"));
}
}
@Test
public void testEmptyDirectoryReturnsEmptyList() throws Exception {
ConfigLoader loader = new ConfigLoader(tempDir);
List<Properties> configs = loader.loadConfigs();
assertTrue(configs.isEmpty());
}
}


@@ -85,6 +85,12 @@ under the License.
<version>${iceberg.version}</version>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
<build>


@@ -23,14 +23,19 @@
// switch to an open-source Iceberg REST Catalog.
package org.apache.iceberg.rest;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
@@ -40,31 +45,46 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IcebergRestCatalogTest {
private static final Logger LOG = LoggerFactory.getLogger(IcebergRestCatalogTest.class);
private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
static final int REST_PORT = 9084;
private static final int DEFAULT_REST_PORT = 9084;
private static final String DEFAULT_HADOOP_CATALOG_LOCATION =
"/test-warehouse/iceberg_test/hadoop_catalog";
private static final String USAGE_PREFIX = "java -jar your-iceberg-rest-catalog.jar";
private static final String CATALOG_LOCATION_LONGOPT = "catalog-location";
private static final String PORT_LONGOPT = "port";
private static final String HELP_LONGOPT = "help";
private Server httpServer;
private final int port;
private final String warehouseLocation;
public IcebergRestCatalogTest() {}
private static String getWarehouseLocation() {
String FILESYSTEM_PREFIX = System.getenv("FILESYSTEM_PREFIX");
String HADOOP_CATALOG_LOCATION = "/test-warehouse/iceberg_test/hadoop_catalog";
if (FILESYSTEM_PREFIX != null && !FILESYSTEM_PREFIX.isEmpty()) {
return FILESYSTEM_PREFIX + HADOOP_CATALOG_LOCATION;
}
String DEFAULT_FS = System.getenv("DEFAULT_FS");
return DEFAULT_FS + HADOOP_CATALOG_LOCATION;
public IcebergRestCatalogTest(int port, String warehouseLocation) {
this.port = port;
this.warehouseLocation = warehouseLocation;
}
private Catalog initializeBackendCatalog() throws IOException {
private String getWarehouseLocation() {
String filesystemPrefix = System.getenv("FILESYSTEM_PREFIX");
if (filesystemPrefix != null && !filesystemPrefix.isEmpty()) {
return filesystemPrefix + warehouseLocation;
}
String defaultFs = System.getenv("DEFAULT_FS");
return defaultFs + warehouseLocation;
}
private Catalog initializeBackendCatalog() {
Configuration conf = new Configuration();
conf.set("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO");
LOG.info("Default filesystem configured for this Iceberg REST Catalog is " +
conf.get("fs.defaultFS"));
return new HadoopCatalog(conf, getWarehouseLocation());
String actualWarehouseLocation = getWarehouseLocation();
LOG.info("Initializing Hadoop Catalog at: {}", actualWarehouseLocation);
String defaultFs = conf.get("fs.defaultFS");
LOG.info("Default filesystem configured for this Iceberg REST Catalog is {}",
defaultFs);
return new HadoopCatalog(conf, actualWarehouseLocation);
}
public void start(boolean join) throws Exception {
@@ -83,8 +103,7 @@ public class IcebergRestCatalogTest {
T response =
super.execute(
method, path, queryParams, request, responseType, headers, errorHandler);
T responseAfterSerialization = roundTripSerialize(response, "response");
return responseAfterSerialization;
return roundTripSerialize(response, "response");
}
};
@@ -95,9 +114,10 @@ public class IcebergRestCatalogTest {
context.addServlet(servletHolder, "/*");
context.insertHandler(new GzipHandler());
this.httpServer = new Server(REST_PORT);
this.httpServer = new Server(port);
httpServer.setHandler(context);
httpServer.start();
LOG.info("Iceberg REST Catalog started on port: {}", port);
if (join) {
httpServer.join();
@@ -107,33 +127,84 @@ public class IcebergRestCatalogTest {
public void stop() throws Exception {
if (httpServer != null) {
httpServer.stop();
LOG.info("Iceberg REST Catalog stopped.");
}
}
public static void main(String[] args) throws Exception {
new IcebergRestCatalogTest().start(true);
Options options = new Options();
options.addOption(new Option(null, PORT_LONGOPT, true,
"Port for the REST catalog server (default: " + DEFAULT_REST_PORT + ")"));
options.addOption(new Option(null, CATALOG_LOCATION_LONGOPT, true,
"Base location for the Hadoop catalog (default: "
+ DEFAULT_HADOOP_CATALOG_LOCATION + ")"));
options.addOption(new Option(null, HELP_LONGOPT, false, "Display this help message"));
CommandLineParser parser = new BasicParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd;
int port = DEFAULT_REST_PORT;
String catalogLocation = DEFAULT_HADOOP_CATALOG_LOCATION;
try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
LOG.error("Error: {}", e.getMessage());
formatter.printHelp(USAGE_PREFIX, options);
System.exit(1);
return;
}
if (cmd.hasOption(HELP_LONGOPT)) {
formatter.printHelp(USAGE_PREFIX, options);
System.exit(0);
}
if (cmd.hasOption(PORT_LONGOPT)) {
try {
port = Integer.parseInt(cmd.getOptionValue(PORT_LONGOPT));
} catch (NumberFormatException e) {
LOG.error("Error: --port requires a valid integer value. Got: {}",
cmd.getOptionValue(PORT_LONGOPT));
formatter.printHelp(USAGE_PREFIX, options);
System.exit(1);
}
}
if (cmd.hasOption(CATALOG_LOCATION_LONGOPT)) {
catalogLocation = cmd.getOptionValue(CATALOG_LOCATION_LONGOPT);
}
new IcebergRestCatalogTest(port, catalogLocation).start(true);
}
public static <T> T roundTripSerialize(T payload, String description) {
if (payload != null) {
LOG.trace(payload.toString());
try {
if (payload instanceof RESTMessage) {
return (T) MAPPER.readValue(
MAPPER.writeValueAsString(payload), payload.getClass());
} else {
// use Map so that Jackson doesn't try to instantiate ImmutableMap
// from payload.getClass()
return (T) MAPPER.readValue(
MAPPER.writeValueAsString(payload), Map.class);
}
} catch (Exception e) {
LOG.warn(e.toString());
throw new RuntimeException(
String.format("Failed to serialize and deserialize %s: %s",
description, payload), e);
}
}
return null;
}
if (payload != null) {
if (LOG.isTraceEnabled()) {
LOG.trace(payload.toString());
}
try {
if (payload instanceof RESTMessage) {
return (T) MAPPER.readValue(
MAPPER.writeValueAsString(payload), payload.getClass());
} else {
// use Map so that Jackson doesn't try to instantiate ImmutableMap
// from payload.getClass()
return (T) MAPPER.readValue(
MAPPER.writeValueAsString(payload), Map.class);
}
} catch (Exception e) {
LOG.warn(e.toString());
throw new RuntimeException(
String.format("Failed to serialize and deserialize %s: %s",
description, payload), e);
}
}
return null;
}
}


@@ -31,5 +31,5 @@ fi
CLASSPATH=$(cat $CP_FILE):"$CLASSPATH"
java -cp java/iceberg-rest-catalog-test/target/impala-iceberg-rest-catalog-test-${IMPALA_VERSION}.jar:$CLASSPATH \
org.apache.iceberg.rest.IcebergRestCatalogTest
org.apache.iceberg.rest.IcebergRestCatalogTest "$@"


@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://localhost:9084


@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://localhost:9085


@@ -3390,6 +3390,20 @@ hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/a
---- DATASET
functional
---- BASE_TABLE_NAME
airports_parquet_alternative
---- CREATE
CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
STORED AS ICEBERG
TBLPROPERTIES('write.format.default'='parquet', 'iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='/test-warehouse/iceberg_test/secondary_hadoop_catalog',
'iceberg.table_identifier'='berg.airports_parquet');
---- DEPENDENT_LOAD
`hadoop fs -mkdir -p /test-warehouse/iceberg_test/secondary_hadoop_catalog/berg && \
hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/airports_parquet /test-warehouse/iceberg_test/secondary_hadoop_catalog/berg
====
---- DATASET
functional
---- BASE_TABLE_NAME
iceberg_resolution_test_external
---- CREATE
CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}


@@ -66,6 +66,7 @@ table_name:hudi_as_parquet, constraint:restrict_to, table_format:parquet/none/no
# Iceberg tests are executed in the PARQUET file format dimension
table_name:airports_orc, constraint:restrict_to, table_format:parquet/none/none
table_name:airports_parquet, constraint:restrict_to, table_format:parquet/none/none
table_name:airports_parquet_alternative, constraint:restrict_to, table_format:parquet/none/none
table_name:complextypestbl_iceberg_orc, constraint:restrict_to, table_format:parquet/none/none
table_name:hadoop_catalog_test_external, constraint:restrict_to, table_format:parquet/none/none
table_name:iceberg_int_partitioned, constraint:restrict_to, table_format:parquet/none/none
table_name:iceberg_non_partitioned, constraint:restrict_to, table_format:parquet/none/none
table_name:iceberg_partitioned, constraint:restrict_to, table_format:parquet/none/none
table_name:iceberg_partitioned_orc_external, constraint:restrict_to, table_format:parquet/none/none
table_name:iceberg_partition_transforms_zorder, constraint:restrict_to, table_format:parquet/none/none
table_name:iceberg_resolution_test_external, constraint:restrict_to, table_format:parquet/none/none


@@ -0,0 +1,45 @@
====
---- QUERY
SHOW DATABASES like "ice";
---- RESULTS: VERIFY_IS_SUBSET
'ice',''
---- TYPES
STRING, STRING
====
---- QUERY
SHOW DATABASES like "functional_parquet";
---- RESULTS: VERIFY_IS_SUBSET
'functional_parquet',''
---- TYPES
STRING, STRING
====
---- QUERY
USE ice;
====
---- QUERY
SELECT lat FROM airports_parquet WHERE iata = '00R';
---- RESULTS
30.68586111
---- TYPES
DOUBLE
====
---- QUERY
SELECT * FROM functional_parquet.alltypes WHERE id = 3650
---- RESULTS
3650,true,0,0,0,0,0,0,'01/01/10','0',2010-01-01 00:00:00,2010,1
---- TYPES
INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
====
---- QUERY
SELECT p_float, string_col from ice.iceberg_alltypes_part ice_alltypes INNER JOIN functional_parquet.alltypes alltypes ON alltypes.id = ice_alltypes.i
---- RESULTS
1.100000023841858,'2'
1.100000023841858,'1'
---- TYPES
FLOAT, STRING
====
---- QUERY
INSERT INTO ice.iceberg_alltypes_part(i) VALUES (567)
---- CATCH
AnalysisException: Write not supported. Table ice.iceberg_alltypes_part access type is: READONLY
====

View File

@@ -0,0 +1,6 @@
====
---- QUERY
SELECT * FROM ice.iceberg_v2_positional_not_all_data_files_have_delete_files;
---- CATCH
TException: Ambiguous table name
====

View File

@@ -0,0 +1,40 @@
====
---- QUERY
SHOW DATABASES LIKE "ice";
---- RESULTS: VERIFY_IS_SUBSET
'ice',''
---- TYPES
STRING, STRING
====
---- QUERY
SHOW DATABASES LIKE "berg";
---- RESULTS: VERIFY_IS_SUBSET
'berg',''
---- TYPES
STRING, STRING
====
---- QUERY
USE ice;
====
---- QUERY
SELECT lat FROM berg.airports_parquet WHERE iata = '00R';
---- RESULTS
30.68586111
---- TYPES
DOUBLE
====
---- QUERY
SELECT ice_air.iata, ice_air.lat, berg_air.lon
FROM ice.airports_parquet ice_air
INNER JOIN berg.airports_parquet berg_air
ON berg_air.iata = ice_air.iata AND berg_air.iata = "00M"
---- RESULTS
'00M',31.95376472,-89.2345047
---- TYPES
STRING, DOUBLE, DOUBLE
====
---- QUERY
INSERT INTO berg.airports_parquet(lat) VALUES (32.1)
---- CATCH
AnalysisException: Write not supported. Table berg.airports_parquet access type is: READONLY
====

View File

@@ -24,8 +24,8 @@ import socket
import sys
import time
REST_SERVER_PORT = 9084
IMPALA_HOME = os.environ['IMPALA_HOME']
IMPALA_CLUSTER_LOGS = os.environ['IMPALA_CLUSTER_LOGS_DIR']
LOG = logging.getLogger('impala_test_suite')
@@ -34,36 +34,62 @@ class IcebergRestServer(object):
Utility class for starting and stopping our minimal Iceberg REST server.
"""
def start_rest_server(self, timeout_s):
self.process = subprocess.Popen('testdata/bin/run-iceberg-rest-server.sh',
stdout=sys.stdout, stderr=sys.stderr, shell=True,
DEFAULT_REST_SERVER_PORT = 9084
DEFAULT_CATALOG_LOCATION = "/test-warehouse/iceberg_test/hadoop_catalog"
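  # Expands to <IMPALA_CLUSTER_LOGS>/iceberg-rest-server.<port>.<start timestamp>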
LOG_PATTERN = "%s/iceberg-rest-server.%d.%s"
def __init__(self, port=DEFAULT_REST_SERVER_PORT,
catalog_location=DEFAULT_CATALOG_LOCATION):
self.port = port
self.catalog_location = catalog_location
self.process = None
self.stdout = None
self.stderr = None
def start_rest_server(self, timeout_s=60):
start_time = time.strftime("%Y%m%d-%H%M%S")
log_pattern = self.LOG_PATTERN % (IMPALA_CLUSTER_LOGS, self.port, start_time)
self.stdout = open(log_pattern + '.stdout', 'w')
self.stderr = open(log_pattern + '.stderr', 'w')
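    # Launch the server in its own session (preexec_fn=os.setsid) so that
    # stop_rest_server() can terminate the whole process group via os.killpg().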
self.process = subprocess.Popen(
['testdata/bin/run-iceberg-rest-server.sh', "--port",
str(self.port), "--catalog-location", self.catalog_location],
stdout=self.stdout, stderr=self.stderr,
preexec_fn=os.setsid, cwd=IMPALA_HOME)
self._wait_for_rest_server_to_start(timeout_s)
def stop_rest_server(self, timeout_s):
if self.process:
os.killpg(self.process.pid, signal.SIGTERM)
self._wait_for_rest_server_to_be_killed(timeout_s)
def stop_rest_server(self, timeout_s=60):
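    # Always close the redirected log files, even if terminating the server fails.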
try:
if self.process:
os.killpg(self.process.pid, signal.SIGTERM)
self._wait_for_rest_server_to_be_killed(timeout_s)
except Exception as e:
LOG.error("An error occurred while stopping the Iceberg REST server: %s", e)
finally:
if self.stdout:
self.stdout.close()
if self.stderr:
self.stderr.close()
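  # Poll the server port with TCP connects until it accepts a connection or
  # timeout_s elapses.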
def _wait_for_rest_server_to_start(self, timeout_s):
sleep_interval_s = 0.5
start_time = time.time()
while time.time() - start_time < timeout_s:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if s.connect_ex(('localhost', REST_SERVER_PORT)) == 0:
if s.connect_ex(('localhost', self.port)) == 0:
LOG.info("Iceberg REST server is available.")
return
s.close()
time.sleep(sleep_interval_s)
raise Exception(
"Webserver did not become available within {} seconds.".format(timeout_s))
raise Exception("Webserver did not become available within {} "
"seconds.".format(timeout_s))
def _wait_for_rest_server_to_be_killed(self, timeout_s):
sleep_interval_s = 0.5
start_time = time.time()
while time.time() - start_time < timeout_s:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if s.connect_ex(('localhost', REST_SERVER_PORT)) != 0:
if s.connect_ex(('localhost', self.port)) != 0:
LOG.info("Iceberg REST server has stopped.")
return
s.close()

View File

@@ -22,16 +22,102 @@ import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite, HIVE_CONF_DIR
from tests.common.iceberg_rest_server import IcebergRestServer
REST_SERVER_PORT = 9084
IMPALA_HOME = os.environ['IMPALA_HOME']
START_ARGS = 'start_args'
IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
NO_CATALOGD_STARTARGS = '--no_catalogd'
REST_STANDALONE_IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config"""\
.format(IMPALA_HOME)
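# Same REST catalog config dir as above, but catalogd stays deployed
# (no --catalogd_deployed=false).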
MULTICATALOG_IMPALAD_ARGS = """--use_local_catalog=true
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config"""\
.format(IMPALA_HOME)
MULTIPLE_REST_IMPALAD_ARGS = """--use_local_catalog=true
--catalog_config_dir={}/testdata/configs/catalog_configs/multicatalog_rest_config"""\
.format(IMPALA_HOME)
MULTIPLE_REST_WITHOUT_CATALOGD_IMPALAD_ARGS = """--use_local_catalog=true
--catalogd_deployed=false \
--catalog_config_dir={}/testdata/configs/catalog_configs/multicatalog_rest_config"""\
.format(IMPALA_HOME)
MULTICATALOG_CATALOGD_ARGS = "--catalog_topic_mode=minimal"
class TestIcebergRestCatalog(CustomClusterTestSuite):
"""Test suite for Iceberg REST Catalog."""
def RestServerProperties(*server_configs):
"""
Decorator that specifies configurations for multiple REST servers to be started.
Each argument should be a dictionary with 'port' and 'catalog_location' keys.
Example:
@RestServerProperties({'port': 9085}, {'port': 9086, 'catalog_location': '/tmp/cat2'})
"""
def decorator(func):
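    # Stash the configs on the test method; setup_method() reads them back via getattr().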
func.rest_server_configs = list(server_configs)
return func
return decorator
class IcebergRestCatalogTests(CustomClusterTestSuite):
"""Base class for Iceberg REST Catalog tests."""
def setup_method(self, method):
args = method.__dict__
if HIVE_CONF_DIR in args:
raise Exception("Cannot specify HIVE_CONF_DIR because the tests of this class are "
"running without Hive.")
self.servers = []
server_configs = getattr(method, 'rest_server_configs', None)
if server_configs:
for config in server_configs:
port = config.get('port', IcebergRestServer.DEFAULT_REST_SERVER_PORT)
catalog_location = config.get('catalog_location',
IcebergRestServer.DEFAULT_CATALOG_LOCATION)
print("Starting REST server with annotation properties: "
"Port=%s, Catalog Location=%s" % (port, catalog_location))
self.servers.append(IcebergRestServer(port, catalog_location))
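    # Start every configured server before bringing up the cluster; on failure,
    # stop the servers that already started so their ports are freed.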
try:
for server in self.servers:
server.start_rest_server(300)
super(IcebergRestCatalogTests, self).setup_method(method)
# At this point we can create the Impala clients that we will need.
self.create_impala_clients()
except Exception as e:
for server in self.servers:
server.stop_rest_server(10)
raise e
def teardown_method(self, method):
for server in self.servers:
server.stop_rest_server()
super(IcebergRestCatalogTests, self).teardown_method(method)
class TestIcebergRestCatalogWithHms(IcebergRestCatalogTests):
"""Test suite for Iceberg REST Catalog. HMS running while tests are running"""
@RestServerProperties({'port': 9084})
@CustomClusterTestSuite.with_args(
impalad_args=MULTICATALOG_IMPALAD_ARGS,
catalogd_args=MULTICATALOG_CATALOGD_ARGS)
@pytest.mark.execute_serially
def test_rest_catalog_multicatalog(self, vector):
self.run_test_case('QueryTest/iceberg-multicatalog',
vector, use_db="ice")
@RestServerProperties(
{'port': 9084},
{'port': 9085,
'catalog_location': '/test-warehouse/iceberg_test/secondary_hadoop_catalog'}
)
@CustomClusterTestSuite.with_args(
impalad_args=MULTIPLE_REST_IMPALAD_ARGS,
catalogd_args=MULTICATALOG_CATALOGD_ARGS)
@pytest.mark.execute_serially
def test_multiple_rest_catalogs(self, vector):
self.run_test_case('QueryTest/iceberg-multiple-rest-catalogs',
vector, use_db="ice")
class TestIcebergRestCatalogNoHms(IcebergRestCatalogTests):
"""Test suite for Iceberg REST Catalog. HMS is stopped while tests are running"""
@classmethod
def need_default_clients(cls):
@@ -40,13 +126,7 @@ class TestIcebergRestCatalog(CustomClusterTestSuite):
@classmethod
def setup_class(cls):
super(TestIcebergRestCatalog, cls).setup_class()
try:
cls.iceberg_rest_server = IcebergRestServer()
cls.iceberg_rest_server.start_rest_server(300)
except Exception as e:
cls.iceberg_rest_server.stop_rest_server(10)
raise e
super(TestIcebergRestCatalogNoHms, cls).setup_class()
try:
cls._stop_hive_service()
@@ -57,29 +137,41 @@ class TestIcebergRestCatalog(CustomClusterTestSuite):
@classmethod
def teardown_class(cls):
cls.cleanup_infra_services()
return super(TestIcebergRestCatalog, cls).teardown_class()
return super(TestIcebergRestCatalogNoHms, cls).teardown_class()
@classmethod
def cleanup_infra_services(cls):
cls.iceberg_rest_server.stop_rest_server(10)
cls._start_hive_service(None)
def setup_method(self, method):
args = method.__dict__
if HIVE_CONF_DIR in args:
raise Exception("Cannot specify HIVE_CONF_DIR because the tests of this class are "
"running without Hive.")
# Invoke start-impala-cluster.py with '--no_catalogd'
start_args = "--no_catalogd"
if START_ARGS in args:
start_args = args[START_ARGS] + " " + start_args
args[START_ARGS] = start_args
super(TestIcebergRestCatalog, self).setup_method(method)
# At this point we can create the Impala clients that we will need.
self.create_impala_clients()
@CustomClusterTestSuite.with_args(impalad_args=IMPALAD_ARGS)
@RestServerProperties({'port': 9084})
@CustomClusterTestSuite.with_args(
impalad_args=REST_STANDALONE_IMPALAD_ARGS,
start_args=NO_CATALOGD_STARTARGS)
@pytest.mark.execute_serially
def test_rest_catalog_basic(self, vector):
self.run_test_case('QueryTest/iceberg-rest-catalog', vector, use_db="ice")
@RestServerProperties(
{'port': 9084},
{'port': 9085,
'catalog_location': '/test-warehouse/iceberg_test/secondary_hadoop_catalog'}
)
@CustomClusterTestSuite.with_args(
impalad_args=MULTIPLE_REST_WITHOUT_CATALOGD_IMPALAD_ARGS,
start_args=NO_CATALOGD_STARTARGS)
@pytest.mark.execute_serially
def test_multiple_rest_catalogs_without_catalogd(self, vector):
self.run_test_case('QueryTest/iceberg-multiple-rest-catalogs',
vector, use_db="ice")
@RestServerProperties(
{'port': 9084},
{'port': 9085}
)
@CustomClusterTestSuite.with_args(
impalad_args=MULTIPLE_REST_WITHOUT_CATALOGD_IMPALAD_ARGS,
start_args=NO_CATALOGD_STARTARGS)
@pytest.mark.execute_serially
def test_multiple_rest_catalogs_with_ambiguous_tables(self, vector):
self.run_test_case('QueryTest/iceberg-multiple-rest-catalogs-ambiguous-name',
vector, use_db="ice")