IMPALA-13586: Initial support for Iceberg REST Catalogs

This patch adds initial support for Iceberg REST Catalogs. It is now
possible to run an Impala cluster without the Hive Metastore and
without the Impala CatalogD: Impala Coordinators can connect directly
to an Iceberg REST server and fetch metadata for databases and tables
from there. The support is read-only, i.e. DDL and DML statements are
not supported yet.

This was initially developed in the context of a company Hackathon
program, i.e. it was a team effort that I squashed into a single commit
and polished the code a bit.

The Hackathon team members were:
* Daniel Becker
* Gabor Kaszab
* Kurt Deschler
* Peter Rozsa
* Zoltan Borok-Nagy

The Iceberg REST Catalog support is configured via a Java properties
file; the directory containing it is specified via:
 --catalog_config_dir: Directory of configuration files

Currently only one configuration file can be in the directory as we
only support a single Catalog at a time. The following properties are
mandatory in the config file:
* connector.name=iceberg
* iceberg.catalog.type=rest
* iceberg.rest-catalog.uri

The first two properties can only be 'iceberg' and 'rest' for now;
they exist to allow extensibility in the future.
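
For illustration, a minimal config file could look like the sketch
below; the URI and warehouse values mirror the test setup added in
this patch and are only examples, and the warehouse property is
optional:

 connector.name=iceberg
 iceberg.catalog.type=rest
 iceberg.rest-catalog.uri=http://localhost:9084
 iceberg.rest-catalog.warehouse=hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog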

Moreover, Impala Daemons need to specify the following flags to connect
to an Iceberg REST Catalog:
 --use_local_catalog=true
 --catalogd_deployed=false
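
As an illustration only, a local minicluster without CatalogD could be
started roughly like this, using the new --no_catalogd option of
start-impala-cluster.py added by this patch (the config directory path
is a placeholder):

 bin/start-impala-cluster.py --no_catalogd \
   --impalad_args="--use_local_catalog=true --catalogd_deployed=false \
     --catalog_config_dir=/path/to/catalog_configs"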

Testing
* e2e test added to verify basic functionality against a custom-built
  Iceberg REST server that delegates to HadoopCatalog under the hood
* Further testing, e.g. Ranger tests, is expected in subsequent
  commits

TODO:
* manual testing against Polaris / Lakekeeper; we could add automated
  tests in a later patch

Change-Id: I1722b898b568d2f5689002f2b9bef59320cb088c
Reviewed-on: http://gerrit.cloudera.org:8080/22353
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author:    Zoltan Borok-Nagy
Date:      2024-12-20 14:48:11 +01:00
Committer: Impala Public Jenkins
Parent:    99fc96adea
Commit:    bd3486c051
34 changed files with 1508 additions and 28 deletions

View File

@@ -430,6 +430,9 @@ DEFINE_bool(iceberg_always_allow_merge_on_read_operations, false, "Impala can on
"executing DELETE, UPDATE and MERGE operations on Iceberg tables even if the table "
"property is 'copy-on-write'.");
DEFINE_string(catalog_config_dir, "", "Directory of configuration files of external "
"catalogs, e.g. Iceberg REST Catalog.");
// Host and port of Statestore Service
DEFINE_string(state_store_host, "localhost",
"hostname where StatestoreService is running");

View File

@@ -121,6 +121,7 @@ Frontend::Frontend() {
{"checkConfiguration", "()Ljava/lang/String;", &check_config_id_},
{"updateCatalogCache", "([B)[B", &update_catalog_cache_id_},
{"updateExecutorMembership", "([B)V", &update_membership_id_},
{"getCatalogInfo", "()[B", &get_catalog_info_},
{"getCatalogMetrics", "()[B", &get_catalog_metrics_id_},
{"getTableNames", "([B)[B", &get_table_names_id_},
{"getMetadataTableNames", "([B)[B", &get_metadata_table_names_id_},
@@ -258,6 +259,10 @@ Status Frontend::GetMetadataTableNames(const string& db, const string& table_nam
metadata_table_names);
}
Status Frontend::GetCatalogInfo(TGetCatalogInfoResult* catalog_info) {
return JniUtil::CallJniMethod(fe_, get_catalog_info_, catalog_info);
}
Status Frontend::GetDbs(const string* pattern, const TSessionState* session,
TGetDbsResult* dbs) {
TGetDbsParams params;

View File

@@ -89,6 +89,9 @@ class Frontend {
const string* pattern, const TSessionState* session,
TGetTablesResult* metadata_table_names);
/// Return list of catalog info strings
Status GetCatalogInfo(TGetCatalogInfoResult* catalog_info);
/// Return all databases matching the optional argument 'pattern'.
/// If pattern is NULL, match all databases otherwise match only those databases that
/// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
@@ -264,6 +267,7 @@ class Frontend {
jmethodID check_config_id_; // JniFrontend.checkConfiguration()
jmethodID update_catalog_cache_id_; // JniFrontend.updateCatalogCache(byte[][])
jmethodID update_membership_id_; // JniFrontend.updateExecutorMembership()
jmethodID get_catalog_info_; // JniFrontend.getCatalogInfo()
jmethodID get_catalog_metrics_id_; // JniFrontend.getCatalogMetrics()
jmethodID get_table_names_id_; // JniFrontend.getTableNames
jmethodID get_metadata_table_names_id_; // JniFrontend.getMetadataTableNames

View File

@@ -993,6 +993,24 @@ void ImpalaHttpHandler::CatalogHandler(const Webserver::WebRequest& req,
return;
}
TGetCatalogInfoResult catalog_info;
status = server_->exec_env_->frontend()->GetCatalogInfo(&catalog_info);
if (!status.ok()) {
Value error(status.GetDetail().c_str(), document->GetAllocator());
document->AddMember("error", error, document->GetAllocator());
return;
}
Value info(kArrayType);
for (const string& str: catalog_info.info) {
Value str_val(str.c_str(), document->GetAllocator());
Value value(kObjectType);
value.AddMember("value", str_val, document->GetAllocator());
info.PushBack(value, document->GetAllocator());
}
document->AddMember("info", info, document->GetAllocator());
Value databases(kArrayType);
for (const TDatabase& db: get_dbs_result.dbs) {
Value database(kObjectType);

View File

@@ -291,6 +291,10 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
"queries from clients. If false, it will refuse client connections.");
DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
"fragments.");
DEFINE_bool(catalogd_deployed, true, "If false, Impala daemon doesn't expect Catalog "
"Daemon to be present.");
DEFINE_string(executor_groups, "",
"List of executor groups, separated by comma. Each executor group specification can "
"optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
@@ -576,7 +580,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));
// Register the catalog update callback if running in a real cluster as a coordinator.
if ((!TestInfo::is_test() || TestInfo::is_be_cluster_test()) && FLAGS_is_coordinator) {
if ((!TestInfo::is_test() || TestInfo::is_be_cluster_test()) && FLAGS_is_coordinator &&
FLAGS_catalogd_deployed) {
auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
vector<TTopicDelta>* topic_updates) {
CatalogUpdateCallback(state, topic_updates);
@@ -1695,6 +1700,7 @@ void ImpalaServer::CloseClientRequestState(const QueryHandle& query_handle) {
}
Status ImpalaServer::UpdateCatalogMetrics() {
if (!FLAGS_catalogd_deployed) return Status::OK();
TGetCatalogMetricsResult metrics;
RETURN_IF_ERROR(exec_env_->frontend()->GetCatalogMetrics(&metrics));
ImpaladMetrics::CATALOG_NUM_DBS->SetValue(metrics.num_dbs);
@@ -2233,8 +2239,7 @@ bool ImpalaServer::IsAuthorizedProxyUser(const string& user) {
!= authorized_proxy_group_config_.end();
}
void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics()
{
void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics() {
ImpaladMetrics::CATALOG_VERSION->SetValue(catalog_version);
ImpaladMetrics::CATALOG_OBJECT_VERSION_LOWER_BOUND->SetValue(
catalog_object_version_lower_bound);
@@ -2245,6 +2250,7 @@ void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics()
void ImpalaServer::CatalogUpdateCallback(
const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
vector<TTopicDelta>* subscriber_topic_updates) {
DCHECK(FLAGS_catalogd_deployed);
StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
if (topic == incoming_topic_deltas.end()) return;
@@ -2304,6 +2310,7 @@ static inline void MarkTimelineEvent(RuntimeProfile::EventSequence* timeline,
void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* timeline) {
DCHECK(FLAGS_catalogd_deployed);
unique_lock<mutex> unique_lock(catalog_version_lock_);
// Wait for the update to be processed locally.
VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
@@ -3138,6 +3145,12 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
kudu::Version target_schema_version;
if (FLAGS_enable_workload_mgmt && !FLAGS_catalogd_deployed) {
// TODO(IMPALA-13830): Enable workload management in lightweight deployments.
return Status("Workload management needs CatalogD to be deployed.");
}
if (FLAGS_is_coordinator) {
if (FLAGS_enable_workload_mgmt) {
ABORT_IF_ERROR(workloadmgmt::ParseSchemaVersionFlag(&target_schema_version));

View File

@@ -114,6 +114,8 @@ DECLARE_bool(enable_json_scanner);
DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
DECLARE_bool(iceberg_always_allow_merge_on_read_operations);
DECLARE_bool(enable_reading_puffin_stats);
DECLARE_bool(catalogd_deployed);
DECLARE_string(catalog_config_dir);
DECLARE_int32(catalog_operation_log_size);
DECLARE_string(hostname);
DECLARE_bool(allow_catalog_cache_op_from_masked_users);
@@ -478,6 +480,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
FLAGS_iceberg_always_allow_merge_on_read_operations);
cfg.__set_enable_reading_puffin_stats(
FLAGS_enable_reading_puffin_stats);
cfg.__set_catalogd_deployed(FLAGS_catalogd_deployed);
cfg.__set_catalog_config_dir(FLAGS_catalog_config_dir);
cfg.__set_max_filter_error_rate_from_full_scan(
FLAGS_max_filter_error_rate_from_full_scan);
cfg.__set_catalog_operation_log_size(FLAGS_catalog_operation_log_size);

View File

@@ -169,6 +169,9 @@ parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha",
action="store_true", default=False,
help="If true, enables CatalogD HA - the cluster will be launched "
"with two catalogd instances as Active-Passive HA pair.")
parser.add_option("--no_catalogd", dest="no_catalogd",
action="store_true", default=False,
help="If true, there will be no CatalogD.")
parser.add_option("--jni_frontend_class", dest="jni_frontend_class",
action="store", default="",
help="Use a custom java frontend interface.")
@@ -753,7 +756,8 @@ class MiniClusterOperations(object):
"""
def get_cluster(self):
"""Return an ImpalaCluster instance."""
return ImpalaCluster(use_admission_service=options.enable_admission_service)
return ImpalaCluster(use_admission_service=options.enable_admission_service,
deploy_catalogd=not options.no_catalogd)
def kill_all_daemons(self, force=False):
kill_matching_processes(["catalogd", "impalad", "statestored", "admissiond"], force)
@@ -789,7 +793,9 @@ class MiniClusterOperations(object):
" for more details.")
def start_catalogd(self):
if options.enable_catalogd_ha:
if options.no_catalogd:
num_catalogd = 0
elif options.enable_catalogd_ha:
num_catalogd = 2
else:
num_catalogd = 1

View File

@@ -318,4 +318,8 @@ struct TBackendGflags {
143: required string injected_group_members_debug_only
144: required i32 hms_event_sync_sleep_interval_ms
145: required bool catalogd_deployed
146: required string catalog_config_dir
}

View File

@@ -104,6 +104,11 @@ struct TGetTablesResult {
1: list<string> tables
}
// getCatalogInfo returns a list of catalog info strings
struct TGetCatalogInfoResult {
1: list<string> info
}
// Arguments to getTableMetrics, which returns the metrics of a specific table.
struct TGetTableMetricsParams {
1: required CatalogObjects.TTableName table_name

View File

@@ -376,8 +376,10 @@ public interface FeIcebergTable extends FeFsTable {
if (getTTableStats().getNum_rows() < 0) {
getTTableStats().setNum_rows(Utils.calculateNumRows(this));
}
if (getTTableStats().getTotal_file_bytes() <= 0) {
getTTableStats().setTotal_file_bytes(Utils.calculateFileSizeInBytes(this));
}
}
static void setIcebergStorageDescriptor(
org.apache.hadoop.hive.metastore.api.Table hmsTable) {

View File

@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.iceberg;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergTableLoadingException;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.util.IcebergUtil;
import com.google.common.collect.ImmutableMap;
/**
* Implementation of IcebergCatalog for tables stored in Iceberg REST Catalogs.
*/
public class IcebergRESTCatalog implements IcebergCatalog {
private static final String KEY_URI = "iceberg.rest-catalog.uri";
private static final String KEY_NAME = "iceberg.rest-catalog.name";
private static final String KEY_CLIENT_ID = "iceberg.rest-catalog.client-id";
private static final String KEY_CLIENT_SECRET = "iceberg.rest-catalog.client-secret";
private static final String KEY_WAREHOUSE = "iceberg.rest-catalog.warehouse";
private final String REST_URI;
private static IcebergRESTCatalog instance_;
private final RESTCatalog restCatalog_;
public synchronized static IcebergRESTCatalog getInstance(
Properties properties) {
if (instance_ == null) {
instance_ = new IcebergRESTCatalog(properties);
}
return instance_;
}
private IcebergRESTCatalog(Properties properties) {
setContextClassLoader();
REST_URI = getRequiredProperty(properties, KEY_URI);
final String CATALOG_NAME = properties.getProperty(KEY_NAME, "");
final String CLIENT_ID = properties.getProperty(KEY_CLIENT_ID, "impala");
final String CLIENT_SECRET = properties.getProperty(KEY_CLIENT_SECRET, "");
final String CLIENT_CREDS = CLIENT_ID + ":" + CLIENT_SECRET;
final String WAREHOUSE_LOCATION = properties.getProperty(KEY_WAREHOUSE, "");
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", CLIENT_CREDS),
ImmutableMap.of());
restCatalog_ = new RESTCatalog(context,
(config) -> HTTPClient.builder(config).uri(REST_URI).build());
HiveConf conf = new HiveConf(IcebergRESTCatalog.class);
restCatalog_.setConf(conf);
restCatalog_.initialize(
CATALOG_NAME,
ImmutableMap.of(
CatalogProperties.URI, REST_URI,
"credential", CLIENT_CREDS,
CatalogProperties.WAREHOUSE_LOCATION, WAREHOUSE_LOCATION)
);
}
private String getRequiredProperty(Properties properties, String key) {
String value = properties.getProperty(key);
if (value == null) {
throw new IllegalStateException(
String.format("Missing property of IcebergRESTCatalog: %s", key));
}
return value;
}
public String getUri() {
return REST_URI;
}
@Override
public Table createTable(
TableIdentifier identifier,
Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
throw new UnsupportedOperationException(
"CREATE TABLE is not implemented for REST catalog");
}
public ImmutableList<String> listNamespaces() {
ImmutableList.Builder<String> ret = ImmutableList.builder();
for (Namespace ns : restCatalog_.listNamespaces()) {
ret.add(ns.toString());
}
return ret.build();
}
public List<TableIdentifier> listTables(String namespace) {
return restCatalog_.listTables(Namespace.of(namespace));
}
@Override
public Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
return loadTable(tableId, null, null);
}
@Override
public Table loadTable(TableIdentifier tableId, String tableLocation,
Map<String, String> properties) throws IcebergTableLoadingException {
return restCatalog_.loadTable(tableId);
}
@Override
public boolean dropTable(FeIcebergTable feTable, boolean purge) {
throw new UnsupportedOperationException(
"DROP TABLE is not implemented for REST catalog");
}
@Override
public boolean dropTable(String dbName, String tblName, boolean purge) {
throw new UnsupportedOperationException(
"DROP TABLE is not implemented for REST catalog");
}
@Override
public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
throw new UnsupportedOperationException(
"RENAME TABLE is not implemented for REST catalog");
}
}

View File

@@ -396,6 +396,10 @@ public class CatalogdMetaProvider implements MetaProvider {
.build();
}
public String getURI() {
return "Catalogd (URI TODO)";
}
public CacheStats getCacheStats() {
return cache_.stats();
}

View File

@@ -85,6 +85,10 @@ class DirectMetaProvider implements MetaProvider {
initMsClientPool();
}
public String getURI() {
return "HMS (URI TODO)";
}
private static synchronized void initMsClientPool() {
// Lazy-init the metastore client pool based on the backend configuration.
// TODO(todd): this should probably be a process-wide singleton.

View File

@@ -0,0 +1,600 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.local;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Types;
import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.DataSource;
import org.apache.impala.catalog.FileDescriptor;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HdfsCachePool;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
import org.apache.impala.catalog.HdfsStorageDescriptor;
import org.apache.impala.catalog.IcebergContentFileStore;
import org.apache.impala.catalog.IcebergFileMetadataLoader;
import org.apache.impala.catalog.IcebergTableLoadingException;
import org.apache.impala.catalog.PuffinStatsLoader;
import org.apache.impala.catalog.SqlConstraints;
import org.apache.impala.catalog.VirtualColumn;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.catalog.iceberg.IcebergRESTCatalog;
import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TBriefTableMeta;
import org.apache.impala.thrift.TIcebergContentFileStore;
import org.apache.impala.thrift.TIcebergTable;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartialTableInfo;
import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.IcebergSchemaConverter;
import org.apache.impala.util.ListMap;
import org.apache.thrift.TException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.Immutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.SnapshotSummary.TOTAL_FILE_SIZE_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READ;
public class IcebergMetaProvider implements MetaProvider {
private final static Logger LOG = LoggerFactory.getLogger(IcebergMetaProvider.class);
private AtomicReference<? extends AuthorizationChecker> authzChecker_;
private final AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
private IcebergRESTCatalog iceCatalog_;
Properties properties_;
public IcebergMetaProvider(Properties properties) {
properties_ = properties;
iceCatalog_ = initCatalog();
}
public String getURI() {
return "Iceberg REST (" + iceCatalog_.getUri() + ")";
}
private IcebergRESTCatalog initCatalog() {
return IcebergRESTCatalog.getInstance(properties_);
}
public void setAuthzChecker(
AtomicReference<? extends AuthorizationChecker> authzChecker) {
authzChecker_ = authzChecker;
}
@Override
public Iterable<HdfsCachePool> getHdfsCachePools() {
throw new UnsupportedOperationException(
"HDFSCachePools are not supported in IcebergMetaProvider");
}
@Override
public AuthorizationPolicy getAuthPolicy() {
return authPolicy_;
}
@Override
public boolean isReady() {
// This provider is always ready since we don't need to wait for
// an update from any external process.
return true;
}
@Override
public void waitForIsReady(long timeoutMs) {
// NOOP
}
@Override
public ImmutableList<String> loadDbList() throws TException {
return iceCatalog_.listNamespaces();
}
@Override
public Database loadDb(String dbName) throws TException {
Database db = new Database();
db.setName(dbName);
return db;
}
@Override
public ImmutableCollection<TBriefTableMeta> loadTableList(String dbName)
throws TException {
ImmutableList.Builder<TBriefTableMeta> ret = ImmutableList.builder();
Namespace ns = Namespace.of(dbName);
for (TableIdentifier tid : iceCatalog_.listTables(ns.toString())) {
try {
org.apache.iceberg.Table tbl = iceCatalog_.loadTable(tid, null, null);
TBriefTableMeta briefMeta = new TBriefTableMeta(getIcebergTableName(tbl));
briefMeta.setMsType("TABLE");
ret.add(briefMeta);
} catch (NoSuchTableException | IcebergTableLoadingException e) {
// Ignore tables that cannot be loaded.
LOG.error(e.toString());
}
}
return ret.build();
}
String getIcebergTableName(org.apache.iceberg.Table tbl) {
return tbl.name().substring(tbl.name().lastIndexOf('.') + 1);
}
@Override
public Pair<Table, TableMetaRef> getTableIfPresent(String dbName, String tblName) {
try {
return loadTable(dbName, tblName);
} catch (TException e) {
LOG.error("Failed to load table", e);
return null;
}
}
@Override
public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
throws TException {
try {
Table msTable = new Table();
msTable.setDbName(dbName);
Namespace ns = Namespace.of(dbName);
org.apache.iceberg.Table tbl = iceCatalog_.loadTable(
TableIdentifier.of(ns, tableName), null, null);
msTable.setTableName(getIcebergTableName(tbl));
msTable.setSd(createStorageDescriptor(tbl));
// Iceberg partitioning is not stored in HMS.
msTable.setPartitionKeys(Collections.emptyList());
msTable.setParameters(createTableProps(tbl));
msTable.setTableType(TableType.EXTERNAL_TABLE.toString());
// Only allow READONLY operations.
MetastoreShim.setTableAccessType(msTable, ACCESSTYPE_READ);
long loadingTime = System.currentTimeMillis();
TableMetaRef ref = new TableMetaRefImpl(dbName, tableName, msTable, tbl,
loadingTime);
return Pair.create(msTable, ref);
} catch (ImpalaRuntimeException|IcebergTableLoadingException e) {
throw new IllegalStateException(
String.format("Error loading Iceberg table %s.%s", dbName, tableName), e);
}
}
private StorageDescriptor createStorageDescriptor(org.apache.iceberg.Table tbl)
throws ImpalaRuntimeException {
StorageDescriptor sd = new StorageDescriptor();
sd.setInputFormat(HdfsFileFormat.ICEBERG.inputFormat());
sd.setOutputFormat(HdfsFileFormat.ICEBERG.outputFormat());
sd.setSortCols(Collections.emptyList());
SerDeInfo serde = new SerDeInfo();
serde.setSerializationLib(HdfsFileFormat.ICEBERG.serializationLib());
serde.setParameters(Collections.emptyMap());
sd.setSerdeInfo(serde);
sd.setCols(IcebergSchemaConverter.convertToHiveSchema(tbl.schema()));
Path p = new Path(tbl.location());
sd.setLocation(FileSystemUtil.createFullyQualifiedPath(p).toString());
return sd;
}
private Map<String, String> createTableProps(org.apache.iceberg.Table tbl) {
Map<String, String> props = new HashMap<>(tbl.properties());
Snapshot currentSnapshot = tbl.currentSnapshot();
if (currentSnapshot != null) {
if (props.get(StatsSetupConst.ROW_COUNT) == null) {
props.put(StatsSetupConst.ROW_COUNT,
String.valueOf(currentSnapshot.summary().get(TOTAL_RECORDS_PROP)));
}
if (props.get(StatsSetupConst.TOTAL_SIZE) == null) {
props.put(StatsSetupConst.TOTAL_SIZE,
String.valueOf(currentSnapshot.summary().get(TOTAL_FILE_SIZE_PROP)));
}
}
return props;
}
@Override
public String loadNullPartitionKeyValue() throws MetaException, TException {
return "__HIVE_DEFAULT_PARTITION__";
}
@Override
public List<PartitionRef> loadPartitionList(TableMetaRef table)
throws MetaException, TException {
TableMetaRefImpl ref = (TableMetaRefImpl)table;
Preconditions.checkState(!ref.isPartitioned());
return ImmutableList.of(new PartitionRefImpl(PartitionRefImpl.UNPARTITIONED_NAME));
}
@Override
public SqlConstraints loadConstraints(
TableMetaRef table, Table msTbl) throws TException {
return null;
}
@Override
public Map<String, PartitionMetadata> loadPartitionsByRefs(
TableMetaRef table, List<String> partitionColumnNames,
ListMap<TNetworkAddress> hostIndex,
List<PartitionRef> partitionRefs) throws CatalogException, TException {
Map<String, PartitionMetadata> ret = new HashMap<>();
ret.put("", new PartitionMetadataImpl(((TableMetaRefImpl)table).msTable_));
return ret;
}
/**
* We model partitions slightly differently to Hive. So, in the case of an
* unpartitioned table, we have to create a fake Partition object which has the
* metadata of the table.
*/
private Map<String, PartitionMetadata> loadUnpartitionedPartition(
TableMetaRefImpl table, List<PartitionRef> partitionRefs,
ListMap<TNetworkAddress> hostIndex) throws CatalogException {
Map<String, PartitionMetadata> ret = new HashMap<>();
ret.put("", new PartitionMetadataImpl(((TableMetaRefImpl)table).msTable_));
return ret;
}
@Override
public List<String> loadFunctionNames(String dbName) throws TException {
throw new UnsupportedOperationException(
"Functions not supported by IcebergMetaProvider");
}
@Override
public ImmutableList<Function> loadFunction(String dbName, String functionName)
throws TException {
throw new UnsupportedOperationException(
"Functions not supported by IcebergMetaProvider");
}
@Override
public ImmutableList<DataSource> loadDataSources() throws TException {
throw new UnsupportedOperationException(
"DataSource not supported by IcebergMetaProvider");
}
@Override
public DataSource loadDataSource(String dsName) throws TException {
throw new UnsupportedOperationException(
"DataSource not supported by IcebergMetaProvider");
}
@Override
public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
List<String> colNames) throws TException {
Preconditions.checkArgument(table instanceof TableMetaRefImpl);
TableMetaRefImpl tblImpl = (TableMetaRefImpl) table;
org.apache.iceberg.Table iceTbl = tblImpl.iceApiTbl_;
Map<Integer, PuffinStatsLoader.PuffinStatsRecord> puffinStats =
PuffinStatsLoader.loadPuffinStats(iceTbl, tblImpl.fullName(),
-1, Collections.emptySet());
List<ColumnStatisticsObj> res = new ArrayList<>();
for (String colName : colNames) {
org.apache.iceberg.types.Types.NestedField field =
iceTbl.schema().findField(colName);
int fieldId = field.fieldId();
PuffinStatsLoader.PuffinStatsRecord stats = puffinStats.get(fieldId);
if (stats != null) {
long ndv = stats.ndv;
LongColumnStatsData ndvData = new LongColumnStatsData();
ndvData.setNumDVs(ndv);
ColumnStatisticsData ndvColStatsData = createNdvColStatsData(field.type(), ndv);
ColumnStatisticsObj statsObj = new ColumnStatisticsObj(colName,
"" /* should be the type */, ndvColStatsData);
res.add(statsObj);
}
}
return res;
}
private ColumnStatisticsData createNdvColStatsData(
org.apache.iceberg.types.Type type, long ndv) {
if (type instanceof Types.BooleanType) {
// No NDV can be set for BooleanColumnStatsData.
BooleanColumnStatsData ndvData = new BooleanColumnStatsData();
return ColumnStatisticsData.booleanStats(ndvData);
} else if (type instanceof Types.IntegerType
|| type instanceof Types.LongType
|| type instanceof Types.TimestampType) {
LongColumnStatsData ndvData = new LongColumnStatsData();
ndvData.setNumDVs(ndv);
return ColumnStatisticsData.longStats(ndvData);
} else if (type instanceof Types.DateType) {
DateColumnStatsData ndvData = new DateColumnStatsData();
ndvData.setNumDVs(ndv);
return ColumnStatisticsData.dateStats(ndvData);
} else if (type instanceof Types.FloatType || type instanceof Types.DoubleType) {
DoubleColumnStatsData ndvData = new DoubleColumnStatsData();
ndvData.setNumDVs(ndv);
return ColumnStatisticsData.doubleStats(ndvData);
} else if (type instanceof Types.StringType) {
StringColumnStatsData ndvData = new StringColumnStatsData();
ndvData.setNumDVs(ndv);
return ColumnStatisticsData.stringStats(ndvData);
} else if (type instanceof Types.BinaryType) {
// No NDV can be set for BinaryColumnStatsData.
BinaryColumnStatsData ndvData = new BinaryColumnStatsData();
return ColumnStatisticsData.binaryStats(ndvData);
} else if (type instanceof Types.DecimalType) {
// No NDV can be set for DecimalColumnStatsData.
DecimalColumnStatsData ndvData = new DecimalColumnStatsData();
ndvData.setNumDVs(ndv);
return ColumnStatisticsData.decimalStats(ndvData);
} else {
return new ColumnStatisticsData();
}
}
@Immutable
private static class PartitionRefImpl implements PartitionRef {
private static final String UNPARTITIONED_NAME = "";
private final String name_;
public PartitionRefImpl(String name) {
this.name_ = name;
}
@Override
public String getName() {
return name_;
}
}
private static class PartitionMetadataImpl implements PartitionMetadata {
private final Table msTable_;
public PartitionMetadataImpl(Table msTable) {
this.msTable_ = msTable;
}
@Override
public Map<String, String> getHmsParameters() { return Collections.emptyMap(); }
@Override
public long getWriteId() {
return -1;
}
@Override
public HdfsStorageDescriptor getInputFormatDescriptor() {
String tblName = msTable_.getDbName() + "." + msTable_.getTableName();
try {
return HdfsStorageDescriptor.fromStorageDescriptor(tblName, msTable_.getSd());
} catch (HdfsStorageDescriptor.InvalidStorageDescriptorException e) {
throw new LocalCatalogException(String.format(
"Invalid input format descriptor for table %s", tblName), e);
}
}
@Override
public HdfsPartitionLocationCompressor.Location getLocation() {
return new HdfsPartitionLocationCompressor(0).new Location(
msTable_.getSd().getLocation());
}
@Override
public ImmutableList<FileDescriptor> getFileDescriptors() {
return ImmutableList.of();
}
@Override
public ImmutableList<FileDescriptor> getInsertFileDescriptors() {
return ImmutableList.of();
}
@Override
public ImmutableList<FileDescriptor> getDeleteFileDescriptors() {
return ImmutableList.of();
}
@Override
public boolean hasIncrementalStats() {
return false; // Incremental stats not supported for Iceberg tables.
}
@Override
public byte[] getPartitionStats() {
return null;
}
@Override
public boolean isMarkedCached() {
return false;
}
@Override
public long getLastCompactionId() {
throw new UnsupportedOperationException("Compaction id is not provided with " +
"IcebergMetaProvider implementation");
}
}
private class TableMetaRefImpl implements TableMetaRef {
private final String dbName_;
private final String tableName_;
private final Table msTable_;
private final long loadingTimeMs_;
private final HdfsPartitionLocationCompressor partitionLocationCompressor_;
private final org.apache.iceberg.Table iceApiTbl_;
public TableMetaRefImpl(String dbName, String tableName, Table msTable,
org.apache.iceberg.Table iceApiTbl, long loadingTimeMs) {
this.dbName_ = dbName;
this.tableName_ = tableName;
this.msTable_ = msTable;
this.iceApiTbl_ = iceApiTbl;
this.loadingTimeMs_ = loadingTimeMs;
this.partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
msTable.getPartitionKeysSize(),
Lists.newArrayList(msTable.getSd().getLocation()));
}
@Override
public boolean isPartitioned() {
return msTable_.getPartitionKeysSize() != 0;
}
@Override
public boolean isMarkedCached() {
return false;
}
@Override
public List<String> getPartitionPrefixes() {
return partitionLocationCompressor_.getPrefixes();
}
@Override
public boolean isTransactional() {
return AcidUtils.isTransactionalTable(msTable_.getParameters());
}
public String fullName() { return dbName_ + "." + tableName_; }
@Override
public List<VirtualColumn> getVirtualColumns() {
List<VirtualColumn> ret = new ArrayList<>();
ret.add(VirtualColumn.INPUT_FILE_NAME);
ret.add(VirtualColumn.FILE_POSITION);
ret.add(VirtualColumn.PARTITION_SPEC_ID);
ret.add(VirtualColumn.ICEBERG_PARTITION_SERIALIZED);
ret.add(VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER);
return ret;
}
@Override
public long getCatalogVersion() {
return 0;
}
@Override
public long getLoadedTimeMs() {
return loadingTimeMs_;
}
}
@Override
public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) {
return null;
}
/**
* Fetches the latest compaction id from HMS and compares with partition metadata in
* cache. If a partition is stale due to compaction, removes it from metas.
*/
public List<PartitionRef> checkLatestCompaction(String dbName, String tableName,
TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws TException {
return Collections.emptyList();
}
@Override
public TPartialTableInfo loadIcebergTable(final TableMetaRef table) throws TException {
TableMetaRefImpl tableRefImpl = (TableMetaRefImpl)table;
TableParams tableParams = new TableParams(tableRefImpl.msTable_);
org.apache.iceberg.Table apiTable = loadIcebergApiTable(table, tableParams,
tableRefImpl.msTable_);
TPartialTableInfo ret = new TPartialTableInfo();
TIcebergTable iceTable = new TIcebergTable();
if (apiTable.currentSnapshot() != null) {
iceTable.setCatalog_snapshot_id(apiTable.currentSnapshot().snapshotId());
}
iceTable.setDefault_partition_spec_id(apiTable.spec().specId());
ListMap<TNetworkAddress> hostIndex = new ListMap<>();
iceTable.setContent_files(getTContentFileStore(table, apiTable, hostIndex));
iceTable.setPartition_stats(Collections.emptyMap());
ret.setIceberg_table(iceTable);
ret.setNetwork_addresses(hostIndex.getList());
return ret;
}
@Override
public org.apache.iceberg.Table loadIcebergApiTable(final TableMetaRef table,
TableParams params, Table msTable) throws TException {
return ((TableMetaRefImpl)table).iceApiTbl_;
}
public String getLocation(final TableMetaRef table) {
return ((TableMetaRefImpl)table).msTable_.getSd().getLocation();
}
private TIcebergContentFileStore getTContentFileStore(final TableMetaRef table,
org.apache.iceberg.Table apiTable, ListMap<TNetworkAddress> hostIndex) {
try {
TableScan scan = apiTable.newScan();
GroupedContentFiles groupedFiles = new GroupedContentFiles(scan.planFiles());
IcebergFileMetadataLoader iceFml = new IcebergFileMetadataLoader(
apiTable, Collections.emptyList(), hostIndex, groupedFiles,
false);
iceFml.load();
IcebergContentFileStore contentFileStore = new IcebergContentFileStore(
apiTable, iceFml.getLoadedIcebergFds(), groupedFiles);
return contentFileStore.toThrift();
} catch (Exception e) {
throw new IllegalStateException(
"Exception occurred during loading Iceberg file metadata", e);
}
}
}

View File

@@ -89,6 +89,10 @@ public class LocalCatalog implements FeCatalog {
defaultKuduMasterHosts_ = defaultKuduMasterHosts;
}
public String getProviderURI() {
return metaProvider_.getURI();
}
@Override
public List<? extends FeDb> getDbs(PatternMatcher matcher) {
loadDbs();

View File

@@ -57,6 +57,7 @@ import com.google.errorprone.annotations.Immutable;
*/
public interface MetaProvider {
String getURI();
/**
* Get the authorization policy. This acts as a repository of authorization
* metadata.

View File

@@ -16,15 +16,26 @@
// under the License.
package org.apache.impala.service;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.catalog.local.CatalogdMetaProvider;
import org.apache.impala.catalog.local.IcebergMetaProvider;
import org.apache.impala.catalog.local.LocalCatalog;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
import org.apache.thrift.TException;
@@ -48,8 +59,14 @@ public abstract class FeCatalogManager {
* configuration.
*/
public static FeCatalogManager createFromBackendConfig() {
if (BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
if (cfg.use_local_catalog) {
if (!cfg.catalogd_deployed) {
// Currently Iceberg REST Catalog is the only implementation.
return new IcebergRestCatalogImpl();
} else {
return new LocalImpl();
}
} else {
return new CatalogdImpl();
}
@@ -155,6 +172,80 @@ public abstract class FeCatalogManager {
}
}
/**
* Implementation which creates LocalCatalog instances and uses an Iceberg REST
* Catalog.
* TODO(boroknagyz): merge with LocalImpl
*/
private static class IcebergRestCatalogImpl extends FeCatalogManager {
private static IcebergMetaProvider PROVIDER;
@Override
public synchronized FeCatalog getOrCreateCatalog() {
if (PROVIDER == null) {
try {
PROVIDER = initProvider();
} catch (IOException e) {
throw new IllegalStateException("Create IcebergMetaProvider failed", e);
}
}
return new LocalCatalog(PROVIDER, null);
}
IcebergMetaProvider initProvider() throws IOException {
TBackendGflags flags = BackendConfig.INSTANCE.getBackendCfg();
String catalogConfigDir = flags.catalog_config_dir;
Preconditions.checkState(catalogConfigDir != null &&
!catalogConfigDir.isEmpty());
List<String> files = listFiles(catalogConfigDir);
Preconditions.checkState(files.size() == 1,
String.format("Expected number of files in directory %s is one, found %d files",
catalogConfigDir, files.size()));
String configFile = catalogConfigDir + Path.SEPARATOR + files.get(0);
Properties props = readPropertiesFile(configFile);
// In the future we can expect different catalog types, but currently we only
// support Iceberg REST Catalogs.
checkPropertyValue(configFile, props, "connector.name", "iceberg");
checkPropertyValue(configFile, props, "iceberg.catalog.type", "rest");
return new IcebergMetaProvider(props);
}
private List<String> listFiles(String dirPath) {
File dir = new File(dirPath);
Preconditions.checkState(dir.exists() && dir.isDirectory());
return Stream.of(dir.listFiles())
.filter(file -> !file.isDirectory())
.map(File::getName)
.collect(Collectors.toList());
}
private Properties readPropertiesFile(String file) throws IOException {
Properties props = new Properties();
props.load(new FileInputStream(file));
return props;
}
private void checkPropertyValue(String configFile, Properties props, String key,
String expectedValue) {
if (!props.containsKey(key)) {
throw new IllegalStateException(String.format(
"Expected property %s was not specified in config file %s.", key,
configFile));
}
String actualValue = props.getProperty(key);
if (!Objects.equals(actualValue, expectedValue)) {
throw new IllegalStateException(String.format(
"Expected value of '%s' is '%s', but found '%s' in config file %s",
key, expectedValue, actualValue, configFile));
}
}
@Override
TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest req) {
return null;
}
}
/**
* Implementation which returns a provided catalog instance, used by tests.
* No updates from the statestore are permitted.

View File

@@ -130,6 +130,7 @@ import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeSystemTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.local.LocalCatalog;
import org.apache.impala.catalog.IcebergPositionDeleteTable;
import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.catalog.ImpaladTableUsageTracker;
@@ -174,6 +175,7 @@ import org.apache.impala.thrift.TClientRequest;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TCommentOnParams;
import org.apache.impala.thrift.TConvertTableRequest;
import org.apache.impala.thrift.TCopyTestCaseReq;
import org.apache.impala.thrift.TCounter;
import org.apache.impala.thrift.TCreateDropRoleParams;
@@ -185,28 +187,28 @@ import org.apache.impala.thrift.TDescribeHistoryParams;
import org.apache.impala.thrift.TDescribeOutputStyle;
import org.apache.impala.thrift.TDescribeResult;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TImpalaTableType;
import org.apache.impala.thrift.TDescribeTableParams;
import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
import org.apache.impala.thrift.TIcebergOperation;
import org.apache.impala.thrift.TExecRequest;
import org.apache.impala.thrift.TExecutorGroupSet;
import org.apache.impala.thrift.TExplainResult;
import org.apache.impala.thrift.TFinalizeParams;
import org.apache.impala.thrift.TFunctionCategory;
import org.apache.impala.thrift.TGetCatalogInfoResult;
import org.apache.impala.thrift.TGetCatalogMetricsResult;
import org.apache.impala.thrift.TGetTableHistoryResult;
import org.apache.impala.thrift.TGetTableHistoryResultItem;
import org.apache.impala.thrift.TGrantRevokePrivParams;
import org.apache.impala.thrift.TGrantRevokeRoleParams;
import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
import org.apache.impala.thrift.TIcebergOperation;
import org.apache.impala.thrift.TIcebergOptimizationMode;
import org.apache.impala.thrift.TIcebergOptimizeParams;
import org.apache.impala.thrift.TImpalaQueryOptions;
import org.apache.impala.thrift.TImpalaTableType;
import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TLoadDataReq;
import org.apache.impala.thrift.TLoadDataResp;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TConvertTableRequest;
import org.apache.impala.thrift.TIcebergOptimizationMode;
import org.apache.impala.thrift.TIcebergOptimizeParams;
import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TPoolConfig;
@@ -549,8 +551,8 @@ public class Frontend {
impaladTableUsageTracker_ = ImpaladTableUsageTracker.createFromConfig(
BackendConfig.INSTANCE);
queryHookManager_ = QueryEventHookManager.createFromConfig(BackendConfig.INSTANCE);
if (!isBackendTest) {
TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
if (!isBackendTest && cfg.catalogd_deployed) {
metaStoreClientPool_ = new MetaStoreClientPool(1, cfg.initial_hms_cnxn_timeout_s);
if (MetastoreShim.getMajorVersion() > 2) {
transactionKeepalive_ = new TransactionKeepalive(metaStoreClientPool_);
@@ -1121,6 +1123,20 @@ public class Frontend {
return resp;
}
public TGetCatalogInfoResult getCatalogInfo() {
TGetCatalogInfoResult resp = new TGetCatalogInfoResult();
resp.setInfo(Lists.newArrayList());
if (BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
resp.info.add("Catalog Type: Local");
FeCatalog catalog = getCatalog();
String provider = ((LocalCatalog)catalog).getProviderURI();
if (provider != null && !provider.isEmpty()) {
resp.info.add("Provider: " + provider);
}
}
return resp;
}
/**
* Keeps track of retries when handling InconsistentMetadataFetchExceptions.
* Whenever a Catalog object is acquired (e.g., getCatalog), operations that access

View File

@@ -57,12 +57,14 @@ import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCivilTime;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDescribeDbParams;
import org.apache.impala.thrift.TDescribeHistoryParams;
import org.apache.impala.thrift.TDescribeResult;
import org.apache.impala.thrift.TDescribeTableParams;
import org.apache.impala.thrift.TDescriptorTable;
import org.apache.impala.thrift.TExecRequest;
import org.apache.impala.thrift.TFunctionCategory;
import org.apache.impala.thrift.TGetAllHadoopConfigsResponse;
import org.apache.impala.thrift.TGetCatalogInfoResult;
import org.apache.impala.thrift.TGetCatalogMetricsResult;
import org.apache.impala.thrift.TGetDataSrcsParams;
import org.apache.impala.thrift.TGetDataSrcsResult;
@@ -72,10 +74,8 @@ import org.apache.impala.thrift.TGetFunctionsParams;
import org.apache.impala.thrift.TGetFunctionsResult;
import org.apache.impala.thrift.TGetHadoopConfigRequest;
import org.apache.impala.thrift.TGetHadoopConfigResponse;
import org.apache.impala.thrift.TGetHadoopGroupsRequest;
import org.apache.impala.thrift.TGetHadoopGroupsResponse;
import org.apache.impala.thrift.TGetTableHistoryResult;
import org.apache.impala.thrift.TGetMetadataTablesParams;
import org.apache.impala.thrift.TGetTableHistoryResult;
import org.apache.impala.thrift.TGetTablesParams;
import org.apache.impala.thrift.TGetTablesResult;
import org.apache.impala.thrift.TLoadDataReq;
@@ -85,14 +85,13 @@ import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TQueryCompleteContext;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TSessionState;
import org.apache.impala.thrift.TShowFilesParams;
import org.apache.impala.thrift.TShowGrantPrincipalParams;
import org.apache.impala.thrift.TShowRolesParams;
import org.apache.impala.thrift.TShowStatsOp;
import org.apache.impala.thrift.TShowStatsParams;
import org.apache.impala.thrift.TStringLiteral;
import org.apache.impala.thrift.TDescribeHistoryParams;
import org.apache.impala.thrift.TSessionState;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
@@ -260,6 +259,17 @@ public class JniFrontend {
}
}
public byte[] getCatalogInfo() throws ImpalaException {
Preconditions.checkNotNull(frontend_);
TGetCatalogInfoResult info = frontend_.getCatalogInfo();
try {
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(info);
} catch (TException e) {
throw new InternalException(e.getMessage());
}
}
/**
* Returns a list of table names matching an optional pattern.
*

View File

@@ -131,6 +131,12 @@ public class IcebergUtil {
private static final Logger LOG = LoggerFactory.getLogger(IcebergUtil.class);
public static final String ICEBERG_REST_URI = "iceberg_rest_uri";
public static final String ICEBERG_REST_USER_ID = "iceberg_rest_user_id";
public static final String ICEBERG_REST_USER_SECRET = "iceberg_rest_user_secret";
public static final String ICEBERG_REST_WAREHOUSE_LOCATION =
"iceberg_rest_warehouse_location";
/**
* Returns the corresponding catalog implementation for 'feTable'.
*/
@@ -1227,7 +1233,9 @@ public class IcebergUtil {
CatalogProperties.FILE_IO_IMPL, CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH));
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
ICEBERG_REST_URI, ICEBERG_REST_USER_ID, ICEBERG_REST_USER_SECRET,
ICEBERG_REST_WAREHOUSE_LOCATION));
for (String key : configKeys) {
String val = conf.get("iceberg." + key);

View File

@@ -0,0 +1,107 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache.impala</groupId>
<artifactId>impala-parent</artifactId>
<version>5.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>impala-iceberg-rest-catalog-test</artifactId>
<packaging>jar</packaging>
<name>Iceberg REST Catalog Test</name>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<!-- IMPALA-9468: Avoid pulling in netty for security reasons -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
<classifier>tests</classifier>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// We use the org.apache.iceberg.rest package because some classes
// are package-private. This means this code is more likely to
// break on Iceberg version updates. On the long-term we might
// switch to an open-source Iceberg REST Catalog.
package org.apache.iceberg.rest;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IcebergRestCatalogTest {
private static final Logger LOG = LoggerFactory.getLogger(IcebergRestCatalogTest.class);
private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
static final int REST_PORT = 9084;
private Server httpServer;
public IcebergRestCatalogTest() {}
private static String getWarehouseLocation() {
String FILESYSTEM_PREFIX = System.getenv("FILESYSTEM_PREFIX");
String HADOOP_CATALOG_LOCATION = "/test-warehouse/iceberg_test/hadoop_catalog";
if (FILESYSTEM_PREFIX != null && !FILESYSTEM_PREFIX.isEmpty()) {
return FILESYSTEM_PREFIX + HADOOP_CATALOG_LOCATION;
}
String DEFAULT_FS = System.getenv("DEFAULT_FS");
return DEFAULT_FS + HADOOP_CATALOG_LOCATION;
}
private Catalog initializeBackendCatalog() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
return new HadoopCatalog(conf, getWarehouseLocation());
}
public void start(boolean join) throws Exception {
Catalog catalog = initializeBackendCatalog();
RESTCatalogAdapter adapter = new RESTCatalogAdapter(catalog) {
@Override
public <T extends RESTResponse> T execute(
RESTCatalogAdapter.HTTPMethod method,
String path,
Map<String, String> queryParams,
Object body,
Class<T> responseType,
Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
Object request = roundTripSerialize(body, "request");
T response =
super.execute(
method, path, queryParams, request, responseType, headers, errorHandler);
T responseAfterSerialization = roundTripSerialize(response, "response");
return responseAfterSerialization;
}
};
RESTCatalogServlet servlet = new RESTCatalogServlet(adapter);
ServletContextHandler context = new ServletContextHandler(
ServletContextHandler.NO_SESSIONS);
ServletHolder servletHolder = new ServletHolder(servlet);
context.addServlet(servletHolder, "/*");
context.insertHandler(new GzipHandler());
this.httpServer = new Server(REST_PORT);
httpServer.setHandler(context);
httpServer.start();
if (join) {
httpServer.join();
}
}
public void stop() throws Exception {
if (httpServer != null) {
httpServer.stop();
}
}
public static void main(String[] args) throws Exception {
new IcebergRestCatalogTest().start(true);
}
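// Serializes the payload to JSON with the REST object mapper and parses it back,
// returning the deserialized copy; returns null when the payload is null.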
public static <T> T roundTripSerialize(T payload, String description) {
if (payload != null) {
LOG.trace(payload.toString());
try {
if (payload instanceof RESTMessage) {
return (T) MAPPER.readValue(
MAPPER.writeValueAsString(payload), payload.getClass());
} else {
// use Map so that Jackson doesn't try to instantiate ImmutableMap
// from payload.getClass()
return (T) MAPPER.readValue(
MAPPER.writeValueAsString(payload), Map.class);
}
} catch (Exception e) {
LOG.warn(e.toString());
throw new RuntimeException(
String.format("Failed to serialize and deserialize %s: %s",
description, payload), e);
}
}
return null;
}
}


@@ -408,6 +408,7 @@ under the License.
<modules>
<module>datagenerator</module>
<module>puffin-data-generator</module>
<module>iceberg-rest-catalog-test</module>
<module>executor-deps</module>
<module>ext-data-source</module>
<module>../fe</module>


@@ -27,7 +27,7 @@ RUN \
echo "-DHADOOP_USER_NAME=$USERNAME" >> /etc/trino/jvm.config
COPY hive-site.xml core-site.xml hdfs-site.xml /etc/
COPY iceberg.properties hive.properties /etc/trino/catalog/
COPY iceberg_rest.properties iceberg.properties hive.properties /etc/trino/catalog/
# Expose the Trino port
EXPOSE 9091


@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
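# Trino catalog definition that points the Iceberg connector at the test REST server
# running on localhost:9084.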
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://localhost:9084
iceberg.rest-catalog.warehouse=hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog
hive.config.resources=/etc/hive-site.xml,/etc/hdfs-site.xml,/etc/core-site.xml

testdata/bin/run-iceberg-rest-server.sh (new executable file)

@@ -0,0 +1,28 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# We expect mvn to be found in /usr/local/bin; see bin/bootstrap_build.sh and
# bin/bootstrap_system.sh.
PATH=${PATH}:/usr/local/bin
cd "$IMPALA_HOME"
. bin/impala-config.sh
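# Run the Jetty-based test REST server in the foreground; it serves the Iceberg REST
# API on port 9084 until the process is terminated.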
mvn -f "java/iceberg-rest-catalog-test/pom.xml" exec:java \
-Dexec.mainClass="org.apache.iceberg.rest.IcebergRestCatalogTest"


@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
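# Minimal catalog configuration used by the e2e test; the URI points at the test REST
# server started by testdata/bin/run-iceberg-rest-server.sh.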
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://localhost:9084


@@ -0,0 +1,90 @@
====
---- QUERY
SHOW DATABASES;
---- RESULTS: VERIFY_IS_SUBSET
'_impala_builtins','System database for Impala builtin functions'
'hadoop_catalog_test',''
'ice',''
'iceberg_partitioned_orc',''
'iceberg_resolution_test',''
---- TYPES
STRING, STRING
====
---- QUERY
USE ice;
====
---- QUERY
SELECT lat FROM airports_parquet WHERE iata = '00R';
---- RESULTS
30.68586111
---- TYPES
DOUBLE
====
---- QUERY
SELECT * FROM ice.airports_parquet.history;
---- RESULTS
2021-10-18 16:53:23.865000000,2304960110511088609,NULL,true
---- TYPES
TIMESTAMP, BIGINT, BIGINT, BOOLEAN
====
---- QUERY
DESCRIBE ice.airports_parquet;
---- RESULTS
'iata','string','','true'
'airport','string','','true'
'city','string','','true'
'state','double','','true'
'country','string','','true'
'lat','double','','true'
'lon','double','','true'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
DESCRIBE FORMATTED ice.airports_parquet;
---- RESULTS: VERIFY_IS_SUBSET
'# col_name ','data_type ','comment '
'','NULL','NULL'
'iata','string','NULL'
'airport','string','NULL'
'city','string','NULL'
'state','double','NULL'
'country','string','NULL'
'lat','double','NULL'
'lon','double','NULL'
'','NULL','NULL'
'# Detailed Table Information','NULL','NULL'
'Database: ','ice ','NULL'
'OwnerType: ','USER ','NULL'
'Owner: ','null ','NULL'
'Location: ','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/airports_parquet','NULL'
'Erasure Coding Policy:','NONE ','NULL'
'Table Type: ','EXTERNAL_TABLE ','NULL'
'Table Parameters:','NULL','NULL'
'','EXTERNAL ','TRUE '
'','bucketing_version ','2 '
'','engine.hive.enabled ','true '
'','gc.enabled ','TRUE '
'','numFiles ','1 '
'','storage_handler ','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
'','table_type ','ICEBERG '
'','write.format.default','parquet '
'','NULL','NULL'
'# Storage Information','NULL','NULL'
'SerDe Library: ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
'InputFormat: ','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
'OutputFormat: ','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
'Compressed: ','No ','NULL'
'Sort Columns: ','[] ','NULL'
'','NULL','NULL'
'# Constraints','NULL','NULL'
---- TYPES
string, string, string
====
---- QUERY
SHOW TABLE STATS ice.airports_parquet;
---- RESULTS
row_regex:0,1,'.+KB','NOT CACHED','NOT CACHED','PARQUET','false','.*/test-warehouse/iceberg_test/hadoop_catalog/ice/airports_parquet','$ERASURECODE_POLICY'
---- TYPES
BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING, STRING
====


@@ -603,6 +603,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
expected_subscribers += 1
if "--enable_catalogd_ha" in options:
expected_subscribers += 1
elif "--no_catalogd" in options:
expected_subscribers -= 1
if wait_for_backends:
statestored.service.wait_for_live_subscribers(expected_subscribers,


@@ -93,9 +93,11 @@ def post_data(url, data):
# * The docker minicluster with one container per process connected to a user-defined
# bridge network.
class ImpalaCluster(object):
def __init__(self, docker_network=None, use_admission_service=False):
def __init__(self, docker_network=None, use_admission_service=False,
deploy_catalogd=True):
self.docker_network = docker_network
self.use_admission_service = use_admission_service
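# When deploy_catalogd is False the cluster is expected to run without a catalogd.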
self.deploy_catalogd = deploy_catalogd
self.refresh()
@classmethod
@@ -231,7 +233,7 @@ class ImpalaCluster(object):
# process to write a minidump.
assert len(self.impalads) >= expected_num_impalads
assert self.statestored is not None
assert self.catalogd is not None
if self.deploy_catalogd: assert self.catalogd is not None
sleep_interval = 0.5
# Wait for each webserver to be ready.
@@ -274,7 +276,7 @@ class ImpalaCluster(object):
expected_num=num_impalads, actual_num=len(self.impalads))
if not self.statestored:
msg += "statestored failed to start.\n"
if not self.catalogd:
if self.deploy_catalogd and not self.catalogd:
msg += "catalogd failed to start.\n"
if msg:
raise RuntimeError(msg)


@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import os
import signal
import socket
import subprocess
import sys
import time

import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
REST_SERVER_PORT = 9084
IMPALA_HOME = os.environ['IMPALA_HOME']
START_ARGS = 'start_args'
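# Run the coordinators in local-catalog mode without a catalogd and point them at the
# Iceberg REST catalog configuration used by this test.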
IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config"""\
.format(IMPALA_HOME)
class TestIcebergRestCatalog(CustomClusterTestSuite):
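"""Basic e2e test that runs an Impala cluster without a catalogd against a
custom-built Iceberg REST server that delegates to a HadoopCatalog."""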
@classmethod
def get_workload(cls):
return 'functional-query'
def setup_method(self, method):
# Invoke start-impala-cluster.py with '--no_catalogd'
start_args = "--no_catalogd"
if START_ARGS in method.__dict__:
start_args = method.__dict__[START_ARGS] + " " + start_args
method.__dict__[START_ARGS] = start_args
try:
self._start_rest_server()
self._wait_for_rest_server(300)
super(TestIcebergRestCatalog, self).setup_method(method)
except Exception as e:
self._stop_rest_server()
raise e
def teardown_method(self, method):
self._stop_rest_server()
super(TestIcebergRestCatalog, self).teardown_method(method)
def _start_rest_server(self):
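# Start the REST server in its own process group (os.setsid) so that teardown can
# terminate the whole process tree with killpg.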
self.process = subprocess.Popen(
'testdata/bin/run-iceberg-rest-server.sh',
stdout=sys.stdout, stderr=sys.stderr, shell=True,
preexec_fn=os.setsid, cwd=IMPALA_HOME)
def _stop_rest_server(self):
if getattr(self, 'process', None):
os.killpg(self.process.pid, signal.SIGTERM)
def _wait_for_rest_server(self, timeout_s):
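# Poll the REST port until a TCP connection succeeds or the timeout expires.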
sleep_interval_s = 0.5
start_time = time.time()
while time.time() - start_time < timeout_s:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
if s.connect_ex(('localhost', REST_SERVER_PORT)) == 0:
print("Iceberg REST server is available.")
return
finally:
s.close()
time.sleep(sleep_interval_s)
raise Exception(
"Iceberg REST server did not become available within {} seconds.".format(timeout_s))
@CustomClusterTestSuite.with_args(
impalad_args=IMPALAD_ARGS)
@pytest.mark.execute_serially
def test_rest_catalog_basic(self, vector):
self.run_test_case('QueryTest/iceberg-rest-catalog', vector, use_db="ice")


@@ -20,6 +20,24 @@ under the License.
<h2>Catalog</h2>
{{?info}}
<h3>Info</h3>
<table class='table table-bordered table-hover'>
<tr>
<th>Value</th>
</tr>
{{/info}}
{{#info}}
<tr>
<td><tt>{{value}}</tt></td>
</tr>
{{/info}}
{{?info}}
</table>
{{/info}}
{{?has_large_tables}}
<div class="card">
<div class="card-header">