From 062ba4071a2d3e82ed33827edb64e12c9630b955 Mon Sep 17 00:00:00 2001
From: Zoltan Borok-Nagy
Date: Tue, 8 Jul 2025 16:06:36 +0200
Subject: [PATCH] IMPALA-14018: Configure OAUTH2 with Lakekeeper and fix Impala's config handling

This patch adds Keycloak as an Identity Provider for Lakekeeper, so we can now test Impala's Iceberg REST Catalog with OAuth2 authentication (Client Credentials flow). The Keycloak instance is pre-configured with a Lakekeeper realm that contains the necessary clients, users, scopes, and roles.

Manual testing also revealed that our Iceberg REST Catalog configuration handling is incomplete. This patch refactors config handling so that both Iceberg-native configuration options and Trino-specific configuration options can be used with Impala. This helps users reuse their existing Trino connector configurations with Impala.

By default Impala uses Iceberg 1.3, which assumes that the Iceberg REST server is also the authentication server. This is not always the case, e.g. Lakekeeper cannot function as the authentication server at all, but it can work with external authentication servers. This is also why we needed Keycloak in the first place. It means that anyone who wants to try out Lakekeeper+Impala with OAuth2 needs to configure Impala with Iceberg 1.5.

Testing:
 * manual testing with Iceberg 1.5

Change-Id: Ie5785cb72773e188b1de7c7924cc6f0b1f96de33
(cherry picked from commit a9cb94986a5791be2adcb2f7c576272a9c22e79c)
Reviewed-on: http://gerrit.cloudera.org:8080/23156
Reviewed-by: Impala Public Jenkins
Tested-by: Impala Public Jenkins
---
 .../catalog/iceberg/IcebergRESTCatalog.java   |  81 +-----
 .../iceberg/RESTCatalogProperties.java        | 271 ++++++++++++++++++
 .../iceberg/TestRESTCatalogProperties.java    | 181 ++++++++++++
 ...{stop-lakekeeper.sh => kill-lakekeeper.sh} |   0
 .../bin/minicluster_lakekeeper/Dockerfile     |  19 ++
 testdata/bin/minicluster_lakekeeper/README.md |   2 +-
 .../docker-compose.yaml                       |  88 +++---
 .../minicluster_lakekeeper/realm-config.json  | 217 ++++++++++++++
 testdata/bin/minicluster_lakekeeper/setup.sh  |  70 +++++
 .../iceberg_lakekeeper.properties             |   3 +
 testdata/bin/run-lakekeeper.sh                |   8 +
 11 files changed, 820 insertions(+), 120 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/iceberg/RESTCatalogProperties.java
 create mode 100644 fe/src/test/java/org/apache/impala/catalog/iceberg/TestRESTCatalogProperties.java
 rename testdata/bin/{stop-lakekeeper.sh => kill-lakekeeper.sh} (100%)
 create mode 100644 testdata/bin/minicluster_lakekeeper/Dockerfile
 create mode 100644 testdata/bin/minicluster_lakekeeper/realm-config.json
 create mode 100755 testdata/bin/minicluster_lakekeeper/setup.sh

diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java index 61e1e0529..d1c5c0786 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java @@ -20,36 +20,24 @@ package org.apache.impala.catalog.iceberg; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.UUID; import com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SessionCatalog; import
org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTCatalog; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.IcebergTableLoadingException; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.util.IcebergUtil; -import com.google.common.collect.ImmutableMap; - /** * Implementation of IcebergCatalog for tables stored in HadoopCatalog. */ public class IcebergRESTCatalog implements IcebergCatalog { - private static final String KEY_URI = "iceberg.rest-catalog.uri"; - private static final String KEY_NAME = "iceberg.rest-catalog.name"; - private static final String KEY_CLIENT_ID = "iceberg.rest-catalog.client-id"; - private static final String KEY_CLIENT_SECRET = "iceberg.rest-catalog.client-secret"; - private static final String KEY_WAREHOUSE = "iceberg.rest-catalog.warehouse"; - private final String REST_URI; private static IcebergRESTCatalog instance_; @@ -63,77 +51,16 @@ public class IcebergRESTCatalog implements IcebergCatalog { return instance_; } - private static class IcebergRestConfig { - String catalogName; - String uri; - String user; - String secret; - String credential; - String warehouseLocation; - - IcebergRestConfig(Properties properties) { - uri = getRequiredProperty(properties, KEY_URI); - catalogName = properties.getProperty(KEY_NAME, ""); - user = properties.getProperty(KEY_CLIENT_ID); - secret = properties.getProperty(KEY_CLIENT_SECRET); - credential = getCredential(); - warehouseLocation = properties.getProperty(KEY_WAREHOUSE); - } - - public Map getCatalogProperties() { - ImmutableMap.Builder mapBuilder = new ImmutableMap.Builder<>(); - mapBuilder.put(CatalogProperties.URI, uri); - if (credential != null) mapBuilder.put("credential", credential); - if (warehouseLocation != null){ - mapBuilder.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); - } - return mapBuilder.build(); - } - - public SessionCatalog.SessionContext getSessionContext() { - return new SessionCatalog.SessionContext( - UUID.randomUUID().toString(), - user, - getCredentialMap(), - ImmutableMap.of()); - } - - private String getRequiredProperty(Properties properties, String key) { - String value = properties.getProperty(key); - if (value == null) { - throw new IllegalStateException( - String.format("Missing property of IcebergRESTCatalog: %s", key)); - } - return value; - } - - private String getCredential() { - if (user != null && secret != null) { - return user + ":" + secret; - } - return null; - } - - private ImmutableMap getCredentialMap() { - ImmutableMap.Builder mapBuilder = new ImmutableMap.Builder<>(); - if (credential != null) { - mapBuilder.put("credential", credential); - } - return mapBuilder.build(); - } - } - private IcebergRESTCatalog(Properties properties) { setContextClassLoader(); - IcebergRestConfig restConfig = new IcebergRestConfig(properties); - REST_URI = restConfig.uri; - restCatalog_ = new RESTCatalog(restConfig.getSessionContext(), - (config) -> HTTPClient.builder(config).uri(REST_URI).build()); + RESTCatalogProperties restConfig = new RESTCatalogProperties(properties); + REST_URI = restConfig.getUri(); + restCatalog_ = new RESTCatalog(); HiveConf conf = new HiveConf(IcebergRESTCatalog.class); restCatalog_.setConf(conf); restCatalog_.initialize( - restConfig.catalogName, + restConfig.getName(), restConfig.getCatalogProperties()); } diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/RESTCatalogProperties.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/RESTCatalogProperties.java
new file mode 100644
index 000000000..17c12960a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/RESTCatalogProperties.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class to extract native Iceberg catalog properties from a Properties object
+ * that is possibly created from a Trino connector file. The goal is that users can
+ * simply reuse their existing Trino configurations with Impala.
+ *
+ * Iceberg REST Catalog and Trino can use different property names for the same
+ * functionality. E.g.:
+ * +-------------------+----------------------------------------+
+ * | Iceberg           | Trino                                  |
+ * +-------------------+----------------------------------------+
+ * | uri               | iceberg.rest-catalog.uri               |
+ * | warehouse         | iceberg.rest-catalog.warehouse         |
+ * | oauth2-server-uri | iceberg.rest-catalog.oauth2.server-uri |
+ * +-------------------+----------------------------------------+
+ *
+ * For a complete list check the following:
+ * Iceberg: CatalogProperties, OAuth2Properties
+ * Trino: IcebergRestCatalogConfig, OAuth2SecurityConfig
+ *
+ * With this class Impala can recognize the alternative configuration names and translate
+ * them to the Iceberg native ones. It also handles required properties like "uri" that
+ * must be set.
+ *
+ * For configuration options where we only support a single value, we also verify that
+ * value. E.g. 'vended-credentials-enabled' must be false, as Impala doesn't support
+ * vended credentials yet.
+ *
+ * Some properties are simply ignored as they are specific to another query engine
+ * (e.g. 'case-insensitive-name-matching.cache-ttl'), or they have a purpose other than
+ * configuring the REST catalog (e.g. 'connector.name').
+ *
+ * The remaining properties (that are not translated, verified, or ignored) don't
+ * need special treatment and are simply returned as they are.
+ */
+public class RESTCatalogProperties {
+  /**
+   * Utility class for properties that can have alternative names.
+ */ + private static class Config { + protected String catalogKey; + protected ImmutableList alternativeKeys; + + public Config(String key) { + this(key, ImmutableList.of()); + } + + public Config(String key, ImmutableList alternativeKeys) { + this.catalogKey = key; + this.alternativeKeys = alternativeKeys; + } + + public boolean applyConfig( + Map sourceMap, Map outputMap) { + verifyOutputMap(outputMap); + + boolean applied = false; + String value = sourceMap.get(catalogKey); + if (value != null) { + applied = true; + sourceMap.remove(catalogKey); + outputMap.put(catalogKey, value); + } + // Even if already applied, check alternative keys for ambiguity. + for (String alternativeKey : alternativeKeys) { + value = sourceMap.get(alternativeKey); + if (value != null) { + if (applied) { + throw new IllegalStateException( + String.format("Alternative key '%s' sets the same configuration as " + + "'%s' which is already defined with value '%s'", + alternativeKey, catalogKey, value)); + } + applied = true; + sourceMap.remove(alternativeKey); + // We still need to use 'catalogKey' for alternative keys. + outputMap.put(catalogKey, value); + } + } + return applied; + } + + protected void verifyOutputMap(Map outputMap) { + String value = outputMap.get(catalogKey); + if (value != null) { + throw new IllegalStateException( + String.format("REST Catalog property is defined multiple times: %s\n" + + "Current value: %s", catalogKey, value)); + } + } + } + + /** + * Config that must be present. Currently only 'URI'. + */ + private static class RequiredConfig extends Config { + public RequiredConfig(String key) { + super(key, ImmutableList.of()); + } + + public RequiredConfig(String key, ImmutableList alternativeKeys) { + super(key, alternativeKeys); + } + + @Override + public boolean applyConfig( + Map sourceMap, Map outputMap) { + boolean success = super.applyConfig(sourceMap, outputMap); + if (success) return true; + throw new IllegalStateException( + String.format("Missing property of IcebergRESTCatalog: %s", catalogKey)); + } + } + + /** + * Configuration that is only meaningful for other query engines, and cannot be + * translated to Iceberg config. + */ + private static class IgnoredConfig extends Config { + public IgnoredConfig(String key) { + super(key, ImmutableList.of()); + } + + @Override + public boolean applyConfig( + Map sourceMap, Map outputMap) { + if (sourceMap.containsKey(catalogKey)) { + sourceMap.remove(catalogKey); + return true; + } else { + return false; + } + } + } + + /** + * Config for which we only support a single value. + */ + private static class VerifiedConfig extends Config { + private String expectedValue; + public VerifiedConfig(String key, String expectedValue) { + super(key, ImmutableList.of()); + Preconditions.checkState(expectedValue != null); + this.expectedValue = expectedValue; + } + + @Override + public boolean applyConfig( + Map sourceMap, Map outputMap) { + String value = sourceMap.get(catalogKey); + if (value != null) { + // Config keys are case sensitive, but the values are typically not, especially + // the config values that are verified (false/FALSE, none/NONE). 
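+        // E.g. 'iceberg.rest-catalog.session' may be given as 'none' or 'NONE'; both
+        // match the expected value 'NONE', while a value like 'user' is rejected below.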
+ if (!expectedValue.equalsIgnoreCase(value)) { + throw new IllegalStateException( + String.format( + "The only allowed value for REST Catalog property '%s' is '%s'.\n" + + "Value in configuration is '%s'", + catalogKey, expectedValue, value)); + } + return true; + } + return false; + } + } + + private static final String NAME = "iceberg.rest-catalog.name"; + + private static final ImmutableList CATALOG_CONFIGS = ImmutableList.of( + new RequiredConfig(CatalogProperties.URI, + ImmutableList.of("iceberg.rest-catalog.uri")), + new Config("prefix", + ImmutableList.of("iceberg.rest-catalog.prefix")), + new Config(CatalogProperties.WAREHOUSE_LOCATION, + ImmutableList.of("iceberg.rest-catalog.warehouse")), + new Config(CatalogProperties.AUTH_SESSION_TIMEOUT_MS, + ImmutableList.of("iceberg.rest-catalog.session-timeout")), + // USER sessions are not supported + new VerifiedConfig("iceberg.rest-catalog.session", "NONE"), + new VerifiedConfig("iceberg.rest-catalog.vended-credentials-enabled", "false"), + new VerifiedConfig("iceberg.rest-catalog.nested-namespace-enabled", "false"), + new VerifiedConfig("iceberg.rest-catalog.case-insensitive-name-matching", "true"), + new IgnoredConfig("iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl"), + new IgnoredConfig("iceberg.catalog.type"), + new IgnoredConfig("connector.name"), + new IgnoredConfig(NAME) + ); + + private static final ImmutableList OAUTH2_CONFIGS = ImmutableList.of( + // Since currently only OAUTH2 is possible we ignore this config. It also + // doesn't map to any Iceberg catalog property. + new VerifiedConfig("iceberg.rest-catalog.security", "OAUTH2"), + // TODO: switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade. + new Config("oauth2-server-uri", + ImmutableList.of("iceberg.rest-catalog.oauth2.server-uri")), + new Config(OAuth2Properties.CREDENTIAL, + ImmutableList.of("iceberg.rest-catalog.oauth2.credential")), + new Config(OAuth2Properties.TOKEN, + ImmutableList.of("iceberg.rest-catalog.oauth2.token")), + new Config(OAuth2Properties.TOKEN_REFRESH_ENABLED, + ImmutableList.of("iceberg.rest-catalog.oauth2.token-refresh-enabled")), + new Config(OAuth2Properties.SCOPE, + ImmutableList.of("iceberg.rest-catalog.oauth2.scope")) + ); + + private Map sourceMap_; + private Map finalMap_; + private String uri_; + private String name_ = ""; + + public RESTCatalogProperties(Properties properties) { + sourceMap_ = new HashMap<>(); + for (String key : properties.stringPropertyNames()) { + sourceMap_.put(key, properties.getProperty(key)); + } + + // 'NAME' is used in RESTCatalog.initialize(), not in the properties. + if (sourceMap_.containsKey(NAME)) { + name_ = sourceMap_.get(NAME); + } + + finalMap_ = new HashMap<>(); + applyConfigs(CATALOG_CONFIGS, sourceMap_, finalMap_); + applyConfigs(OAUTH2_CONFIGS, sourceMap_, finalMap_); + // Copy over remaining configuration that do not need special handling. 
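+    // (For example, an Iceberg FileIO property such as CatalogProperties.FILE_IO_IMPL,
+    // i.e. "io-impl", is not handled above, so it is forwarded to the catalog unchanged.)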
+ for (Map.Entry entry : sourceMap_.entrySet()) { + Preconditions.checkState(!finalMap_.containsKey(entry.getKey())); + finalMap_.put(entry.getKey(), entry.getValue()); + } + uri_ = finalMap_.get(CatalogProperties.URI); + Preconditions.checkState(uri_ != null); + } + + private void applyConfigs(ImmutableList configs, Map sourceMap, + Map outputMap) { + for (Config config : configs) { + config.applyConfig(sourceMap, outputMap); + } + } + + public String getName() { return name_; } + public String getUri() { return uri_; } + public Map getCatalogProperties() { return finalMap_; } +} diff --git a/fe/src/test/java/org/apache/impala/catalog/iceberg/TestRESTCatalogProperties.java b/fe/src/test/java/org/apache/impala/catalog/iceberg/TestRESTCatalogProperties.java new file mode 100644 index 000000000..7dc89896d --- /dev/null +++ b/fe/src/test/java/org/apache/impala/catalog/iceberg/TestRESTCatalogProperties.java @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.junit.Test; + +import java.util.Map; +import java.util.Properties; + +public class TestRESTCatalogProperties { + + @Test + public void testEmptyConfig() { + try { + Properties props = new Properties(); + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + } catch (Exception e) { + // RESTCatalogProperties throws an exception if required properties are not defined. + return; + } + fail(); + } + + @Test + public void testUriOnlyConfig() { + Properties props = new Properties(); + props.setProperty(CatalogProperties.URI, "test-uri"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + assertEquals("test-uri", restProps.getUri()); + assertEquals("", restProps.getName()); + assertEquals(1, restProps.getCatalogProperties().size()); + assertTrue(restProps.getCatalogProperties().containsKey(CatalogProperties.URI)); + } + + @Test + public void testIcebergNativeConfig() { + Properties props = new Properties(); + props.setProperty(CatalogProperties.URI, "test-uri"); + props.setProperty("iceberg.rest-catalog.name", "catalog-name"); + props.setProperty(CatalogProperties.WAREHOUSE_LOCATION, "warehouse-loc"); + props.setProperty(CatalogProperties.AUTH_SESSION_TIMEOUT_MS, "5000"); + //TODO: Switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade. 
+ props.setProperty("oauth2-server-uri", "oauth-uri"); + props.setProperty(OAuth2Properties.TOKEN, "oauth-token"); + props.setProperty(OAuth2Properties.SCOPE, "oauth-scope"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + assertEquals("test-uri", restProps.getUri()); + assertEquals("catalog-name", restProps.getName()); + assertEquals(6, restProps.getCatalogProperties().size()); + Map catProps = restProps.getCatalogProperties(); + assertEquals("test-uri", catProps.get(CatalogProperties.URI)); + assertEquals("warehouse-loc", catProps.get(CatalogProperties.WAREHOUSE_LOCATION)); + assertEquals("5000", catProps.get(CatalogProperties.AUTH_SESSION_TIMEOUT_MS)); + assertEquals("oauth-uri", catProps.get("oauth2-server-uri")); + assertEquals("oauth-token", catProps.get(OAuth2Properties.TOKEN)); + assertEquals("oauth-scope", catProps.get(OAuth2Properties.SCOPE)); + } + + @Test + public void testTrinoConfig() { + Properties props = new Properties(); + props.setProperty("iceberg.rest-catalog.uri", "test-uri"); + props.setProperty("iceberg.rest-catalog.name", "catalog-name"); + props.setProperty("iceberg.rest-catalog.warehouse", "warehouse-loc"); + props.setProperty("iceberg.rest-catalog.session-timeout", "5000"); + //TODO: Switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade. + props.setProperty("iceberg.rest-catalog.oauth2.server-uri", "oauth-uri"); + props.setProperty("iceberg.rest-catalog.oauth2.credential", "oauth-cred"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + assertEquals("test-uri", restProps.getUri()); + assertEquals("catalog-name", restProps.getName()); + assertEquals(5, restProps.getCatalogProperties().size()); + Map catProps = restProps.getCatalogProperties(); + assertEquals("test-uri", catProps.get(CatalogProperties.URI)); + assertEquals("warehouse-loc", catProps.get(CatalogProperties.WAREHOUSE_LOCATION)); + assertEquals("5000", catProps.get(CatalogProperties.AUTH_SESSION_TIMEOUT_MS)); + assertEquals("oauth-uri", catProps.get("oauth2-server-uri")); + assertEquals("oauth-cred", catProps.get(OAuth2Properties.CREDENTIAL)); + } + + @Test + public void testAmbiguousKeys() { + try { + Properties props = new Properties(); + props.setProperty("iceberg.rest-catalog.uri", "test-uri"); + props.setProperty("uri", "test-uri2"); + props.setProperty(CatalogProperties.WAREHOUSE_LOCATION, "warehouse-loc"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + } catch (Exception e) { + // RESTCatalogProperties throws an exception when the same property is defined + // multiple times. 
+ return; + } + fail(); + } + + @Test + public void testVerifiedConfigsSucceed() { + Properties props = new Properties(); + props.setProperty("iceberg.rest-catalog.uri", "test-uri"); + props.setProperty("iceberg.rest-catalog.session", "none"); + props.setProperty("iceberg.rest-catalog.vended-credentials-enabled", "false"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + Map catProps = restProps.getCatalogProperties(); + assertEquals(3, catProps.size()); + assertEquals("test-uri", catProps.get(CatalogProperties.URI)); + assertEquals("none", catProps.get("iceberg.rest-catalog.session")); + assertEquals("false", catProps.get( + "iceberg.rest-catalog.vended-credentials-enabled")); + } + + @Test + public void testVerifiedConfigsFail() { + try { + Properties props = new Properties(); + props.setProperty("iceberg.rest-catalog.uri", "test-uri"); + props.setProperty("iceberg.rest-catalog.session", "user"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + } catch (Exception e) { + // RESTCatalogProperties throws an exception when a verified config doesn't + // have the expected value. + return; + } + fail(); + } + + @Test + public void testIgnoredConfigs() { + Properties props = new Properties(); + props.setProperty(CatalogProperties.URI, "test-uri"); + props.setProperty("iceberg.rest-catalog.name", "catalog-name"); + props.setProperty(CatalogProperties.WAREHOUSE_LOCATION, "warehouse-loc"); + props.setProperty("iceberg.rest-catalog.session-timeout", "5000"); + //TODO: Switch to OAuth2Properties.OAUTH2_SERVER_URI with Iceberg upgrade. + props.setProperty("iceberg.rest-catalog.oauth2.server-uri", "oauth-uri"); + props.setProperty(OAuth2Properties.CREDENTIAL, "oauth-cred"); + props.setProperty("connector.name", "iceberg"); + props.setProperty("iceberg.catalog.type", "rest"); + + RESTCatalogProperties restProps = new RESTCatalogProperties(props); + assertEquals("test-uri", restProps.getUri()); + assertEquals("catalog-name", restProps.getName()); + Map catProps = restProps.getCatalogProperties(); + assertEquals(5, catProps.size()); + assertFalse(catProps.containsKey("connector.name")); + assertFalse(catProps.containsKey("iceberg.catalog.type")); + assertEquals("test-uri", catProps.get(CatalogProperties.URI)); + assertEquals("warehouse-loc", catProps.get(CatalogProperties.WAREHOUSE_LOCATION)); + assertEquals("5000", catProps.get(CatalogProperties.AUTH_SESSION_TIMEOUT_MS)); + assertEquals("oauth-uri", catProps.get("oauth2-server-uri")); + assertEquals("oauth-cred", catProps.get(OAuth2Properties.CREDENTIAL)); + } +} diff --git a/testdata/bin/stop-lakekeeper.sh b/testdata/bin/kill-lakekeeper.sh similarity index 100% rename from testdata/bin/stop-lakekeeper.sh rename to testdata/bin/kill-lakekeeper.sh diff --git a/testdata/bin/minicluster_lakekeeper/Dockerfile b/testdata/bin/minicluster_lakekeeper/Dockerfile new file mode 100644 index 000000000..35722f148 --- /dev/null +++ b/testdata/bin/minicluster_lakekeeper/Dockerfile @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM alpine:latest +RUN apk add --no-cache curl jq diff --git a/testdata/bin/minicluster_lakekeeper/README.md b/testdata/bin/minicluster_lakekeeper/README.md index b1e5b2df8..4ae269d2d 100644 --- a/testdata/bin/minicluster_lakekeeper/README.md +++ b/testdata/bin/minicluster_lakekeeper/README.md @@ -8,7 +8,7 @@ You need docker compose (Compose V2) in your environment. This usually means you Via the following scripts you can run/stop Lakekeeper. Be aware that each restart resets the warehouse contents. ``` ${IMPALA_HOME}/testdata/bin/run-lakekeeper.sh -${IMPALA_HOME}/testdata/bin/stop-lakekeeper.sh +${IMPALA_HOME}/testdata/bin/kill-lakekeeper.sh ``` ## Ingesting data diff --git a/testdata/bin/minicluster_lakekeeper/docker-compose.yaml b/testdata/bin/minicluster_lakekeeper/docker-compose.yaml index 1e1c6ba25..f9db64f69 100644 --- a/testdata/bin/minicluster_lakekeeper/docker-compose.yaml +++ b/testdata/bin/minicluster_lakekeeper/docker-compose.yaml @@ -24,6 +24,8 @@ services: - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://postgres:postgres@localhost:54321/postgres - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://postgres:postgres@localhost:54321/postgres - LAKEKEEPER__ENABLE_HDFS_WITH_SYSTEM_CREDENTIALS=true + - LAKEKEEPER__OPENID_PROVIDER_URI=http://localhost:7070/realms/lakekeeper-realm + - LAKEKEEPER__OPENID_CLIENT_ID=lakekeeper-client - RUST_LOG=trace,axum=trace,sqlx=trace,iceberg-catalog=trace - HADOOP_USER_NAME=${USER} - HADOOP_CONF_DIR=/etc/hadoop @@ -40,6 +42,8 @@ services: condition: service_completed_successfully db: condition: service_healthy + keycloak: + condition: service_healthy volumes: - ./core-site.xml:/etc/hadoop/core-site.xml:ro - ./hdfs-site.xml:/etc/hadoop/hdfs-site.xml:ro @@ -53,61 +57,29 @@ services: - LAKEKEEPER__ENABLE_HDFS_WITH_SYSTEM_CREDENTIALS=true - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://postgres:postgres@db:5432/postgres - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://postgres:postgres@db:5432/postgres + - LAKEKEEPER__OPENID_PROVIDER_URI=http://localhost:7070/realms/lakekeeper-realm + - LAKEKEEPER__OPENID_CLIENT_ID=lakekeeper-client - RUST_LOG=info restart: "no" command: [ "migrate" ] depends_on: db: condition: service_healthy + keycloak: + condition: service_healthy networks: lakekeeper_net: bootstrap: - image: curlimages/curl + build: . 
depends_on: - lakekeeper: - condition: service_healthy - restart: "no" - command: - - -w - - "%{http_code}" - - "-X" - - "POST" - - "-v" - - "http://localhost:8181/management/v1/bootstrap" - - "-H" - - "Content-Type: application/json" - - "--data" - - '{"accept-terms-of-use": true}' - - "-o" - - "/dev/null" - # - "--fail-with-body" - network_mode: host - - initialwarehouse: - image: curlimages/curl - depends_on: - lakekeeper: - condition: service_healthy - bootstrap: - condition: service_completed_successfully - restart: "no" - command: - - -w - - "%{http_code}" - - "-X" - - "POST" - - "-v" - - "http://localhost:8181/management/v1/warehouse" - - "-H" - - "Content-Type: application/json" - - "--data" - - "@create-default-warehouse.json" - - "-o" - - "/dev/null" + - keycloak + - lakekeeper volumes: - - ./create-default-warehouse.json:/home/curl_user/create-default-warehouse.json + - ./setup.sh:/setup.sh + - ./create-default-warehouse.json:/create-default-warehouse.json network_mode: host + entrypoint: /setup.sh db: image: bitnami/postgresql:16.3.0 @@ -126,6 +98,38 @@ services: ports: - "54321:5432" + keycloak: + image: quay.io/keycloak/keycloak:latest + healthcheck: + test: + - "CMD-SHELL" + - > + [ -f /tmp/HealthCheck.java ] || + echo "public class HealthCheck { + public static void main(String[] args) throws java.lang.Throwable { + java.net.URI uri = java.net.URI.create(args[0]); + System.exit( + java.net.HttpURLConnection.HTTP_OK == + ((java.net.HttpURLConnection)uri.toURL(). + openConnection()).getResponseCode() ? 0 : 1); + } + }" > /tmp/HealthCheck.java && + java /tmp/HealthCheck.java http://localhost:9000/health/live + interval: 5s + timeout: 5s + retries: 5 + command: start-dev --import-realm + volumes: + - ./realm-config.json:/opt/keycloak/data/import/realm.json:ro + environment: + KC_BOOTSTRAP_ADMIN_USERNAME: admin + KC_BOOTSTRAP_ADMIN_PASSWORD: admin + KC_HEALTH_ENABLED: true + networks: + lakekeeper_net: + ports: + - "7070:8080" + networks: lakekeeper_net: diff --git a/testdata/bin/minicluster_lakekeeper/realm-config.json b/testdata/bin/minicluster_lakekeeper/realm-config.json new file mode 100644 index 000000000..a54edcf15 --- /dev/null +++ b/testdata/bin/minicluster_lakekeeper/realm-config.json @@ -0,0 +1,217 @@ +{ + "realm": "lakekeeper-realm", + "enabled": true, + "verifyEmail": false, + "clients": [ + { + "clientId": "lakekeeper-client", + "publicClient": true, + "directAccessGrantsEnabled": true, + "defaultClientScopes": [ + "catalog" + ] + }, + { + "clientId": "impala-client", + "secret": "impala-client-secret", + "serviceAccountsEnabled": true, + "clientAuthenticatorType": "client-secret", + "publicClient": false, + "defaultClientScopes": [ + "catalog" + ] + } + ], + "clientScopes": [ + { + "name": "catalog", + "protocol": "openid-connect", + "attributes": { + "include.in.token.scope": "true", + "display.on.consent.screen": "false" + }, + "protocolMappers": [ + { + "name": "client roles", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-client-role-mapper", + "config": { + "access.token.claim": "true", + "claim.name": "resource_access.${client_id}.roles", + "id.token.claim": "false", + "jsonType.label": "String", + "multivalued": "true" + } + }, + { + "name": "realm roles", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-realm-role-mapper", + "config": { + "access.token.claim": "true", + "claim.name": "realm_access.roles", + "id.token.claim": "false", + "jsonType.label": "String", + "multivalued": "true" + } + }, + { + "name": 
"subject", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-property-mapper", + "config": { + "access.token.claim": "true", + "claim.name": "sub", + "id.token.claim": "true", + "jsonType.label": "String", + "user.attribute": "id" + } + } + ] + } + ], + "roles": { + "realm": [ + { + "name": "admin", + "description": "Administrator role for Lakekeeper" + } + ] + }, + "users": [ + { + "username": "lakekeeper-admin", + "enabled": true, + "credentials": [ + { + "type": "password", + "value": "password", + "temporary": false + } + ], + "realmRoles": ["admin"], + "requiredActions": [] + } + ], + "requiredActions": [ + { + "alias": "CONFIGURE_TOTP", + "name": "Configure OTP", + "providerId": "CONFIGURE_TOTP", + "enabled": false, + "defaultAction": false, + "priority": 10, + "config": {} + }, + { + "alias": "TERMS_AND_CONDITIONS", + "name": "Terms and Conditions", + "providerId": "TERMS_AND_CONDITIONS", + "enabled": false, + "defaultAction": false, + "priority": 20, + "config": {} + }, + { + "alias": "UPDATE_PASSWORD", + "name": "Update Password", + "providerId": "UPDATE_PASSWORD", + "enabled": false, + "defaultAction": false, + "priority": 30, + "config": {} + }, + { + "alias": "UPDATE_PROFILE", + "name": "Update Profile", + "providerId": "UPDATE_PROFILE", + "enabled": false, + "defaultAction": false, + "priority": 40, + "config": {} + }, + { + "alias": "VERIFY_EMAIL", + "name": "Verify Email", + "providerId": "VERIFY_EMAIL", + "enabled": false, + "defaultAction": false, + "priority": 50, + "config": {} + }, + { + "alias": "delete_account", + "name": "Delete Account", + "providerId": "delete_account", + "enabled": false, + "defaultAction": false, + "priority": 60, + "config": {} + }, + { + "alias": "webauthn-register", + "name": "Webauthn Register", + "providerId": "webauthn-register", + "enabled": false, + "defaultAction": false, + "priority": 70, + "config": {} + }, + { + "alias": "webauthn-register-passwordless", + "name": "Webauthn Register Passwordless", + "providerId": "webauthn-register-passwordless", + "enabled": false, + "defaultAction": false, + "priority": 80, + "config": {} + }, + { + "alias": "VERIFY_PROFILE", + "name": "Verify Profile", + "providerId": "VERIFY_PROFILE", + "enabled": false, + "defaultAction": false, + "priority": 90, + "config": {} + }, + { + "alias": "delete_credential", + "name": "Delete Credential", + "providerId": "delete_credential", + "enabled": false, + "defaultAction": false, + "priority": 100, + "config": {} + }, + + { + "alias": "idp_link", + "name": "Linking Identity Provider", + "providerId": "idp_link", + "enabled": false, + "defaultAction": false, + "priority": 110, + "config": {} + }, + { + "alias": "CONFIGURE_RECOVERY_AUTHN_CODES", + "name": "Recovery Authentication Codes", + "providerId": "CONFIGURE_RECOVERY_AUTHN_CODES", + "enabled": false, + "defaultAction": false, + "priority": 120, + "config": {} + }, + { + "alias": "update_user_locale", + "name": "Update User Locale", + "providerId": "update_user_locale", + "enabled": false, + "defaultAction": false, + "priority": 1000, + "config": {} + } + ] +} + diff --git a/testdata/bin/minicluster_lakekeeper/setup.sh b/testdata/bin/minicluster_lakekeeper/setup.sh new file mode 100755 index 000000000..f9e1f9ed2 --- /dev/null +++ b/testdata/bin/minicluster_lakekeeper/setup.sh @@ -0,0 +1,70 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Give services time to fully initialize +echo "Waiting for services..." +sleep 5 + +# Get Token from Keycloak +echo "Getting admin token..." +TOKEN=$(curl -s -X POST \ + "http://localhost:7070/realms/lakekeeper-realm/protocol/openid-connect/token" \ + -H "Content-Type: application/x-www-form-urlencoded" \ + -d "username=lakekeeper-admin" \ + -d "password=password" \ + -d "grant_type=password" \ + -d "client_id=lakekeeper-client" | jq -r '.access_token') + +if [ -z "$TOKEN" ] || [ "$TOKEN" = "null" ]; then + echo "Failed to get token from Keycloak" + exit 1 +fi + +echo "Token acquired successfully." + +# Bootstrap Lakekeeper +echo "Bootstrapping Lakekeeper..." +curl -f -s -X POST "http://localhost:8181/management/v1/bootstrap" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + --data '{"accept-terms-of-use": true}' \ + -o "/dev/null" + +if [ $? -ne 0 ]; then + echo "Bootstrap failed!" + exit 1 +fi + +echo "Bootstrap successful." + +# Create warehouse +echo "Creating warehouse..." +curl -f -s -X POST "http://localhost:8181/management/v1/warehouse" \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + --data "@/create-default-warehouse.json" \ + -o "/dev/null" + +if [ $? -ne 0 ]; then + echo "Warehouse creation failed!" + exit 1 +fi + +echo "Warehouse created." +echo "Setup complete!" diff --git a/testdata/bin/minicluster_trino/iceberg_lakekeeper.properties b/testdata/bin/minicluster_trino/iceberg_lakekeeper.properties index c72d0abfa..28baeb93a 100644 --- a/testdata/bin/minicluster_trino/iceberg_lakekeeper.properties +++ b/testdata/bin/minicluster_trino/iceberg_lakekeeper.properties @@ -19,5 +19,8 @@ connector.name=iceberg iceberg.catalog.type=rest iceberg.rest-catalog.uri=http://localhost:8181/catalog iceberg.rest-catalog.warehouse=lakekeeper_demo +iceberg.rest-catalog.security=OAUTH2 +iceberg.rest-catalog.oauth2.credential=impala-client:impala-client-secret +iceberg.rest-catalog.oauth2.server-uri=http://localhost:7070/realms/lakekeeper-realm/protocol/openid-connect/token fs.hadoop.enabled=true hive.config.resources=/etc/hive-site.xml,/etc/hdfs-site.xml,/etc/core-site.xml diff --git a/testdata/bin/run-lakekeeper.sh b/testdata/bin/run-lakekeeper.sh index 559ba319b..f4097542c 100755 --- a/testdata/bin/run-lakekeeper.sh +++ b/testdata/bin/run-lakekeeper.sh @@ -17,6 +17,14 @@ # specific language governing permissions and limitations # under the License. +# Check Iceberg version. We need at least Iceberg 1.5 +IFS='.-' read -r major minor _ <<< "$IMPALA_ICEBERG_VERSION" +if (( major < 1 )) || { (( major == 1 )) && (( minor < 5 )); }; then + echo "Iceberg version does NOT meet requirement (need at least 1.5):" \ + "$IMPALA_ICEBERG_VERSION" + exit +fi + # Copy cluster configs to trino docker directory. 
pushd ${HADOOP_CONF_DIR} cp core-site.xml hdfs-site.xml ${IMPALA_HOME}/testdata/bin/minicluster_lakekeeper
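
Illustration only, not part of the patch: a minimal usage sketch of how the new RESTCatalogProperties class consumes a Trino-style connector configuration like the iceberg_lakekeeper.properties file above. The demo class name RestCatalogPropsDemo is hypothetical, and the literal values are taken from this patch's Lakekeeper/Keycloak test setup.

```java
// Minimal sketch (hypothetical demo, not part of this patch).
import java.util.Map;
import java.util.Properties;
import org.apache.impala.catalog.iceberg.RESTCatalogProperties;

public class RestCatalogPropsDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Trino connector property names, mirroring iceberg_lakekeeper.properties above.
    props.setProperty("iceberg.rest-catalog.uri", "http://localhost:8181/catalog");
    props.setProperty("iceberg.rest-catalog.warehouse", "lakekeeper_demo");
    props.setProperty("iceberg.rest-catalog.security", "OAUTH2");
    props.setProperty("iceberg.rest-catalog.oauth2.credential",
        "impala-client:impala-client-secret");
    props.setProperty("iceberg.rest-catalog.oauth2.server-uri",
        "http://localhost:7070/realms/lakekeeper-realm/protocol/openid-connect/token");
    props.setProperty("connector.name", "iceberg");     // ignored by Impala
    props.setProperty("iceberg.catalog.type", "rest");  // ignored by Impala

    RESTCatalogProperties restProps = new RESTCatalogProperties(props);
    // Prints Iceberg-native keys such as 'uri', 'warehouse', 'credential', and
    // 'oauth2-server-uri'; verified settings like 'iceberg.rest-catalog.security'
    // are passed through, while the ignored keys above are dropped.
    for (Map.Entry<String, String> e : restProps.getCatalogProperties().entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}
```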