IMPALA-14018: Configure OAUTH2 with Lakekeeper and fix Impala's config handling

This patch adds Keycloak as Identity Provider for Lakekeeper, so
now we can test Impala's Iceberg REST Catalog with an OAuth2
authentication (Client-Credential) flow. The Keycloak
instance is pre-configured with a Lakekeeper realm that contain
the necessary clients, users, scopes and roles.

Manual testing also revealed that our Iceberg REST Catalog
configuration is incomplete. This patch refactors config
handling in a way that both Iceberg native configuration
options and Trino-specific configuration options can be
used with Impala. This will help users use their Trino
connectors with Impala.

By default Impala uses Iceberg 1.3 which assumes that the
Iceberg REST server is also the authentication server. It is
not always true, e.g. Lakekeeper cannot even function as the
authententication server, but it can work with external authentication
servers. Btw, this is why we needed Keycloak in the first place.
It means if someone wants to try out Lakekeeper+Impala with Oauth2,
they need to configure Impala with Iceberg 1.5.

Testing
 * manual testing with Iceberg 1.5

Change-Id: Ie5785cb72773e188b1de7c7924cc6f0b1f96de33
(cherry picked from commit a9cb94986a5791be2adcb2f7c576272a9c22e79c)
Reviewed-on: http://gerrit.cloudera.org:8080/23156
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Zoltan Borok-Nagy
2025-07-08 16:06:36 +02:00
committed by Impala Public Jenkins
parent 8b057881c7
commit 062ba4071a
11 changed files with 820 additions and 120 deletions

View File

@@ -20,36 +20,24 @@ package org.apache.impala.catalog.iceberg;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergTableLoadingException;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.util.IcebergUtil;
import com.google.common.collect.ImmutableMap;
/**
* Implementation of IcebergCatalog for tables stored in HadoopCatalog.
*/
public class IcebergRESTCatalog implements IcebergCatalog {
private static final String KEY_URI = "iceberg.rest-catalog.uri";
private static final String KEY_NAME = "iceberg.rest-catalog.name";
private static final String KEY_CLIENT_ID = "iceberg.rest-catalog.client-id";
private static final String KEY_CLIENT_SECRET = "iceberg.rest-catalog.client-secret";
private static final String KEY_WAREHOUSE = "iceberg.rest-catalog.warehouse";
private final String REST_URI;
private static IcebergRESTCatalog instance_;
@@ -63,77 +51,16 @@ public class IcebergRESTCatalog implements IcebergCatalog {
return instance_;
}
private static class IcebergRestConfig {
String catalogName;
String uri;
String user;
String secret;
String credential;
String warehouseLocation;
IcebergRestConfig(Properties properties) {
uri = getRequiredProperty(properties, KEY_URI);
catalogName = properties.getProperty(KEY_NAME, "");
user = properties.getProperty(KEY_CLIENT_ID);
secret = properties.getProperty(KEY_CLIENT_SECRET);
credential = getCredential();
warehouseLocation = properties.getProperty(KEY_WAREHOUSE);
}
public Map<String, String> getCatalogProperties() {
ImmutableMap.Builder<String, String> mapBuilder = new ImmutableMap.Builder<>();
mapBuilder.put(CatalogProperties.URI, uri);
if (credential != null) mapBuilder.put("credential", credential);
if (warehouseLocation != null){
mapBuilder.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
}
return mapBuilder.build();
}
public SessionCatalog.SessionContext getSessionContext() {
return new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
user,
getCredentialMap(),
ImmutableMap.of());
}
private String getRequiredProperty(Properties properties, String key) {
String value = properties.getProperty(key);
if (value == null) {
throw new IllegalStateException(
String.format("Missing property of IcebergRESTCatalog: %s", key));
}
return value;
}
private String getCredential() {
if (user != null && secret != null) {
return user + ":" + secret;
}
return null;
}
private ImmutableMap<String, String> getCredentialMap() {
ImmutableMap.Builder<String, String> mapBuilder = new ImmutableMap.Builder<>();
if (credential != null) {
mapBuilder.put("credential", credential);
}
return mapBuilder.build();
}
}
private IcebergRESTCatalog(Properties properties) {
setContextClassLoader();
IcebergRestConfig restConfig = new IcebergRestConfig(properties);
REST_URI = restConfig.uri;
restCatalog_ = new RESTCatalog(restConfig.getSessionContext(),
(config) -> HTTPClient.builder(config).uri(REST_URI).build());
RESTCatalogProperties restConfig = new RESTCatalogProperties(properties);
REST_URI = restConfig.getUri();
restCatalog_ = new RESTCatalog();
HiveConf conf = new HiveConf(IcebergRESTCatalog.class);
restCatalog_.setConf(conf);
restCatalog_.initialize(
restConfig.catalogName,
restConfig.getName(),
restConfig.getCatalogProperties());
}

View File

@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.iceberg;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Utility class to extract native Iceberg catalog properties from a Properties object
* that is possibly created from a Trino connector file. The goal is that users can
* just simply reuse their already existing Trino configurations with Impala.
*
* Iceberg REST Catalog and Trino can use different property names for the same
* functionality.E.g.:
* +-------------------+----------------------------------------+
* | Iceberg | Trino |
* +-------------------+----------------------------------------+
* | uri | iceberg.rest-catalog.uri |
* | warehouse | iceberg.rest-catalog.warehouse |
* | oauth2-server-uri | iceberg.rest-catalog.oauth2.server-uri |
* +-------------------+----------------------------------------+
*
* For a complete list check the followings:
* Iceberg: CatalogProperties, OAuth2Properties
* Trino: IcebergRestCatalogConfig, OAuth2SecurityConfig
*
* With this class Impala can recognize the alternative configuration names and translate
* them to the Iceberg native ones. It also handles required properties like "uri" that
* must be set.
*
* If we only support a single setting for a configuration option, we also verify their
* values. E.g. 'vended-credentials-enabled' must be false, as Impala doesn't support
* vended credentials yet.
*
* And some properties are simply ignored as they are specific to another query engine
* (e.g. 'case-insensitive-name-matching.cache-ttl'), or they have different purposes than
* configuring the REST catalog (e.g. 'connector.name').
*
* The remaining properties (that are not translated, verified, or ignored) don't
* need special treatment and are simply returned as they are.
*/
public class RESTCatalogProperties {
/**
* Utility class for properties that can have alternative names.
*/
private static class Config {
protected String catalogKey;
protected ImmutableList<String> alternativeKeys;
public Config(String key) {
this(key, ImmutableList.of());
}
public Config(String key, ImmutableList<String> alternativeKeys) {
this.catalogKey = key;
this.alternativeKeys = alternativeKeys;
}
public boolean applyConfig(
Map<String, String> sourceMap, Map<String, String> outputMap) {
verifyOutputMap(outputMap);
boolean applied = false;
String value = sourceMap.get(catalogKey);
if (value != null) {
applied = true;
sourceMap.remove(catalogKey);
outputMap.put(catalogKey, value);
}
// Even if already applied, check alternative keys for ambiguity.
for (String alternativeKey : alternativeKeys) {
value = sourceMap.get(alternativeKey);
if (value != null) {
if (applied) {
throw new IllegalStateException(
String.format("Alternative key '%s' sets the same configuration as " +
"'%s' which is already defined with value '%s'",
alternativeKey, catalogKey, value));
}
applied = true;
sourceMap.remove(alternativeKey);
// We still need to use 'catalogKey' for alternative keys.
outputMap.put(catalogKey, value);
}
}
return applied;
}
protected void verifyOutputMap(Map<String, String> outputMap) {
String value = outputMap.get(catalogKey);
if (value != null) {
throw new IllegalStateException(
String.format("REST Catalog property is defined multiple times: %s\n" +
"Current value: %s", catalogKey, value));
}
}
}
/**
* Config that must be present. Currently only 'URI'.
*/
private static class RequiredConfig extends Config {
public RequiredConfig(String key) {
super(key, ImmutableList.of());
}
public RequiredConfig(String key, ImmutableList<String> alternativeKeys) {
super(key, alternativeKeys);
}
@Override
public boolean applyConfig(
Map<String, String> sourceMap, Map<String, String> outputMap) {
boolean success = super.applyConfig(sourceMap, outputMap);
if (success) return true;
throw new IllegalStateException(
String.format("Missing property of IcebergRESTCatalog: %s", catalogKey));
}
}
/**
* Configuration that is only meaningful for other query engines, and cannot be
* translated to Iceberg config.
*/
private static class IgnoredConfig extends Config {
public IgnoredConfig(String key) {
super(key, ImmutableList.of());
}
@Override
public boolean applyConfig(
Map<String, String> sourceMap, Map<String, String> outputMap) {
if (sourceMap.containsKey(catalogKey)) {
sourceMap.remove(catalogKey);
return true;
} else {
return false;
}
}
}
/**
* Config for which we only support a single value.
*/
private static class VerifiedConfig extends Config {
private String expectedValue;
public VerifiedConfig(String key, String expectedValue) {
super(key, ImmutableList.of());
Preconditions.checkState(expectedValue != null);
this.expectedValue = expectedValue;
}
@Override
public boolean applyConfig(
Map<String, String> sourceMap, Map<String, String> outputMap) {
String value = sourceMap.get(catalogKey);
if (value != null) {
// Config keys are case sensitive, but the values are typically not, especially
// the config values that are verified (false/FALSE, none/NONE).
if (!expectedValue.equalsIgnoreCase(value)) {
throw new IllegalStateException(
String.format(
"The only allowed value for REST Catalog property '%s' is '%s'.\n" +
"Value in configuration is '%s'",
catalogKey, expectedValue, value));
}
return true;
}
return false;
}
}
private static final String NAME = "iceberg.rest-catalog.name";
private static final ImmutableList<Config> CATALOG_CONFIGS = ImmutableList.of(
new RequiredConfig(CatalogProperties.URI,
ImmutableList.of("iceberg.rest-catalog.uri")),
new Config("prefix",
ImmutableList.of("iceberg.rest-catalog.prefix")),
new Config(CatalogProperties.WAREHOUSE_LOCATION,
ImmutableList.of("iceberg.rest-catalog.warehouse")),
new Config(CatalogProperties.AUTH_SESSION_TIMEOUT_MS,
ImmutableList.of("iceberg.rest-catalog.session-timeout")),
// USER sessions are not supported
new VerifiedConfig("iceberg.rest-catalog.session", "NONE"),
new VerifiedConfig("iceberg.rest-catalog.vended-credentials-enabled", "false"),
new VerifiedConfig("iceberg.rest-catalog.nested-namespace-enabled", "false"),
new VerifiedConfig("iceberg.rest-catalog.case-insensitive-name-matching", "true"),
new IgnoredConfig("iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl"),
new IgnoredConfig("iceberg.catalog.type"),
new IgnoredConfig("connector.name"),
new IgnoredConfig(NAME)
);
private static final ImmutableList<Config> OAUTH2_CONFIGS = ImmutableList.of(
// Since currently only OAUTH2 is possible we ignore this config. It also
// doesn't map to any Iceberg catalog property.
new VerifiedConfig("iceberg.rest-catalog.security", "OAUTH2"),
// TODO: switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade.
new Config("oauth2-server-uri",
ImmutableList.of("iceberg.rest-catalog.oauth2.server-uri")),
new Config(OAuth2Properties.CREDENTIAL,
ImmutableList.of("iceberg.rest-catalog.oauth2.credential")),
new Config(OAuth2Properties.TOKEN,
ImmutableList.of("iceberg.rest-catalog.oauth2.token")),
new Config(OAuth2Properties.TOKEN_REFRESH_ENABLED,
ImmutableList.of("iceberg.rest-catalog.oauth2.token-refresh-enabled")),
new Config(OAuth2Properties.SCOPE,
ImmutableList.of("iceberg.rest-catalog.oauth2.scope"))
);
private Map<String, String> sourceMap_;
private Map<String, String> finalMap_;
private String uri_;
private String name_ = "";
public RESTCatalogProperties(Properties properties) {
sourceMap_ = new HashMap<>();
for (String key : properties.stringPropertyNames()) {
sourceMap_.put(key, properties.getProperty(key));
}
// 'NAME' is used in RESTCatalog.initialize(), not in the properties.
if (sourceMap_.containsKey(NAME)) {
name_ = sourceMap_.get(NAME);
}
finalMap_ = new HashMap<>();
applyConfigs(CATALOG_CONFIGS, sourceMap_, finalMap_);
applyConfigs(OAUTH2_CONFIGS, sourceMap_, finalMap_);
// Copy over remaining configuration that do not need special handling.
for (Map.Entry<String, String> entry : sourceMap_.entrySet()) {
Preconditions.checkState(!finalMap_.containsKey(entry.getKey()));
finalMap_.put(entry.getKey(), entry.getValue());
}
uri_ = finalMap_.get(CatalogProperties.URI);
Preconditions.checkState(uri_ != null);
}
private void applyConfigs(ImmutableList<Config> configs, Map<String, String> sourceMap,
Map<String, String> outputMap) {
for (Config config : configs) {
config.applyConfig(sourceMap, outputMap);
}
}
public String getName() { return name_; }
public String getUri() { return uri_; }
public Map<String, String> getCatalogProperties() { return finalMap_; }
}

View File

@@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.iceberg;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.junit.Test;
import java.util.Map;
import java.util.Properties;
public class TestRESTCatalogProperties {
@Test
public void testEmptyConfig() {
try {
Properties props = new Properties();
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
} catch (Exception e) {
// RESTCatalogProperties throws an exception if required properties are not defined.
return;
}
fail();
}
@Test
public void testUriOnlyConfig() {
Properties props = new Properties();
props.setProperty(CatalogProperties.URI, "test-uri");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
assertEquals("test-uri", restProps.getUri());
assertEquals("", restProps.getName());
assertEquals(1, restProps.getCatalogProperties().size());
assertTrue(restProps.getCatalogProperties().containsKey(CatalogProperties.URI));
}
@Test
public void testIcebergNativeConfig() {
Properties props = new Properties();
props.setProperty(CatalogProperties.URI, "test-uri");
props.setProperty("iceberg.rest-catalog.name", "catalog-name");
props.setProperty(CatalogProperties.WAREHOUSE_LOCATION, "warehouse-loc");
props.setProperty(CatalogProperties.AUTH_SESSION_TIMEOUT_MS, "5000");
//TODO: Switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade.
props.setProperty("oauth2-server-uri", "oauth-uri");
props.setProperty(OAuth2Properties.TOKEN, "oauth-token");
props.setProperty(OAuth2Properties.SCOPE, "oauth-scope");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
assertEquals("test-uri", restProps.getUri());
assertEquals("catalog-name", restProps.getName());
assertEquals(6, restProps.getCatalogProperties().size());
Map<String, String> catProps = restProps.getCatalogProperties();
assertEquals("test-uri", catProps.get(CatalogProperties.URI));
assertEquals("warehouse-loc", catProps.get(CatalogProperties.WAREHOUSE_LOCATION));
assertEquals("5000", catProps.get(CatalogProperties.AUTH_SESSION_TIMEOUT_MS));
assertEquals("oauth-uri", catProps.get("oauth2-server-uri"));
assertEquals("oauth-token", catProps.get(OAuth2Properties.TOKEN));
assertEquals("oauth-scope", catProps.get(OAuth2Properties.SCOPE));
}
@Test
public void testTrinoConfig() {
Properties props = new Properties();
props.setProperty("iceberg.rest-catalog.uri", "test-uri");
props.setProperty("iceberg.rest-catalog.name", "catalog-name");
props.setProperty("iceberg.rest-catalog.warehouse", "warehouse-loc");
props.setProperty("iceberg.rest-catalog.session-timeout", "5000");
//TODO: Switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade.
props.setProperty("iceberg.rest-catalog.oauth2.server-uri", "oauth-uri");
props.setProperty("iceberg.rest-catalog.oauth2.credential", "oauth-cred");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
assertEquals("test-uri", restProps.getUri());
assertEquals("catalog-name", restProps.getName());
assertEquals(5, restProps.getCatalogProperties().size());
Map<String, String> catProps = restProps.getCatalogProperties();
assertEquals("test-uri", catProps.get(CatalogProperties.URI));
assertEquals("warehouse-loc", catProps.get(CatalogProperties.WAREHOUSE_LOCATION));
assertEquals("5000", catProps.get(CatalogProperties.AUTH_SESSION_TIMEOUT_MS));
assertEquals("oauth-uri", catProps.get("oauth2-server-uri"));
assertEquals("oauth-cred", catProps.get(OAuth2Properties.CREDENTIAL));
}
@Test
public void testAmbiguousKeys() {
try {
Properties props = new Properties();
props.setProperty("iceberg.rest-catalog.uri", "test-uri");
props.setProperty("uri", "test-uri2");
props.setProperty(CatalogProperties.WAREHOUSE_LOCATION, "warehouse-loc");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
} catch (Exception e) {
// RESTCatalogProperties throws an exception when the same property is defined
// multiple times.
return;
}
fail();
}
@Test
public void testVerifiedConfigsSucceed() {
Properties props = new Properties();
props.setProperty("iceberg.rest-catalog.uri", "test-uri");
props.setProperty("iceberg.rest-catalog.session", "none");
props.setProperty("iceberg.rest-catalog.vended-credentials-enabled", "false");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
Map<String, String> catProps = restProps.getCatalogProperties();
assertEquals(3, catProps.size());
assertEquals("test-uri", catProps.get(CatalogProperties.URI));
assertEquals("none", catProps.get("iceberg.rest-catalog.session"));
assertEquals("false", catProps.get(
"iceberg.rest-catalog.vended-credentials-enabled"));
}
@Test
public void testVerifiedConfigsFail() {
try {
Properties props = new Properties();
props.setProperty("iceberg.rest-catalog.uri", "test-uri");
props.setProperty("iceberg.rest-catalog.session", "user");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
} catch (Exception e) {
// RESTCatalogProperties throws an exception when a verified config doesn't
// have the expected value.
return;
}
fail();
}
@Test
public void testIgnoredConfigs() {
Properties props = new Properties();
props.setProperty(CatalogProperties.URI, "test-uri");
props.setProperty("iceberg.rest-catalog.name", "catalog-name");
props.setProperty(CatalogProperties.WAREHOUSE_LOCATION, "warehouse-loc");
props.setProperty("iceberg.rest-catalog.session-timeout", "5000");
//TODO: Switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade.
props.setProperty("iceberg.rest-catalog.oauth2.server-uri", "oauth-uri");
props.setProperty(OAuth2Properties.CREDENTIAL, "oauth-cred");
props.setProperty("connector.name", "iceberg");
props.setProperty("iceberg.catalog.type", "rest");
RESTCatalogProperties restProps = new RESTCatalogProperties(props);
assertEquals("test-uri", restProps.getUri());
assertEquals("catalog-name", restProps.getName());
Map<String, String> catProps = restProps.getCatalogProperties();
assertEquals(5, catProps.size());
assertFalse(catProps.containsKey("connector.name"));
assertFalse(catProps.containsKey("iceberg.catalog.type"));
assertEquals("test-uri", catProps.get(CatalogProperties.URI));
assertEquals("warehouse-loc", catProps.get(CatalogProperties.WAREHOUSE_LOCATION));
assertEquals("5000", catProps.get(CatalogProperties.AUTH_SESSION_TIMEOUT_MS));
assertEquals("oauth-uri", catProps.get("oauth2-server-uri"));
assertEquals("oauth-cred", catProps.get(OAuth2Properties.CREDENTIAL));
}
}

View File

@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
FROM alpine:latest
RUN apk add --no-cache curl jq

View File

@@ -8,7 +8,7 @@ You need docker compose (Compose V2) in your environment. This usually means you
Via the following scripts you can run/stop Lakekeeper. Be aware that each restart resets the warehouse contents.
```
${IMPALA_HOME}/testdata/bin/run-lakekeeper.sh
${IMPALA_HOME}/testdata/bin/stop-lakekeeper.sh
${IMPALA_HOME}/testdata/bin/kill-lakekeeper.sh
```
## Ingesting data

View File

@@ -24,6 +24,8 @@ services:
- LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://postgres:postgres@localhost:54321/postgres
- LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://postgres:postgres@localhost:54321/postgres
- LAKEKEEPER__ENABLE_HDFS_WITH_SYSTEM_CREDENTIALS=true
- LAKEKEEPER__OPENID_PROVIDER_URI=http://localhost:7070/realms/lakekeeper-realm
- LAKEKEEPER__OPENID_CLIENT_ID=lakekeeper-client
- RUST_LOG=trace,axum=trace,sqlx=trace,iceberg-catalog=trace
- HADOOP_USER_NAME=${USER}
- HADOOP_CONF_DIR=/etc/hadoop
@@ -40,6 +42,8 @@ services:
condition: service_completed_successfully
db:
condition: service_healthy
keycloak:
condition: service_healthy
volumes:
- ./core-site.xml:/etc/hadoop/core-site.xml:ro
- ./hdfs-site.xml:/etc/hadoop/hdfs-site.xml:ro
@@ -53,61 +57,29 @@ services:
- LAKEKEEPER__ENABLE_HDFS_WITH_SYSTEM_CREDENTIALS=true
- LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://postgres:postgres@db:5432/postgres
- LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://postgres:postgres@db:5432/postgres
- LAKEKEEPER__OPENID_PROVIDER_URI=http://localhost:7070/realms/lakekeeper-realm
- LAKEKEEPER__OPENID_CLIENT_ID=lakekeeper-client
- RUST_LOG=info
restart: "no"
command: [ "migrate" ]
depends_on:
db:
condition: service_healthy
keycloak:
condition: service_healthy
networks:
lakekeeper_net:
bootstrap:
image: curlimages/curl
build: .
depends_on:
lakekeeper:
condition: service_healthy
restart: "no"
command:
- -w
- "%{http_code}"
- "-X"
- "POST"
- "-v"
- "http://localhost:8181/management/v1/bootstrap"
- "-H"
- "Content-Type: application/json"
- "--data"
- '{"accept-terms-of-use": true}'
- "-o"
- "/dev/null"
# - "--fail-with-body"
network_mode: host
initialwarehouse:
image: curlimages/curl
depends_on:
lakekeeper:
condition: service_healthy
bootstrap:
condition: service_completed_successfully
restart: "no"
command:
- -w
- "%{http_code}"
- "-X"
- "POST"
- "-v"
- "http://localhost:8181/management/v1/warehouse"
- "-H"
- "Content-Type: application/json"
- "--data"
- "@create-default-warehouse.json"
- "-o"
- "/dev/null"
- keycloak
- lakekeeper
volumes:
- ./create-default-warehouse.json:/home/curl_user/create-default-warehouse.json
- ./setup.sh:/setup.sh
- ./create-default-warehouse.json:/create-default-warehouse.json
network_mode: host
entrypoint: /setup.sh
db:
image: bitnami/postgresql:16.3.0
@@ -126,6 +98,38 @@ services:
ports:
- "54321:5432"
keycloak:
image: quay.io/keycloak/keycloak:latest
healthcheck:
test:
- "CMD-SHELL"
- >
[ -f /tmp/HealthCheck.java ] ||
echo "public class HealthCheck {
public static void main(String[] args) throws java.lang.Throwable {
java.net.URI uri = java.net.URI.create(args[0]);
System.exit(
java.net.HttpURLConnection.HTTP_OK ==
((java.net.HttpURLConnection)uri.toURL().
openConnection()).getResponseCode() ? 0 : 1);
}
}" > /tmp/HealthCheck.java &&
java /tmp/HealthCheck.java http://localhost:9000/health/live
interval: 5s
timeout: 5s
retries: 5
command: start-dev --import-realm
volumes:
- ./realm-config.json:/opt/keycloak/data/import/realm.json:ro
environment:
KC_BOOTSTRAP_ADMIN_USERNAME: admin
KC_BOOTSTRAP_ADMIN_PASSWORD: admin
KC_HEALTH_ENABLED: true
networks:
lakekeeper_net:
ports:
- "7070:8080"
networks:
lakekeeper_net:

View File

@@ -0,0 +1,217 @@
{
"realm": "lakekeeper-realm",
"enabled": true,
"verifyEmail": false,
"clients": [
{
"clientId": "lakekeeper-client",
"publicClient": true,
"directAccessGrantsEnabled": true,
"defaultClientScopes": [
"catalog"
]
},
{
"clientId": "impala-client",
"secret": "impala-client-secret",
"serviceAccountsEnabled": true,
"clientAuthenticatorType": "client-secret",
"publicClient": false,
"defaultClientScopes": [
"catalog"
]
}
],
"clientScopes": [
{
"name": "catalog",
"protocol": "openid-connect",
"attributes": {
"include.in.token.scope": "true",
"display.on.consent.screen": "false"
},
"protocolMappers": [
{
"name": "client roles",
"protocol": "openid-connect",
"protocolMapper": "oidc-usermodel-client-role-mapper",
"config": {
"access.token.claim": "true",
"claim.name": "resource_access.${client_id}.roles",
"id.token.claim": "false",
"jsonType.label": "String",
"multivalued": "true"
}
},
{
"name": "realm roles",
"protocol": "openid-connect",
"protocolMapper": "oidc-usermodel-realm-role-mapper",
"config": {
"access.token.claim": "true",
"claim.name": "realm_access.roles",
"id.token.claim": "false",
"jsonType.label": "String",
"multivalued": "true"
}
},
{
"name": "subject",
"protocol": "openid-connect",
"protocolMapper": "oidc-usermodel-property-mapper",
"config": {
"access.token.claim": "true",
"claim.name": "sub",
"id.token.claim": "true",
"jsonType.label": "String",
"user.attribute": "id"
}
}
]
}
],
"roles": {
"realm": [
{
"name": "admin",
"description": "Administrator role for Lakekeeper"
}
]
},
"users": [
{
"username": "lakekeeper-admin",
"enabled": true,
"credentials": [
{
"type": "password",
"value": "password",
"temporary": false
}
],
"realmRoles": ["admin"],
"requiredActions": []
}
],
"requiredActions": [
{
"alias": "CONFIGURE_TOTP",
"name": "Configure OTP",
"providerId": "CONFIGURE_TOTP",
"enabled": false,
"defaultAction": false,
"priority": 10,
"config": {}
},
{
"alias": "TERMS_AND_CONDITIONS",
"name": "Terms and Conditions",
"providerId": "TERMS_AND_CONDITIONS",
"enabled": false,
"defaultAction": false,
"priority": 20,
"config": {}
},
{
"alias": "UPDATE_PASSWORD",
"name": "Update Password",
"providerId": "UPDATE_PASSWORD",
"enabled": false,
"defaultAction": false,
"priority": 30,
"config": {}
},
{
"alias": "UPDATE_PROFILE",
"name": "Update Profile",
"providerId": "UPDATE_PROFILE",
"enabled": false,
"defaultAction": false,
"priority": 40,
"config": {}
},
{
"alias": "VERIFY_EMAIL",
"name": "Verify Email",
"providerId": "VERIFY_EMAIL",
"enabled": false,
"defaultAction": false,
"priority": 50,
"config": {}
},
{
"alias": "delete_account",
"name": "Delete Account",
"providerId": "delete_account",
"enabled": false,
"defaultAction": false,
"priority": 60,
"config": {}
},
{
"alias": "webauthn-register",
"name": "Webauthn Register",
"providerId": "webauthn-register",
"enabled": false,
"defaultAction": false,
"priority": 70,
"config": {}
},
{
"alias": "webauthn-register-passwordless",
"name": "Webauthn Register Passwordless",
"providerId": "webauthn-register-passwordless",
"enabled": false,
"defaultAction": false,
"priority": 80,
"config": {}
},
{
"alias": "VERIFY_PROFILE",
"name": "Verify Profile",
"providerId": "VERIFY_PROFILE",
"enabled": false,
"defaultAction": false,
"priority": 90,
"config": {}
},
{
"alias": "delete_credential",
"name": "Delete Credential",
"providerId": "delete_credential",
"enabled": false,
"defaultAction": false,
"priority": 100,
"config": {}
},
{
"alias": "idp_link",
"name": "Linking Identity Provider",
"providerId": "idp_link",
"enabled": false,
"defaultAction": false,
"priority": 110,
"config": {}
},
{
"alias": "CONFIGURE_RECOVERY_AUTHN_CODES",
"name": "Recovery Authentication Codes",
"providerId": "CONFIGURE_RECOVERY_AUTHN_CODES",
"enabled": false,
"defaultAction": false,
"priority": 120,
"config": {}
},
{
"alias": "update_user_locale",
"name": "Update User Locale",
"providerId": "update_user_locale",
"enabled": false,
"defaultAction": false,
"priority": 1000,
"config": {}
}
]
}

70
testdata/bin/minicluster_lakekeeper/setup.sh vendored Executable file
View File

@@ -0,0 +1,70 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Give services time to fully initialize
echo "Waiting for services..."
sleep 5
# Get Token from Keycloak
echo "Getting admin token..."
TOKEN=$(curl -s -X POST \
"http://localhost:7070/realms/lakekeeper-realm/protocol/openid-connect/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=lakekeeper-admin" \
-d "password=password" \
-d "grant_type=password" \
-d "client_id=lakekeeper-client" | jq -r '.access_token')
if [ -z "$TOKEN" ] || [ "$TOKEN" = "null" ]; then
echo "Failed to get token from Keycloak"
exit 1
fi
echo "Token acquired successfully."
# Bootstrap Lakekeeper
echo "Bootstrapping Lakekeeper..."
curl -f -s -X POST "http://localhost:8181/management/v1/bootstrap" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
--data '{"accept-terms-of-use": true}' \
-o "/dev/null"
if [ $? -ne 0 ]; then
echo "Bootstrap failed!"
exit 1
fi
echo "Bootstrap successful."
# Create warehouse
echo "Creating warehouse..."
curl -f -s -X POST "http://localhost:8181/management/v1/warehouse" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
--data "@/create-default-warehouse.json" \
-o "/dev/null"
if [ $? -ne 0 ]; then
echo "Warehouse creation failed!"
exit 1
fi
echo "Warehouse created."
echo "Setup complete!"

View File

@@ -19,5 +19,8 @@ connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://localhost:8181/catalog
iceberg.rest-catalog.warehouse=lakekeeper_demo
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.credential=impala-client:impala-client-secret
iceberg.rest-catalog.oauth2.server-uri=http://localhost:7070/realms/lakekeeper-realm/protocol/openid-connect/token
fs.hadoop.enabled=true
hive.config.resources=/etc/hive-site.xml,/etc/hdfs-site.xml,/etc/core-site.xml

View File

@@ -17,6 +17,14 @@
# specific language governing permissions and limitations
# under the License.
# Check Iceberg version. We need at least Iceberg 1.5
IFS='.-' read -r major minor _ <<< "$IMPALA_ICEBERG_VERSION"
if (( major < 1 )) || { (( major == 1 )) && (( minor < 5 )); }; then
echo "Iceberg version does NOT meet requirement (need at least 1.5):" \
"$IMPALA_ICEBERG_VERSION"
exit
fi
# Copy cluster configs to trino docker directory.
pushd ${HADOOP_CONF_DIR}
cp core-site.xml hdfs-site.xml ${IMPALA_HOME}/testdata/bin/minicluster_lakekeeper