1
0
mirror of synced 2025-12-25 02:09:19 -05:00

add the ability to use a secret persistence (#6415)

* test exposing secrets in configrepo

* fix local persistence sql

* working propagation, just without check/discover replacements and without feature flagging

* switch if statement

* set up secret persistence for google secrets manager

* add ttl-based secret persistence for check/discover usage in the future

* set up check/discover to pass around necessary parts

* Revert "set up check/discover to pass around necessary parts"

This reverts commit 489d2d5f5d.

* working updates + check/discover operations

* fix additional configs created on deletion

* clean up docker compose file

* finish up configrepo

* make api path optional

* clean up schedulerapp and local testing persistence

* make optional in the worker app

* add rest of feature flagging

* fmt

* remove completed todo

* fix refactoring typo

* fix another refactoring typo

* fix compilation error in test case

* fix tests

* final cleanups

* fix conditional

* address a couple of things

* add hydrator interface

* add replaceAllConfigs

* specfetcher handling

* fix constructor

* fix test

* fix typo

* fix merge build error

* remove extra config

* fix integration test

* fix final piece
This commit is contained in:
Jared Rhizor
2021-09-29 11:53:29 -07:00
committed by GitHub
parent 3c59b49135
commit f88b8313a8
44 changed files with 844 additions and 444 deletions

3
.env
View File

@@ -75,3 +75,6 @@ MAX_SYNC_JOB_ATTEMPTS=3
# Time in days to reach a timeout to cancel the synchronization
MAX_SYNC_TIMEOUT_DAYS=3
# Set secret persistence store to use. Do not change this for existing installations!
SECRET_PERSISTENCE=NONE

View File

@@ -43,8 +43,6 @@ public interface Configs {
String getSecretStoreGcpCredentials();
String getSecretStoreForConfigs();
boolean runDatabaseMigrationOnStartup();
int getMaxSyncJobAttempts();
@@ -109,6 +107,8 @@ public interface Configs {
String getGoogleApplicationCredentials();
SecretPersistenceType getSecretPersistenceType();
enum TrackingStrategy {
SEGMENT,
LOGGING
@@ -124,4 +124,10 @@ public interface Configs {
CLOUD
}
enum SecretPersistenceType {
NONE,
TESTING_CONFIG_DB_TABLE,
GOOGLE_SECRET_MANAGER
}
}

View File

@@ -67,6 +67,7 @@ public class EnvConfigs implements Configs {
private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT";
private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST";
private static final String RESOURCE_MEMORY_LIMIT = "RESOURCE_MEMORY_LIMIT";
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
private static final String JOBS_IMAGE_PULL_SECRET = "JOBS_IMAGE_PULL_SECRET";
// defaults
@@ -76,7 +77,6 @@ public class EnvConfigs implements Configs {
private static final String DEFAULT_RESOURCE_REQUIREMENT_MEMORY = null;
private static final String SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID";
private static final String SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS";
private static final String SECRET_STORE_FOR_CONFIGS = "SECRET_STORE_CONFIGS_ENABLE";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
@@ -191,11 +191,6 @@ public class EnvConfigs implements Configs {
return getEnv(SECRET_STORE_GCP_PROJECT_ID);
}
@Override
public String getSecretStoreForConfigs() {
return getEnv(SECRET_STORE_FOR_CONFIGS);
}
@Override
public boolean runDatabaseMigrationOnStartup() {
return getEnvOrDefault(RUN_DATABASE_MIGRATION_ON_STARTUP, true);
@@ -428,6 +423,12 @@ public class EnvConfigs implements Configs {
return getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, "");
}
@Override
// Reads the SECRET_PERSISTENCE env var, defaulting to NONE. Note: valueOf throws
// IllegalArgumentException for any value outside the SecretPersistenceType enum
// (NONE, TESTING_CONFIG_DB_TABLE, GOOGLE_SECRET_MANAGER).
public SecretPersistenceType getSecretPersistenceType() {
final var secretPersistenceStr = getEnvOrDefault(SECRET_PERSISTENCE, SecretPersistenceType.NONE.name());
return SecretPersistenceType.valueOf(secretPersistenceStr);
}
private String getEnvOrDefault(final String key, final String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity(), false);
}

View File

@@ -6,6 +6,7 @@ plugins {
dependencies {
implementation 'commons-io:commons-io:2.7'
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-db:lib')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-protocol:models')

View File

@@ -5,7 +5,9 @@
package io.airbyte.config.persistence;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
@@ -18,25 +20,44 @@ import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ConfigRepository {
// Sentinel workspace id for ephemeral (check/discover) secrets not tied to a real workspace.
private static final UUID NO_WORKSPACE = UUID.fromString("00000000-0000-0000-0000-000000000000");

private final ConfigPersistence persistence;
// Re-joins secret coordinates stored in partial configs with their secret values.
private final SecretsHydrator secretsHydrator;
// Long-lived store for source/destination config secrets; empty disables secret splitting.
private final Optional<SecretPersistence> longLivedSecretPersistence;
// Short-TTL store used for ephemeral (check/discover) secrets.
private final Optional<SecretPersistence> ephemeralSecretPersistence;
// Resolves a docker image name to its connector spec; injected lazily via setSpecFetcher.
private Function<String, ConnectorSpecification> specFetcherFn;

public ConfigRepository(final ConfigPersistence persistence,
                        final SecretsHydrator secretsHydrator,
                        final Optional<SecretPersistence> longLivedSecretPersistence,
                        final Optional<SecretPersistence> ephemeralSecretPersistence) {
  this.persistence = persistence;
  this.secretsHydrator = secretsHydrator;
  this.longLivedSecretPersistence = longLivedSecretPersistence;
  this.ephemeralSecretPersistence = ephemeralSecretPersistence;
}
public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone)
@@ -149,41 +170,173 @@ public class ConfigRepository {
destinationDefinition);
}
public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException {
public SourceConnection getSourceConnection(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException {
return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class);
}
public SourceConnection getSourceConnectionWithSecrets(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException {
final var source = getSourceConnection(sourceId);
final var fullConfig = secretsHydrator.hydrate(source.getConfiguration());
return Jsons.clone(source).withConfiguration(fullConfig);
}
private Optional<SourceConnection> getOptionalSourceConnection(final UUID sourceId) throws JsonValidationException, IOException {
try {
return Optional.of(getSourceConnection(sourceId));
} catch (ConfigNotFoundException e) {
return Optional.empty();
}
}
/**
 * Validates the full configuration against the connector spec, splits secrets into the
 * configured secret persistence, and stores only the partial (secret-coordinate) config.
 */
public void writeSourceConnection(final SourceConnection source, final ConnectorSpecification connectorSpecification)
    throws JsonValidationException, IOException {
  // actual validation is only for sanity checking
  final JsonSchemaValidator validator = new JsonSchemaValidator();
  validator.ensure(connectorSpecification.getConnectionSpecification(), source.getConfiguration());

  // Use the previous revision (if any) so existing secret coordinates can be reused/updated.
  final var previousSourceConnection = getOptionalSourceConnection(source.getSourceId())
      .map(SourceConnection::getConfiguration);
  final var partialConfig =
      statefulUpdateSecrets(source.getWorkspaceId(), previousSourceConnection, source.getConfiguration(), connectorSpecification);
  final var partialSource = Jsons.clone(source).withConfiguration(partialConfig);

  // Only the partial config — never the raw secrets — is written to the config database.
  persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), partialSource);
}
/**
 * Splits secrets out of a brand-new config, writing them to the long-lived secret persistence
 * and returning the partial config (secret values replaced by coordinates). When no long-lived
 * persistence is configured, the full config is returned unchanged.
 *
 * @param workspaceId workspace id for the config
 * @param fullConfig full config containing raw secret values
 * @param spec connector specification used to locate secret fields
 * @return partial config safe to store in the config database
 */
public JsonNode statefulSplitSecrets(final UUID workspaceId, final JsonNode fullConfig, final ConnectorSpecification spec) {
return splitSecretConfig(workspaceId, fullConfig, spec, longLivedSecretPersistence);
}
/**
*
* @param workspaceId workspace id for the config
* @param oldConfig old full config
* @param fullConfig new full config
* @param spec connector specification
* @return partial config
*/
/**
 * Replaces secret values in an updated config with coordinates, reusing coordinates from the
 * previous revision of the config where one exists. When no long-lived secret persistence is
 * configured, the full config is returned untouched.
 *
 * @param workspaceId workspace id for the config
 * @param oldConfig previous config revision, if the connector already existed
 * @param fullConfig new full config containing raw secret values
 * @param spec connector specification
 * @return partial config
 */
public JsonNode statefulUpdateSecrets(final UUID workspaceId,
                                      final Optional<JsonNode> oldConfig,
                                      final JsonNode fullConfig,
                                      final ConnectorSpecification spec) {
  if (!longLivedSecretPersistence.isPresent()) {
    // Secret persistence disabled: the config is stored as-is.
    return fullConfig;
  }

  final var store = longLivedSecretPersistence.get();
  // An existing config lets us update coordinates in place; otherwise split fresh.
  final SplitSecretConfig split = oldConfig.isPresent()
      ? SecretsHelpers.splitAndUpdateConfig(workspaceId, oldConfig.get(), fullConfig, spec, store)
      : SecretsHelpers.splitConfig(workspaceId, fullConfig, spec);
  split.getCoordinateToPayload().forEach(store::write);
  return split.getPartialConfig();
}
/**
 * Splits secrets into the short-TTL ephemeral persistence (intended for check/discover
 * operations), keyed under the sentinel NO_WORKSPACE id. Returns the full config unchanged
 * when no ephemeral persistence is configured.
 *
 * @param fullConfig full config containing raw secret values
 * @param spec connector specification
 * @return partial config
 */
public JsonNode statefulSplitEphemeralSecrets(final JsonNode fullConfig, final ConnectorSpecification spec) {
return splitSecretConfig(NO_WORKSPACE, fullConfig, spec, ephemeralSecretPersistence);
}
// Shared implementation: split the full config's secrets into the given persistence when one
// is present, returning the partial config; otherwise return the full config untouched.
private JsonNode splitSecretConfig(final UUID workspaceId,
                                   final JsonNode fullConfig,
                                   final ConnectorSpecification spec,
                                   final Optional<SecretPersistence> secretPersistence) {
  if (!secretPersistence.isPresent()) {
    return fullConfig;
  }
  final var store = secretPersistence.get();
  final var split = SecretsHelpers.splitConfig(workspaceId, fullConfig, spec);
  split.getCoordinateToPayload().forEach(store::write);
  return split.getPartialConfig();
}
/** Lists all sources with partial (secret-coordinate) configurations. */
public List<SourceConnection> listSourceConnection() throws JsonValidationException, IOException {
  return persistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
}

/** Lists all sources with configurations hydrated with their real secret values. */
public List<SourceConnection> listSourceConnectionWithSecrets() throws JsonValidationException, IOException {
  final List<SourceConnection> hydrated = new ArrayList<>();
  for (final SourceConnection partialSource : listSourceConnection()) {
    hydrated.add(Exceptions.toRuntime(() -> getSourceConnectionWithSecrets(partialSource.getSourceId())));
  }
  return hydrated;
}
public DestinationConnection getDestinationConnection(final UUID destinationId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId.toString(), DestinationConnection.class);
}
public void writeDestinationConnection(final DestinationConnection destinationConnection, final ConnectorSpecification connectorSpecification)
public DestinationConnection getDestinationConnectionWithSecrets(final UUID destinationId)
throws JsonValidationException, IOException, ConfigNotFoundException {
final var destination = getDestinationConnection(destinationId);
final var fullConfig = secretsHydrator.hydrate(destination.getConfiguration());
return Jsons.clone(destination).withConfiguration(fullConfig);
}
private Optional<DestinationConnection> getOptionalDestinationConnection(final UUID destinationId) throws JsonValidationException, IOException {
try {
return Optional.of(getDestinationConnection(destinationId));
} catch (ConfigNotFoundException e) {
return Optional.empty();
}
}
/**
 * Validates the full destination configuration against the connector spec, splits secrets into
 * the configured secret persistence, and stores only the partial (secret-coordinate) config.
 */
public void writeDestinationConnection(final DestinationConnection destination, final ConnectorSpecification connectorSpecification)
    throws JsonValidationException, IOException {
  // actual validation is only for sanity checking
  final JsonSchemaValidator validator = new JsonSchemaValidator();
  validator.ensure(connectorSpecification.getConnectionSpecification(), destination.getConfiguration());

  // Use the previous revision (if any) so existing secret coordinates can be reused/updated.
  final var previousDestinationConnection = getOptionalDestinationConnection(destination.getDestinationId())
      .map(DestinationConnection::getConfiguration);
  final var partialConfig =
      statefulUpdateSecrets(destination.getWorkspaceId(), previousDestinationConnection, destination.getConfiguration(), connectorSpecification);
  final var partialDestination = Jsons.clone(destination).withConfiguration(partialConfig);

  // Only the partial config — never the raw secrets — is written to the config database.
  persistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), partialDestination);
}
/** Lists all destinations with partial (secret-coordinate) configurations. */
public List<DestinationConnection> listDestinationConnection() throws JsonValidationException, IOException {
  return persistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
}

/** Lists all destinations with configurations hydrated with their real secret values. */
public List<DestinationConnection> listDestinationConnectionWithSecrets() throws JsonValidationException, IOException {
  final List<DestinationConnection> hydrated = new ArrayList<>();
  for (final DestinationConnection partialDestination : listDestinationConnection()) {
    hydrated.add(Exceptions.toRuntime(() -> getDestinationConnectionWithSecrets(partialDestination.getDestinationId())));
  }
  return hydrated;
}
public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class);
}
@@ -279,15 +432,94 @@ public class ConfigRepository {
}
/**
 * Replaces the entire contents of the config persistence. When a long-lived secret persistence
 * is configured, source and destination configs are first split so that raw secret values land
 * in the secret store and only secret coordinates are written to the config database.
 * Requires setSpecFetcher to have been called when secret persistence is enabled.
 *
 * @param configs map of config type to a stream of configs of that type
 * @param dryRun if true, validate without writing
 */
public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
  if (longLivedSecretPersistence.isPresent()) {
    final var augmentedMap = new HashMap<>(configs);

    // get all source defs so that we can use their specs when storing secrets.
    @SuppressWarnings("unchecked")
    final List<StandardSourceDefinition> sourceDefs =
        (List<StandardSourceDefinition>) augmentedMap.get(ConfigSchema.STANDARD_SOURCE_DEFINITION).collect(Collectors.toList());
    // restore data in the map that gets consumed downstream.
    augmentedMap.put(ConfigSchema.STANDARD_SOURCE_DEFINITION, sourceDefs.stream());
    final Map<UUID, ConnectorSpecification> sourceDefIdToSpec = sourceDefs
        .stream()
        .collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, sourceDefinition -> {
          final String imageName = DockerUtils
              .getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
          return specFetcherFn.apply(imageName);
        }));

    // get all destination defs so that we can use their specs when storing secrets.
    // (must read/restore the DESTINATION definition key, not the source one)
    @SuppressWarnings("unchecked")
    final List<StandardDestinationDefinition> destinationDefs =
        (List<StandardDestinationDefinition>) augmentedMap.get(ConfigSchema.STANDARD_DESTINATION_DEFINITION).collect(Collectors.toList());
    augmentedMap.put(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destinationDefs.stream());
    final Map<UUID, ConnectorSpecification> destinationDefIdToSpec = destinationDefs
        .stream()
        .collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, destinationDefinition -> {
          final String imageName = DockerUtils
              .getTaggedImageName(destinationDefinition.getDockerRepository(), destinationDefinition.getDockerImageTag());
          return specFetcherFn.apply(imageName);
        }));

    if (augmentedMap.containsKey(ConfigSchema.SOURCE_CONNECTION)) {
      final Stream<JsonNode> augmentedValue = augmentedMap.get(ConfigSchema.SOURCE_CONNECTION)
          .map(config -> {
            final SourceConnection source = (SourceConnection) config;
            // a source whose definition is unknown cannot have its secrets split.
            if (!sourceDefIdToSpec.containsKey(source.getSourceDefinitionId())) {
              throw new RuntimeException(new ConfigNotFoundException(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId()));
            }
            return statefulSplitSecrets(source.getWorkspaceId(), source.getConfiguration(), sourceDefIdToSpec.get(source.getSourceDefinitionId()));
          });
      augmentedMap.put(ConfigSchema.SOURCE_CONNECTION, augmentedValue);
    }

    if (augmentedMap.containsKey(ConfigSchema.DESTINATION_CONNECTION)) {
      final Stream<JsonNode> augmentedValue = augmentedMap.get(ConfigSchema.DESTINATION_CONNECTION)
          .map(config -> {
            final DestinationConnection destination = (DestinationConnection) config;
            // a destination whose definition is unknown cannot have its secrets split;
            // key the check on the DEFINITION id, not the destination id.
            if (!destinationDefIdToSpec.containsKey(destination.getDestinationDefinitionId())) {
              throw new RuntimeException(
                  new ConfigNotFoundException(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId()));
            }
            return statefulSplitSecrets(destination.getWorkspaceId(), destination.getConfiguration(),
                destinationDefIdToSpec.get(destination.getDestinationDefinitionId()));
          });
      augmentedMap.put(ConfigSchema.DESTINATION_CONNECTION, augmentedValue);
    }

    // persist the secret-split map — without this the enabled branch wrote nothing.
    persistence.replaceAllConfigs(augmentedMap, dryRun);
  } else {
    persistence.replaceAllConfigs(configs, dryRun);
  }
}
/**
 * Dumps all configs, hydrating source and destination configurations with their real secret
 * values so that the dump is self-contained.
 *
 * @return map of config schema name to a stream of config json
 */
public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
  final var map = new HashMap<>(persistence.dumpConfigs());
  final var sourceKey = ConfigSchema.SOURCE_CONNECTION.name();
  final var destinationKey = ConfigSchema.DESTINATION_CONNECTION.name();

  if (map.containsKey(sourceKey)) {
    final Stream<JsonNode> augmentedValue = map.get(sourceKey).map(secretsHydrator::hydrate);
    map.put(sourceKey, augmentedValue);
  }

  if (map.containsKey(destinationKey)) {
    final Stream<JsonNode> augmentedValue = map.get(destinationKey).map(secretsHydrator::hydrate);
    map.put(destinationKey, augmentedValue);
  }

  return map;
}
// Loads seed data into the underlying persistence (straight pass-through).
public void loadData(ConfigPersistence seedPersistence) throws IOException {
persistence.loadData(seedPersistence);
}
// Injects the docker-image-name -> connector-spec resolver used when splitting secrets during
// bulk imports; must be set before replaceAllConfigs runs with secret persistence enabled.
public void setSpecFetcher(Function<String, ConnectorSpecification> specFetcherFn) {
this.specFetcherFn = specFetcherFn;
}
}

View File

@@ -1,135 +0,0 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.ProjectName;
import com.google.cloud.secretmanager.v1.Replication;
import com.google.cloud.secretmanager.v1.Secret;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretManagerServiceSettings;
import com.google.cloud.secretmanager.v1.SecretName;
import com.google.cloud.secretmanager.v1.SecretPayload;
import com.google.cloud.secretmanager.v1.SecretVersion;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.airbyte.config.EnvConfigs;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Wrapper class to simplify the API for accessing secrets
*/
/**
 * Wrapper class to simplify the API for accessing secrets in Google Secret Manager.
 * All operations read the GCP project id and credentials from the environment.
 */
public class GoogleSecretsManager {

  /**
   * Manual test fixture to make sure you've got your project id set in env and have appropriate creds
   * to reach/write the secret store.
   */
  public static void main(String[] args) throws Exception {
    // Check that we're configured to a usable GCP project.
    EnvConfigs envConfig = new EnvConfigs();
    String projectId = envConfig.getSecretStoreGcpProjectId();
    Preconditions.checkNotNull(projectId, "Project ID must not be empty");
    // GCP project numbers are purely numeric. NOTE: the original used
    // `"...%s".format(projectId)` which discards the literal (receiver of a static call)
    // and used the project id itself as the message, plus a checkNotNull on a primitive
    // that could never fail.
    Preconditions.checkArgument(projectId.chars().allMatch(Character::isDigit),
        String.format("Project ID must be purely numeric, not %s", projectId));

    // Check that we can read an existing one from that project / have permissions etc.
    Preconditions.checkArgument(!existsSecret("zzzzzz"), "Secret doesn't exist, should return false.");
    Preconditions.checkArgument(existsSecret("dev_practice_sample_secret"), "Secret already exists, should return true.");
    String content = readSecret("dev_practice_sample_secret");
    Preconditions.checkArgument("ThisIsMyTest".equals(content));

    // Try creating a new one and reading it back.
    String rand = UUID.randomUUID().toString();
    String key = "dev_practice_sample_" + rand;
    saveSecret(key, rand);
    String rand2 = readSecret(key);
    Preconditions.checkArgument(rand.equals(rand2), "Values should have matched after writing and re-reading a new key.");
    saveSecret(key, "foo");
    deleteSecret(key);
  }

  /**
   * Reads the latest version of the given secret, or returns null if it does not exist.
   */
  public static String readSecret(String secretId) throws IOException {
    EnvConfigs envConfig = new EnvConfigs();
    String projectId = envConfig.getSecretStoreGcpProjectId();
    try (SecretManagerServiceClient client = getSecretManagerServiceClient()) {
      SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
      AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName);
      return response.getPayload().getData().toStringUtf8();
    } catch (com.google.api.gax.rpc.NotFoundException e) {
      // Missing secret is treated as absent rather than an error.
      return null;
    }
  }

  // Builds a client authenticated with the service-account JSON from the environment.
  private static SecretManagerServiceClient getSecretManagerServiceClient() throws IOException {
    final ServiceAccountCredentials credentials = ServiceAccountCredentials
        .fromStream(new ByteArrayInputStream((new EnvConfigs()).getSecretStoreGcpCredentials().getBytes(StandardCharsets.UTF_8)));
    return SecretManagerServiceClient.create(
        SecretManagerServiceSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build());
  }

  /**
   * Returns true iff the latest version of the given secret can be accessed.
   */
  public static boolean existsSecret(String secretId) throws IOException {
    EnvConfigs envConfig = new EnvConfigs();
    String projectId = envConfig.getSecretStoreGcpProjectId();
    try (SecretManagerServiceClient client = getSecretManagerServiceClient()) {
      SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, "latest");
      client.accessSecretVersion(secretVersionName);
      return true;
    } catch (com.google.api.gax.rpc.NotFoundException e) {
      return false;
    }
  }

  /**
   * Creates the secret container if needed, then writes the value as a new version.
   */
  public static void saveSecret(String secretId, String value) throws IOException {
    EnvConfigs envConfig = new EnvConfigs();
    String projectId = envConfig.getSecretStoreGcpProjectId();
    try (SecretManagerServiceClient client = getSecretManagerServiceClient()) {
      if (!existsSecret(secretId)) {
        // Automatic replication lets GCP pick storage locations.
        Secret secret = Secret.newBuilder().setReplication(Replication.newBuilder().setAutomatic(
            Replication.Automatic.newBuilder().build()).build()).build();
        client.createSecret(ProjectName.of(projectId), secretId, secret);
      }
      SecretPayload payload = SecretPayload.newBuilder()
          .setData(ByteString.copyFromUtf8(value))
          .build();
      client.addSecretVersion(SecretName.of(projectId, secretId), payload);
    }
  }

  /**
   * Deletes the secret and all of its versions.
   */
  public static void deleteSecret(String secretId) throws IOException {
    EnvConfigs envConfig = new EnvConfigs();
    String projectId = envConfig.getSecretStoreGcpProjectId();
    try (SecretManagerServiceClient client = getSecretManagerServiceClient()) {
      client.deleteSecret(SecretName.of(projectId, secretId));
    }
  }

  /**
   * Lists the names (without the "projects/N/secrets/" prefix) of all secrets whose short name
   * starts with the given prefix.
   */
  public static List<String> listSecretsMatching(String prefix) throws IOException {
    final String prefixRegex = "projects/\\d+/secrets/";
    List<String> names = new ArrayList<>();
    try (SecretManagerServiceClient client = getSecretManagerServiceClient()) {
      client.listSecrets(ProjectName.of(new EnvConfigs().getSecretStoreGcpProjectId())).iterateAll()
          .forEach(secret -> {
            final String shortName = secret.getName().replaceFirst(prefixRegex, "");
            if (shortName.startsWith(prefix)) {
              names.add(shortName);
            }
          });
    }
    return names;
  }

}

View File

@@ -1,145 +0,0 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
/**
 * ConfigPersistence implementation that stores whole configs as individual secrets in Google
 * Secret Manager. Key names are derived from the persistence version, the config type's id
 * field name and the config id.
 */
public class GoogleSecretsManagerConfigPersistence implements ConfigPersistence {

  public GoogleSecretsManagerConfigPersistence() {}

  // Version prefix baked into every key so the storage format can evolve.
  public String getVersion() {
    return "secrets-v1";
  }

  // NOTE(review): not annotated @Override in the original — presumably not declared on every
  // version of the ConfigPersistence interface; confirm before adding the annotation.
  public void loadData(ConfigPersistence seedPersistence) throws IOException {
    loadData(seedPersistence, new HashSet<>());
  }

  public void loadData(ConfigPersistence seedPersistence, Set<String> configsInUse) throws IOException {
    // Don't need to do anything because the seed persistence only contains
    // non-secret configs, which we don't load into the secrets store.
  }

  /**
   * Returns the definition ids for every connector we're storing. Hopefully this can be refactored
   * into not existing once we have secrets as coordinates instead of storing the whole config as a
   * single secret.
   */
  public Set<String> listDefinitionIdsInUseByConnectors() {
    Set<String> definitionIds = new HashSet<>();
    try {
      for (SourceConnection source : listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class)) {
        definitionIds.add(source.getSourceDefinitionId().toString());
      }
      for (DestinationConnection dest : listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class)) {
        definitionIds.add(dest.getDestinationDefinitionId().toString());
      }
      return definitionIds;
    } catch (IOException | JsonValidationException e) {
      throw new RuntimeException(e);
    }
  }

  public Set<String> getRepositoriesFromDefinitionIds(Set<String> usedConnectorDefinitionIds) throws IOException {
    throw new UnsupportedOperationException(
        "Secrets Manager does not store the list of definitions and thus cannot be used to look up docker repositories.");
  }

  /**
   * Determines the secrets manager key name for storing a particular config.
   */
  protected <T> String generateKeyNameFromType(AirbyteConfig configType, String configId) {
    return String.format("%s-%s-%s-configuration", getVersion(), configType.getIdFieldName(), configId);
  }

  // Key prefix shared by all configs of the given type; used for listing.
  protected <T> String generateKeyPrefixFromType(AirbyteConfig configType) {
    return String.format("%s-%s-", getVersion(), configType.getIdFieldName());
  }

  @Override
  public <T> T getConfig(AirbyteConfig configType, String configId, Class<T> clazz)
      throws ConfigNotFoundException, JsonValidationException, IOException {
    String keyName = generateKeyNameFromType(configType, configId);
    return Jsons.deserialize(GoogleSecretsManager.readSecret(keyName), clazz);
  }

  @Override
  public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) throws JsonValidationException, IOException {
    List<T> configs = new ArrayList<>();
    for (String keyName : GoogleSecretsManager.listSecretsMatching(generateKeyPrefixFromType(configType))) {
      configs.add(Jsons.deserialize(GoogleSecretsManager.readSecret(keyName), clazz));
    }
    return configs;
  }

  @Override
  public <T> void writeConfig(AirbyteConfig configType, String configId, T config) throws JsonValidationException, IOException {
    // Removed System.out debug print of the key name: secret-store key material must not be
    // written to stdout.
    String keyName = generateKeyNameFromType(configType, configId);
    GoogleSecretsManager.saveSecret(keyName, Jsons.serialize(config));
  }

  @Override
  public void deleteConfig(AirbyteConfig configType, String configId) throws ConfigNotFoundException, IOException {
    GoogleSecretsManager.deleteSecret(generateKeyNameFromType(configType, configId));
  }

  @Override
  public void replaceAllConfigs(Map<AirbyteConfig, Stream<?>> configs, boolean dryRun) throws IOException {
    if (dryRun) {
      // Exercise serialization only; write nothing.
      for (final Map.Entry<AirbyteConfig, Stream<?>> configuration : configs.entrySet()) {
        configuration.getValue().forEach(Jsons::serialize);
      }
      return;
    }
    for (final Map.Entry<AirbyteConfig, Stream<?>> configuration : configs.entrySet()) {
      AirbyteConfig configType = configuration.getKey();
      configuration.getValue().forEach(config -> {
        try {
          GoogleSecretsManager.saveSecret(generateKeyNameFromType(configType, configType.getId(config)), Jsons.serialize(config));
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      });
    }
  }

  @Override
  public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
    final Map<String, Stream<JsonNode>> configs = new HashMap<>();
    for (AirbyteConfig ctype : new ConfigSchema[] {ConfigSchema.SOURCE_CONNECTION, ConfigSchema.DESTINATION_CONNECTION}) {
      final List<JsonNode> configList = new ArrayList<>();
      for (String name : GoogleSecretsManager.listSecretsMatching(generateKeyPrefixFromType(ctype))) {
        configList.add(Jsons.deserialize(GoogleSecretsManager.readSecret(name), JsonNode.class));
      }
      configs.put(ctype.name(), configList.stream());
    }
    return configs;
  }

}

View File

@@ -0,0 +1,127 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence.split_secrets;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.secretmanager.v1.ProjectName;
import com.google.cloud.secretmanager.v1.Replication;
import com.google.cloud.secretmanager.v1.Secret;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretManagerServiceSettings;
import com.google.cloud.secretmanager.v1.SecretName;
import com.google.cloud.secretmanager.v1.SecretPayload;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import io.airbyte.commons.lang.Exceptions;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.joda.time.Days;
/**
* Uses Google Secret Manager (https://cloud.google.com/secret-manager) as a K/V store to access
* secrets. In the future we will likely want to introduce more granular permission handling here.
*
* It's important to note that we are not making use of the versioning feature of Google Secret
* Manager. This is for a few reasons: 1. There isn't a clean interface for getting the most recent
* version. 2. Version writes must be sequential. This means that if we wanted to move between
* secrets management platforms such as Hashicorp Vault and GSM, we would need to create secrets in
* order (or depending on our retention for the secrets pretend to insert earlier versions).
*/
public class GoogleSecretManagerPersistence implements SecretPersistence {

  /**
   * The "latest" alias is a magic string that gives you access to the latest secret without
   * explicitly specifying the version. For more info see:
   * https://cloud.google.com/secret-manager/docs/creating-and-accessing-secrets#access
   */
  private static final String LATEST = "latest";

  // TTL applied by the ephemeral persistence: secrets auto-expire after 5 days so temporary
  // check/discover payloads are not retained (and billed) indefinitely.
  private static final Duration EPHEMERAL_TTL = Duration.newBuilder()
      .setSeconds(Days.days(5).toStandardSeconds().getSeconds())
      .build();

  // Let GSM choose replication locations automatically rather than pinning to specific regions.
  private static final Replication REPLICATION_POLICY = Replication.newBuilder()
      .setAutomatic(Replication.Automatic.newBuilder().build())
      .build();

  private final String gcpProjectId;
  // Supplies a NEW client per operation; each client is opened and closed inside a single
  // read/write call via try-with-resources.
  private final Supplier<SecretManagerServiceClient> clientSupplier;
  // null means "no TTL": secrets persist until explicitly deleted (long-lived persistence).
  private final @Nullable Duration ttl;

  /**
   * Creates a persistence with an infinite TTL for stored secrets. Used for source/destination config
   * secret storage.
   */
  public static GoogleSecretManagerPersistence getLongLived(final String gcpProjectId, final String gcpCredentialsJson) {
    return new GoogleSecretManagerPersistence(gcpProjectId, gcpCredentialsJson, null);
  }

  /**
   * Creates a persistence with a relatively short TTL for stored secrets. Used for temporary
   * operations such as check/discover operations where we need to use secret storage to communicate
   * from the server to Temporal, but where we don't want to maintain the secrets indefinitely.
   */
  public static GoogleSecretManagerPersistence getEphemeral(final String gcpProjectId, final String gcpCredentialsJson) {
    return new GoogleSecretManagerPersistence(gcpProjectId, gcpCredentialsJson, EPHEMERAL_TTL);
  }

  // Private: instances are obtained via the getLongLived / getEphemeral factories above.
  private GoogleSecretManagerPersistence(final String gcpProjectId, final String gcpCredentialsJson, final @Nullable Duration ttl) {
    this.gcpProjectId = gcpProjectId;
    this.clientSupplier = () -> Exceptions.toRuntime(() -> getSecretManagerServiceClient(gcpCredentialsJson));
    this.ttl = ttl;
  }

  /**
   * Reads the latest version of the secret at the given coordinate.
   *
   * @param coordinate identifier of the secret (its full coordinate is used as the GSM secret id)
   * @return the secret payload, or {@link Optional#empty()} if no such secret exists
   */
  @Override
  public Optional<String> read(final SecretCoordinate coordinate) {
    try (final var client = clientSupplier.get()) {
      final var secretVersionName = SecretVersionName.of(gcpProjectId, coordinate.getFullCoordinate(), LATEST);
      final var response = client.accessSecretVersion(secretVersionName);
      return Optional.of(response.getPayload().getData().toStringUtf8());
    } catch (NotFoundException e) {
      // Missing secret is an expected case (e.g. first write probe), not an error.
      return Optional.empty();
    }
  }

  /**
   * Writes a payload to the given coordinate, creating the secret container on first use and then
   * appending a new version.
   *
   * NOTE(review): the read-then-create sequence below is not atomic — two concurrent first writes to
   * the same coordinate could both attempt createSecret; confirm whether callers serialize writes.
   *
   * @param coordinate identifier of the secret
   * @param payload secret value to store
   */
  @Override
  public void write(final SecretCoordinate coordinate, final String payload) {
    try (final var client = clientSupplier.get()) {
      if (read(coordinate).isEmpty()) {
        // Secret container does not exist yet: create it, applying the TTL only for the
        // ephemeral persistence (ttl == null means "keep forever").
        final var secretBuilder = Secret.newBuilder().setReplication(REPLICATION_POLICY);
        if (ttl != null) {
          secretBuilder.setTtl(ttl);
        }
        client.createSecret(ProjectName.of(gcpProjectId), coordinate.getFullCoordinate(), secretBuilder.build());
      }

      // Add the payload as a new version of the (possibly just-created) secret. Per the class
      // javadoc, readers always use the "latest" alias rather than explicit version numbers.
      final var name = SecretName.of(gcpProjectId, coordinate.getFullCoordinate());
      final var secretPayload = SecretPayload.newBuilder()
          .setData(ByteString.copyFromUtf8(payload))
          .build();
      client.addSecretVersion(name, secretPayload);
    }
  }

  /**
   * Builds a Secret Manager client authenticated with the given service-account credentials JSON.
   *
   * @param credentialsJson GCP service account key in JSON form
   * @return a client the caller is responsible for closing
   * @throws IOException if the credentials cannot be parsed
   */
  public static SecretManagerServiceClient getSecretManagerServiceClient(String credentialsJson) throws IOException {
    final var credentialsByteStream = new ByteArrayInputStream(credentialsJson.getBytes(StandardCharsets.UTF_8));
    final var credentials = ServiceAccountCredentials.fromStream(credentialsByteStream);
    final var clientSettings = SecretManagerServiceSettings.newBuilder()
        .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
        .build();

    return SecretManagerServiceClient.create(clientSettings);
  }

}

View File

@@ -0,0 +1,50 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence.split_secrets;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.Database;
import java.util.Optional;
/**
* Secrets persistence intended only for local development.
*/
public class LocalTestingSecretPersistence implements SecretPersistence {

  private final Database configDatabase;

  /**
   * Creates the persistence and eagerly ensures the backing {@code secrets} table exists, so local
   * installations need no separate migration step.
   *
   * @param configDatabase database used as a simple coordinate-to-payload key/value store
   */
  public LocalTestingSecretPersistence(final Database configDatabase) {
    this.configDatabase = configDatabase;
    Exceptions.toRuntime(() -> {
      this.configDatabase.query(ctx -> {
        ctx.execute("CREATE TABLE IF NOT EXISTS secrets ( coordinate TEXT PRIMARY KEY, payload TEXT);");
        return null;
      });
    });
  }

  /**
   * Reads the payload stored for the given coordinate.
   *
   * @param coordinate secret identifier (full coordinate string is the primary key)
   * @return stored payload, or {@link Optional#empty()} if the coordinate is unknown
   */
  @Override
  public Optional<String> read(final SecretCoordinate coordinate) {
    return Exceptions.toRuntime(() -> this.configDatabase.query(ctx -> {
      final var result = ctx.fetch("SELECT payload FROM secrets WHERE coordinate = ?;", coordinate.getFullCoordinate());
      if (result.size() == 0) {
        return Optional.empty();
      } else {
        return Optional.of(result.get(0).getValue(0, String.class));
      }
    }));
  }

  /**
   * Upserts the payload for the given coordinate.
   *
   * @param coordinate secret identifier
   * @param payload secret value to store
   */
  @Override
  public void write(final SecretCoordinate coordinate, final String payload) {
    Exceptions.toRuntime(() -> this.configDatabase.query(ctx -> {
      // Three placeholders, three bind values: coordinate, inserted payload, and the payload used
      // by the ON CONFLICT update. (The original passed a stray fourth bind value with no
      // matching placeholder.)
      ctx.query("INSERT INTO secrets(coordinate,payload) VALUES(?, ?) ON CONFLICT (coordinate) DO UPDATE SET payload = ?;",
          coordinate.getFullCoordinate(), payload, payload).execute();
      return null;
    }));
  }

}

View File

@@ -16,12 +16,12 @@ public class MemorySecretPersistence implements SecretPersistence {
final Map<SecretCoordinate, String> secretMap = new HashMap<>();
@Override
public Optional<String> read(SecretCoordinate coordinate) {
public Optional<String> read(final SecretCoordinate coordinate) {
return Optional.ofNullable(secretMap.get(coordinate));
}
@Override
public void write(SecretCoordinate coordinate, String payload) {
public void write(final SecretCoordinate coordinate, final String payload) {
secretMap.put(coordinate, payload);
}

View File

@@ -0,0 +1,19 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence.split_secrets;
import com.fasterxml.jackson.databind.JsonNode;
/**
* No-op hydrator. Used if there is no secrets persistence configured for this Airbyte instance.
*/
public class NoOpSecretsHydrator implements SecretsHydrator {

  /**
   * Identity hydration: when no secrets persistence is configured there is nothing to inject, so
   * the supplied config is returned as-is.
   *
   * @param partialConfig config to "hydrate"
   * @return the exact same config instance, unchanged
   */
  @Override
  public JsonNode hydrate(final JsonNode partialConfig) {
    return partialConfig;
  }

}

View File

@@ -0,0 +1,25 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence.split_secrets;
import com.fasterxml.jackson.databind.JsonNode;
/**
* Adds secrets to a partial config based off a persistence.
*/
public class RealSecretsHydrator implements SecretsHydrator {

  // Read-only store from which secret values are fetched during hydration.
  private final ReadOnlySecretPersistence secretPersistence;

  /**
   * @param readOnlySecretPersistence store containing the secret values referenced by partial
   *        configs
   */
  public RealSecretsHydrator(final ReadOnlySecretPersistence readOnlySecretPersistence) {
    this.secretPersistence = readOnlySecretPersistence;
  }

  /**
   * Replaces secret coordinates in the partial config with the values stored in the persistence.
   *
   * @param partialConfig config containing secret coordinates instead of raw values
   * @return full config with secret values filled in
   */
  @Override
  public JsonNode hydrate(final JsonNode partialConfig) {
    return SecretsHelpers.combineConfig(partialConfig, secretPersistence);
  }

}

View File

@@ -4,6 +4,10 @@
package io.airbyte.config.persistence.split_secrets;
import io.airbyte.config.Configs;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import java.util.Optional;
/**
@@ -11,10 +15,60 @@ import java.util.Optional;
* are always strings. See {@link SecretCoordinate} for more information on how secrets are
* identified.
*/
public interface SecretPersistence {
public interface SecretPersistence extends ReadOnlySecretPersistence {
Optional<String> read(SecretCoordinate coordinate);
Optional<String> read(final SecretCoordinate coordinate);
void write(SecretCoordinate coordinate, String payload);
void write(final SecretCoordinate coordinate, final String payload) throws IllegalArgumentException;
static Optional<SecretPersistence> getLongLived(final Configs configs) throws IOException {
switch (configs.getSecretPersistenceType()) {
case TESTING_CONFIG_DB_TABLE -> {
final Database configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getAndInitialize();
return Optional.of(new LocalTestingSecretPersistence(configDatabase));
}
case GOOGLE_SECRET_MANAGER -> {
return Optional.of(GoogleSecretManagerPersistence.getLongLived(configs.getSecretStoreGcpProjectId(), configs.getSecretStoreGcpProjectId()));
}
default -> {
return Optional.empty();
}
}
}
static SecretsHydrator getSecretsHydrator(final Configs configs) throws IOException {
final var persistence = getLongLived(configs);
if (persistence.isPresent()) {
return new RealSecretsHydrator(persistence.get());
} else {
return new NoOpSecretsHydrator();
}
}
static Optional<SecretPersistence> getEphemeral(final Configs configs) throws IOException {
switch (configs.getSecretPersistenceType()) {
case TESTING_CONFIG_DB_TABLE -> {
final Database configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getAndInitialize();
return Optional.of(new LocalTestingSecretPersistence(configDatabase));
}
case GOOGLE_SECRET_MANAGER -> {
return Optional.of(GoogleSecretManagerPersistence.getEphemeral(configs.getSecretStoreGcpProjectId(), configs.getSecretStoreGcpProjectId()));
}
default -> {
return Optional.empty();
}
}
}
}

View File

@@ -26,7 +26,7 @@ import javax.annotation.Nullable;
* These are the three main helpers provided by this class:
* {@link SecretsHelpers#splitConfig(UUID, JsonNode, ConnectorSpecification)}
* {@link SecretsHelpers#splitAndUpdateConfig(UUID, JsonNode, JsonNode, ConnectorSpecification, ReadOnlySecretPersistence)}
* {@link SecretsHelpers#combineConfig(JsonNode, SecretPersistence)}
* {@link SecretsHelpers#combineConfig(JsonNode, ReadOnlySecretPersistence)}
*
* Here's an overview on some terminology used in this class:
*
@@ -99,7 +99,7 @@ public class SecretsHelpers {
* @param secretPersistence secret storage mechanism
* @return full config including actual secret values
*/
public static JsonNode combineConfig(final JsonNode partialConfig, final SecretPersistence secretPersistence) {
public static JsonNode combineConfig(final JsonNode partialConfig, final ReadOnlySecretPersistence secretPersistence) {
final var config = partialConfig.deepCopy();
// if the entire config is a secret coordinate object
@@ -328,7 +328,7 @@ public class SecretsHelpers {
* @throws RuntimeException when a secret at that coordinate is not available in the persistence
* @return a json text node containing the secret value
*/
private static TextNode getOrThrowSecretValueNode(final SecretPersistence secretPersistence, final SecretCoordinate coordinate) {
private static TextNode getOrThrowSecretValueNode(final ReadOnlySecretPersistence secretPersistence, final SecretCoordinate coordinate) {
final var secretValue = secretPersistence.read(coordinate);
if (secretValue.isEmpty()) {

View File

@@ -0,0 +1,22 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence.split_secrets;
import com.fasterxml.jackson.databind.JsonNode;
/**
* Adds secrets to a partial config.
*/
public interface SecretsHydrator {

  /**
   * Adds secrets to a partial config. A partial config references stored secrets instead of
   * containing their raw values; implementations either fill those values in from a persistence
   * (RealSecretsHydrator) or return the config unchanged when no persistence is configured
   * (NoOpSecretsHydrator).
   *
   * @param partialConfig partial config (without secrets)
   * @return full config with secrets
   */
  JsonNode hydrate(JsonNode partialConfig);

}

View File

@@ -0,0 +1,79 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.secretmanager.v1.SecretName;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.persistence.split_secrets.GoogleSecretManagerPersistence;
import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
import java.io.IOException;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Triggered as part of integration tests in CI. It uses credentials in Github to connect to the
* integration testing GCP project.
*/
public class GoogleSecretManagerPersistenceIntegrationTest {

  private GoogleSecretManagerPersistence persistence;
  // Base secret id for this test run; versions are layered on top via SecretCoordinate.
  private String baseCoordinate;

  @BeforeEach
  void setUp() {
    final var configs = new EnvConfigs();
    // Ephemeral persistence: created secrets carry a TTL, so anything the tearDown delete misses
    // is still cleaned up automatically by GSM.
    persistence = GoogleSecretManagerPersistence.getEphemeral(
        configs.getSecretStoreGcpProjectId(),
        configs.getSecretStoreGcpCredentials());
    // Randomized suffix keeps concurrent/repeated runs from colliding on the same secret id.
    baseCoordinate = "GoogleSecretManagerPersistenceIntegrationTest_coordinate_" + RandomUtils.nextInt() % 20000;
  }

  @AfterEach
  void tearDown() throws IOException {
    final var configs = new EnvConfigs();
    try (final var client = GoogleSecretManagerPersistence.getSecretManagerServiceClient(configs.getSecretStoreGcpCredentials())) {
      // try to delete this so we aren't charged for the secret
      // if this is missed due to some sort of failure the secret will be deleted after the ttl
      try {
        client.deleteSecret(SecretName.of(
            configs.getSecretStoreGcpProjectId(),
            baseCoordinate));
      } catch (NotFoundException nfe) {
        // do nothing -- the test may not have created the secret at all
      }
    }
  }

  /**
   * Exercises the full lifecycle against the real GSM API: read of a missing coordinate, first
   * write + read-back, then an update written as a new coordinate version.
   */
  @Test
  void testReadWriteUpdate() {
    final var coordinate1 = new SecretCoordinate(baseCoordinate, 1);

    // try reading non-existent value
    final var firstRead = persistence.read(coordinate1);
    assertTrue(firstRead.isEmpty());

    // write
    final var firstPayload = "abc";
    persistence.write(coordinate1, firstPayload);
    final var secondRead = persistence.read(coordinate1);
    assertTrue(secondRead.isPresent());
    assertEquals(firstPayload, secondRead.get());

    // update -- a new coordinate version maps to a new secret version in GSM
    final var secondPayload = "def";
    final var coordinate2 = new SecretCoordinate(baseCoordinate, 2);
    persistence.write(coordinate2, secondPayload);
    final var thirdRead = persistence.read(coordinate2);
    assertTrue(thirdRead.isPresent());
    assertEquals(secondPayload, thirdRead.get());
  }

}

View File

@@ -1,81 +0,0 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.config.persistence;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.Sets;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.UUID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
// NOTE(review): this test class is deleted by this commit; annotations below document the state
// of the removed code for the record.
public class GoogleSecretsManagerConfigPersistenceTest {

  private static final UUID WORKSPACE_ID = UUID.randomUUID();

  private GoogleSecretsManagerConfigPersistence configPersistence;

  public static final UUID UUID_1 = new UUID(0, 1);
  public static final StandardSourceDefinition SOURCE_1 = new StandardSourceDefinition();

  static {
    SOURCE_1.withSourceDefinitionId(UUID_1).withName("postgresql");
  }

  public static final UUID UUID_2 = new UUID(0, 2);
  public static final StandardSourceDefinition SOURCE_2 = new StandardSourceDefinition();

  static {
    SOURCE_2.withSourceDefinitionId(UUID_2).withName("apache storm");
  }

  @BeforeEach
  void setUp() throws IOException {
    configPersistence = new GoogleSecretsManagerConfigPersistence();
  }

  // Round-trips a single source definition through the persistence.
  @Test
  void testReadWriteConfig() throws IOException, JsonValidationException, ConfigNotFoundException {
    configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1);

    Assertions.assertEquals(SOURCE_1,
        configPersistence.getConfig(
            ConfigSchema.STANDARD_SOURCE_DEFINITION,
            UUID_1.toString(),
            StandardSourceDefinition.class));
  }

  // Verifies both written definitions appear in the listing (order-independent via sets).
  @Test
  void testListConfigs() throws JsonValidationException, IOException {
    configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_1.toString(), SOURCE_1);
    configPersistence.writeConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, UUID_2.toString(), SOURCE_2);

    Assertions.assertEquals(
        Sets.newHashSet(SOURCE_1, SOURCE_2),
        Sets.newHashSet(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)));
  }

  // Stubbed helper: the real record-count query is commented out, so this compares against a
  // hard-coded 999 and is not a meaningful assertion. Unused by the tests above.
  private void assertRecordCount(int expectedCount) throws Exception {
    // Result<Record1<Integer>> recordCount = database.query(ctx ->
    // ctx.select(count(asterisk())).from(table("airbyte_configs")).fetch());
    assertEquals(expectedCount, 999);// TODO: Fix // recordCount.get(0).value1());
  }

  // Unused helper retained from an earlier version of this test.
  private void assertHasSource(StandardSourceDefinition source) throws Exception {
    Assertions.assertEquals(source, configPersistence
        .getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, source.getSourceDefinitionId().toString(),
            StandardSourceDefinition.class));
  }

  // Unused helper retained from an earlier version of this test.
  private void assertHasDestination(StandardDestinationDefinition destination) throws Exception {
    Assertions.assertEquals(destination, configPersistence
        .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, destination.getDestinationDefinitionId().toString(),
            StandardDestinationDefinition.class));
  }

}

View File

@@ -10,8 +10,11 @@ import static org.mockito.Mockito.when;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -26,7 +29,9 @@ class ConfigRepositoryTest {
@BeforeEach
void setup() {
configPersistence = mock(ConfigPersistence.class);
configRepository = new ConfigRepository(configPersistence);
final var secretPersistence = new MemorySecretPersistence();
configRepository =
new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence));
}
@Test

View File

@@ -19,6 +19,8 @@ import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
@@ -36,6 +38,7 @@ import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -199,7 +202,10 @@ public class SchedulerApp {
configs.getConfigDatabaseUrl())
.getInitialized();
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation();
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence);
final JobCleaner jobCleaner = new JobCleaner(
configs.getWorkspaceRetentionConfig(),
workspaceRoot,

View File

@@ -4,6 +4,7 @@ plugins {
dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:models')

View File

@@ -22,6 +22,7 @@ import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.FileSystemConfigPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.Job;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -83,7 +85,7 @@ class WorkspaceHelperTest {
public void setup() throws IOException {
tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5));
configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir));
configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir), new NoOpSecretsHydrator(), Optional.empty(), Optional.empty());
jobPersistence = mock(JobPersistence.class);
workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

View File

@@ -11,10 +11,12 @@ import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.FileSystemConfigPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +40,10 @@ public class SecretsMigration {
public void run() throws IOException {
LOGGER.info("Starting migration run.");
final ConfigRepository readFromConfigRepository = new ConfigRepository(readFromPersistence);
final ConfigRepository writeToConfigRepository = new ConfigRepository(writeToPersistence);
final ConfigRepository readFromConfigRepository =
new ConfigRepository(readFromPersistence, new NoOpSecretsHydrator(), Optional.empty(), Optional.empty());
final ConfigRepository writeToConfigRepository =
new ConfigRepository(writeToPersistence, new NoOpSecretsHydrator(), Optional.empty(), Optional.empty());
LOGGER.info("... Dry Run: deserializing configurations and writing to the new store...");
Map<String, Stream<JsonNode>> configurations = readFromConfigRepository.dumpConfigs();

View File

@@ -162,7 +162,7 @@ public class ConfigDumpExporter {
final Collection<SourceConnection> sourceConnections = writeConfigsToArchive(
parentFolder,
ConfigSchema.SOURCE_CONNECTION.name(),
configRepository::listSourceConnection,
configRepository::listSourceConnectionWithSecrets,
(sourceConnection) -> workspaceId.equals(sourceConnection.getWorkspaceId()));
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
() -> listSourceDefinition(sourceConnections),
@@ -171,7 +171,7 @@ public class ConfigDumpExporter {
final Collection<DestinationConnection> destinationConnections = writeConfigsToArchive(
parentFolder,
ConfigSchema.DESTINATION_CONNECTION.name(),
configRepository::listDestinationConnection,
configRepository::listDestinationConnectionWithSecrets,
(destinationConnection) -> workspaceId.equals(destinationConnection.getWorkspaceId()));
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(),
() -> listDestinationDefinition(destinationConnections),

View File

@@ -6,6 +6,7 @@ package io.airbyte.server;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
@@ -17,6 +18,8 @@ import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
@@ -166,7 +169,13 @@ public class ServerApp implements ServerRunnable {
.getAndInitialize();
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
configPersistence.migrateFileConfigs(configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation());
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs);
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence.withValidation(), secretsHydrator, secretPersistence, ephemeralSecretPersistence);
LOGGER.info("Creating Scheduler persistence...");
final Database jobDatabase = new JobsDatabaseInstance(
@@ -208,6 +217,9 @@ public class ServerApp implements ServerRunnable {
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);
// required before migration
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.execute(dockerImage)));
Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;

View File

@@ -54,7 +54,8 @@ public interface ServerFactory {
configs,
new FileTtlManager(10, TimeUnit.MINUTES, 10),
MDC.getCopyOfContextMap(),
configsDatabase, jobsDatabase);
configsDatabase,
jobsDatabase);
// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class);

View File

@@ -38,7 +38,7 @@ public class ConfigurationUpdate {
public SourceConnection source(UUID sourceId, String sourceName, JsonNode newConfiguration)
throws ConfigNotFoundException, IOException, JsonValidationException {
// get existing source
final SourceConnection persistedSource = configRepository.getSourceConnection(sourceId);
final SourceConnection persistedSource = configRepository.getSourceConnectionWithSecrets(sourceId);
persistedSource.setName(sourceName);
// get spec
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(persistedSource.getSourceDefinitionId());
@@ -56,7 +56,7 @@ public class ConfigurationUpdate {
public DestinationConnection destination(UUID destinationId, String destName, JsonNode newConfiguration)
throws ConfigNotFoundException, IOException, JsonValidationException {
// get existing destination
final DestinationConnection persistedDestination = configRepository.getDestinationConnection(destinationId);
final DestinationConnection persistedDestination = configRepository.getDestinationConnectionWithSecrets(destinationId);
persistedDestination.setName(destName);
// get spec
final StandardDestinationDefinition destinationDefinition = configRepository

View File

@@ -15,6 +15,7 @@ import io.airbyte.api.model.DestinationReadList;
import io.airbyte.api.model.DestinationUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.StandardDestinationDefinition;
@@ -117,13 +118,15 @@ public class DestinationHandler {
connectionsHandler.deleteConnection(connectionRead);
}
final var fullConfig = configRepository.getDestinationConnectionWithSecrets(destination.getDestinationId()).getConfiguration();
// persist
persistDestinationConnection(
destination.getName(),
destination.getDestinationDefinitionId(),
destination.getWorkspaceId(),
destination.getDestinationId(),
destination.getConnectionConfiguration(),
fullConfig,
true);
}
@@ -222,7 +225,7 @@ public class DestinationHandler {
throws ConfigNotFoundException, IOException, JsonValidationException {
// remove secrets from config before returning the read
final DestinationConnection dci = configRepository.getDestinationConnection(destinationId);
final DestinationConnection dci = Jsons.clone(configRepository.getDestinationConnection(destinationId));
dci.setConfiguration(secretsProcessor.maskSecrets(dci.getConfiguration(), spec.getConnectionSpecification()));
final StandardDestinationDefinition standardDestinationDefinition =

View File

@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
public class SchedulerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
private static final UUID NO_WORKSPACE = UUID.fromString("00000000-0000-0000-0000-000000000000");
private final ConfigRepository configRepository;
private final SchedulerJobClient schedulerJobClient;
@@ -136,11 +137,16 @@ public class SchedulerHandler {
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceConfig.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
final var partialConfig = configRepository.statefulSplitEphemeralSecrets(
sourceConfig.getConnectionConfiguration(),
specFetcher.execute(imageName));
// todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are
// technically declared as required.
final SourceConnection source = new SourceConnection()
.withSourceDefinitionId(sourceConfig.getSourceDefinitionId())
.withConfiguration(sourceConfig.getConnectionConfiguration());
.withConfiguration(partialConfig);
return reportConnectionStatus(synchronousSchedulerClient.createSourceCheckConnectionJob(source, imageName));
}
@@ -172,11 +178,17 @@ public class SchedulerHandler {
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardDestinationDefinition destDef = configRepository.getStandardDestinationDefinition(destinationConfig.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());
final var partialConfig = configRepository.statefulSplitEphemeralSecrets(
destinationConfig.getConnectionConfiguration(),
specFetcher.execute(imageName));
// todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are
// technically declared as required.
final DestinationConnection destination = new DestinationConnection()
.withDestinationDefinitionId(destinationConfig.getDestinationDefinitionId())
.withConfiguration(destinationConfig.getConnectionConfiguration());
.withConfiguration(partialConfig);
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
}

View File

@@ -170,6 +170,9 @@ public class SourceHandler {
final ConnectorSpecification spec = getSpecFromSourceId(source.getSourceId());
validateSource(spec, source.getConnectionConfiguration());
final var fullConfig = configRepository.getSourceConnectionWithSecrets(source.getSourceId()).getConfiguration();
validateSource(spec, fullConfig);
// persist
persistSourceConnection(
source.getName(),
@@ -177,7 +180,7 @@ public class SourceHandler {
source.getWorkspaceId(),
source.getSourceId(),
true,
source.getConnectionConfiguration(),
fullConfig,
spec);
}

View File

@@ -143,12 +143,12 @@ class ConfigDumpImporterTest {
@Test
public void testImportIntoWorkspaceWithConflicts() throws JsonValidationException, ConfigNotFoundException, IOException {
when(configRepository.listSourceConnection())
when(configRepository.listSourceConnectionWithSecrets())
.thenReturn(List.of(sourceConnection,
new SourceConnection()
.withSourceId(UUID.randomUUID())
.withWorkspaceId(UUID.randomUUID())));
when(configRepository.listDestinationConnection())
when(configRepository.listDestinationConnectionWithSecrets())
.thenReturn(List.of(destinationConnection,
new DestinationConnection()
.withDestinationId(UUID.randomUUID())
@@ -179,7 +179,7 @@ class ConfigDumpImporterTest {
@Test
public void testImportIntoWorkspaceWithoutConflicts() throws JsonValidationException, ConfigNotFoundException, IOException {
when(configRepository.listSourceConnection())
when(configRepository.listSourceConnectionWithSecrets())
// First called for export
.thenReturn(List.of(sourceConnection,
new SourceConnection()
@@ -189,7 +189,7 @@ class ConfigDumpImporterTest {
.thenReturn(List.of(new SourceConnection()
.withSourceId(UUID.randomUUID())
.withWorkspaceId(UUID.randomUUID())));
when(configRepository.listDestinationConnection())
when(configRepository.listDestinationConnectionWithSecrets())
// First called for export
.thenReturn(List.of(destinationConnection,
new DestinationConnection()

View File

@@ -86,7 +86,7 @@ class ConfigurationUpdateTest {
@Test
void testSourceUpdate() throws JsonValidationException, IOException, ConfigNotFoundException {
when(configRepository.getSourceConnection(UUID1)).thenReturn(ORIGINAL_SOURCE_CONNECTION);
when(configRepository.getSourceConnectionWithSecrets(UUID1)).thenReturn(ORIGINAL_SOURCE_CONNECTION);
when(configRepository.getStandardSourceDefinition(UUID2)).thenReturn(SOURCE_DEFINITION);
when(specFetcher.execute(IMAGE_NAME)).thenReturn(CONNECTOR_SPECIFICATION);
when(secretsProcessor.copySecrets(ORIGINAL_CONFIGURATION, NEW_CONFIGURATION, SPEC)).thenReturn(NEW_CONFIGURATION);
@@ -98,7 +98,7 @@ class ConfigurationUpdateTest {
@Test
void testDestinationUpdate() throws JsonValidationException, IOException, ConfigNotFoundException {
when(configRepository.getDestinationConnection(UUID1)).thenReturn(ORIGINAL_DESTINATION_CONNECTION);
when(configRepository.getDestinationConnectionWithSecrets(UUID1)).thenReturn(ORIGINAL_DESTINATION_CONNECTION);
when(configRepository.getStandardDestinationDefinition(UUID2)).thenReturn(DESTINATION_DEFINITION);
when(specFetcher.execute(IMAGE_NAME)).thenReturn(CONNECTOR_SPECIFICATION);
when(secretsProcessor.copySecrets(ORIGINAL_CONFIGURATION, NEW_CONFIGURATION, SPEC)).thenReturn(NEW_CONFIGURATION);

View File

@@ -29,6 +29,7 @@ import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
@@ -44,6 +45,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -107,7 +109,7 @@ public class ArchiveHandlerTest {
configPersistence = new DatabaseConfigPersistence(database);
configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
configPersistence.loadData(seedPersistence);
configRepository = new ConfigRepository(configPersistence);
configRepository = new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.empty(), Optional.empty());
jobPersistence.setVersion(VERSION);
@@ -239,7 +241,7 @@ public class ArchiveHandlerTest {
.workspaceId(secondWorkspaceId));
assertEquals(StatusEnum.SUCCEEDED, secondImportResult.getStatus());
final UUID secondSourceId = configRepository.listSourceConnection()
final UUID secondSourceId = configRepository.listSourceConnectionWithSecrets()
.stream()
.filter(sourceConnection -> secondWorkspaceId.equals(sourceConnection.getWorkspaceId()))
.map(SourceConnection::getSourceId)

View File

@@ -150,6 +150,9 @@ class DestinationHandlerTest {
final ConnectionReadList connectionReadList = new ConnectionReadList().connections(Collections.singletonList(connectionRead));
final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody().workspaceId(destinationConnection.getWorkspaceId());
when(configRepository.getDestinationConnectionWithSecrets(destinationConnection.getDestinationId()))
.thenReturn(destinationConnection)
.thenReturn(expectedDestinationConnection);
when(configRepository.getDestinationConnection(destinationConnection.getDestinationId()))
.thenReturn(destinationConnection)
.thenReturn(expectedDestinationConnection);

View File

@@ -9,6 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -189,6 +191,9 @@ class SchedulerHandlerTest {
.withDockerRepository(SOURCE_DOCKER_REPO)
.withDockerImageTag(SOURCE_DOCKER_TAG)
.withSourceDefinitionId(source.getSourceDefinitionId()));
when(configRepository.statefulSplitEphemeralSecrets(
eq(source.getConfiguration()),
any())).thenReturn(source.getConfiguration());
when(synchronousSchedulerClient.createSourceCheckConnectionJob(source, SOURCE_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
@@ -217,7 +222,9 @@ class SchedulerHandlerTest {
.withConfiguration(source.getConfiguration());
when(synchronousSchedulerClient.createSourceCheckConnectionJob(submittedSource, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(configRepository.statefulSplitEphemeralSecrets(
eq(source.getConfiguration()),
any())).thenReturn(source.getConfiguration());
schedulerHandler.checkSourceConnectionFromSourceIdForUpdate(sourceUpdate);
verify(jsonSchemaValidator).ensure(CONNECTION_SPECIFICATION.getConnectionSpecification(), source.getConfiguration());
@@ -313,7 +320,9 @@ class SchedulerHandlerTest {
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(configRepository.statefulSplitEphemeralSecrets(
eq(destination.getConfiguration()),
any())).thenReturn(destination.getConfiguration());
schedulerHandler.checkDestinationConnectionFromDestinationCreate(destinationCoreConfig);
verify(synchronousSchedulerClient).createDestinationCheckConnectionJob(destination, DESTINATION_DOCKER_IMAGE);
@@ -340,7 +349,9 @@ class SchedulerHandlerTest {
.withConfiguration(destination.getConfiguration());
when(synchronousSchedulerClient.createDestinationCheckConnectionJob(submittedDestination, DESTINATION_DOCKER_IMAGE))
.thenReturn((SynchronousResponse<StandardCheckConnectionOutput>) jobResponse);
when(configRepository.statefulSplitEphemeralSecrets(
eq(destination.getConfiguration()),
any())).thenReturn(destination.getConfiguration());
schedulerHandler.checkDestinationConnectionFromDestinationIdForUpdate(destinationUpdate);
verify(jsonSchemaValidator).ensure(CONNECTION_SPECIFICATION.getConnectionSpecification(), destination.getConfiguration());

View File

@@ -231,6 +231,9 @@ class SourceHandlerTest {
when(configRepository.getSourceConnection(sourceConnection.getSourceId()))
.thenReturn(sourceConnection)
.thenReturn(expectedSourceConnection);
when(configRepository.getSourceConnectionWithSecrets(sourceConnection.getSourceId()))
.thenReturn(sourceConnection)
.thenReturn(expectedSourceConnection);
when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId()))
.thenReturn(standardSourceDefinition);
when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition);

View File

@@ -27,6 +27,9 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.FileSystemConfigPersistence;
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.Database;
import io.airbyte.migrate.Migrations;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
@@ -58,10 +61,12 @@ public class RunMigrationTest {
private static final String DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED = "d2147be5-fa36-4936-977e-f031affa5895";
private static final String DEPRECATED_SOURCE_DEFINITION_BEING_USED = "4eb22946-2a79-4d20-a3e6-effd234613c3";
private List<File> resourceToBeCleanedUp;
private SecretPersistence secretPersistence;
@BeforeEach
public void setup() {
resourceToBeCleanedUp = new ArrayList<>();
secretPersistence = new MemorySecretPersistence();
}
@AfterEach
@@ -105,7 +110,9 @@ public class RunMigrationTest {
private void assertPreMigrationConfigs(final Path configRoot, final JobPersistence jobPersistence) throws Exception {
assertDatabaseVersion(jobPersistence, INITIAL_VERSION);
final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot));
final ConfigRepository configRepository =
new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot), new NoOpSecretsHydrator(), Optional.of(secretPersistence),
Optional.of(secretPersistence));
final Map<String, StandardSourceDefinition> sourceDefinitionsBeforeMigration = configRepository.listStandardSources().stream()
.collect(Collectors.toMap(c -> c.getSourceDefinitionId().toString(), c -> c));
assertTrue(sourceDefinitionsBeforeMigration.containsKey(DEPRECATED_SOURCE_DEFINITION_NOT_BEING_USED));
@@ -119,7 +126,9 @@ public class RunMigrationTest {
}
private void assertPostMigrationConfigs(final Path importRoot) throws Exception {
final ConfigRepository configRepository = new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot));
final ConfigRepository configRepository =
new ConfigRepository(FileSystemConfigPersistence.createWithValidation(importRoot), new NoOpSecretsHydrator(), Optional.of(secretPersistence),
Optional.of(secretPersistence));
final UUID workspaceId = configRepository.listStandardWorkspaces(true).get(0).getWorkspaceId();
// originally the default workspace started with a hardcoded id. the migration in version 0.29.0
// took that id and randomized it. we want to check that the id is now NOT that hardcoded id and
@@ -288,7 +297,8 @@ public class RunMigrationTest {
private void runMigration(final JobPersistence jobPersistence, final Path configRoot) throws Exception {
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot)),
new ConfigRepository(FileSystemConfigPersistence.createWithValidation(configRoot), new NoOpSecretsHydrator(), Optional.of(secretPersistence),
Optional.of(secretPersistence)),
TARGET_VERSION,
YamlSeedConfigPersistence.getDefault(),
mock(SpecFetcher.class) // this test was disabled/broken when this fetcher mock was added. apologies if you have to fix this

View File

@@ -135,6 +135,8 @@ public class AcceptanceTests {
private static final boolean IS_KUBE = System.getenv().containsKey("KUBE");
private static final boolean IS_MINIKUBE = System.getenv().containsKey("IS_MINIKUBE");
private static final boolean IS_GKE = System.getenv().containsKey("IS_GKE");
private static final boolean USE_EXTERNAL_DEPLOYMENT =
System.getenv("USE_EXTERNAL_DEPLOYMENT") != null && System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_";
private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}";
@@ -167,6 +169,7 @@ public class AcceptanceTests {
@SuppressWarnings("UnstableApiUsage")
@BeforeAll
public static void init() throws URISyntaxException, IOException, InterruptedException {
System.out.println("in init");
if (IS_GKE && !IS_KUBE) {
throw new RuntimeException("KUBE Flag should also be enabled if GKE flag is enabled");
}
@@ -178,7 +181,7 @@ public class AcceptanceTests {
}
// by default use airbyte deployment governed by a test container.
if (System.getenv("USE_EXTERNAL_DEPLOYMENT") == null || !System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true")) {
if (!USE_EXTERNAL_DEPLOYMENT) {
LOGGER.info("Using deployment of airbyte managed by test containers.");
airbyteTestContainer = new AirbyteTestContainer.Builder(new File(Resources.getResource(DOCKER_COMPOSE_FILE_NAME).toURI()))
.setEnv(ENV_FILE)

View File

@@ -25,6 +25,7 @@ dependencies {
implementation project(':airbyte-api')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db:lib')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:models')

View File

@@ -8,6 +8,8 @@ import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.MaxWorkersConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
@@ -43,15 +45,18 @@ public class WorkerApp {
private final Path workspaceRoot;
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final WorkflowServiceStubs temporalService;
private final MaxWorkersConfig maxWorkers;
public WorkerApp(Path workspaceRoot,
ProcessFactory processFactory,
SecretsHydrator secretsHydrator,
WorkflowServiceStubs temporalService,
MaxWorkersConfig maxWorkers) {
this.workspaceRoot = workspaceRoot;
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.temporalService = temporalService;
this.maxWorkers = maxWorkers;
}
@@ -77,18 +82,20 @@ public class WorkerApp {
final Worker checkConnectionWorker =
factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers()));
checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class);
checkConnectionWorker.registerActivitiesImplementations(new CheckConnectionWorkflow.CheckConnectionActivityImpl(processFactory, workspaceRoot));
checkConnectionWorker
.registerActivitiesImplementations(new CheckConnectionWorkflow.CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot));
final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers()));
discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class);
discoverWorker.registerActivitiesImplementations(new DiscoverCatalogWorkflow.DiscoverCatalogActivityImpl(processFactory, workspaceRoot));
discoverWorker
.registerActivitiesImplementations(new DiscoverCatalogWorkflow.DiscoverCatalogActivityImpl(processFactory, secretsHydrator, workspaceRoot));
final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class);
syncWorker.registerActivitiesImplementations(
new SyncWorkflow.ReplicationActivityImpl(processFactory, workspaceRoot),
new SyncWorkflow.NormalizationActivityImpl(processFactory, workspaceRoot),
new SyncWorkflow.DbtTransformationActivityImpl(processFactory, workspaceRoot));
new SyncWorkflow.ReplicationActivityImpl(processFactory, secretsHydrator, workspaceRoot),
new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot),
new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot));
factory.start();
}
@@ -110,7 +117,7 @@ public class WorkerApp {
}
}
private static final WorkerOptions getWorkerOptions(int max) {
private static WorkerOptions getWorkerOptions(int max) {
return WorkerOptions.newBuilder()
.setMaxConcurrentActivityExecutionSize(max)
.build();
@@ -127,11 +134,13 @@ public class WorkerApp {
final String temporalHost = configs.getTemporalHost();
LOGGER.info("temporalHost = " + temporalHost);
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs);
final ProcessFactory processFactory = getProcessBuilderFactory(configs);
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
new WorkerApp(workspaceRoot, processFactory, temporalService, configs.getMaxWorkers()).start();
new WorkerApp(workspaceRoot, processFactory, secretsHydrator, temporalService, configs.getMaxWorkers()).start();
}
}

View File

@@ -4,9 +4,11 @@
package io.airbyte.workers.temporal;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultCheckConnectionWorker;
@@ -63,10 +65,12 @@ public interface CheckConnectionWorkflow {
class CheckConnectionActivityImpl implements CheckConnectionActivity {
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
public CheckConnectionActivityImpl(ProcessFactory processFactory, Path workspaceRoot) {
public CheckConnectionActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
}
@@ -74,7 +78,12 @@ public interface CheckConnectionWorkflow {
IntegrationLauncherConfig launcherConfig,
StandardCheckConnectionInput connectionConfiguration) {
final Supplier<StandardCheckConnectionInput> inputSupplier = () -> connectionConfiguration;
final JsonNode fullConfig = secretsHydrator.hydrate(connectionConfiguration.getConnectionConfiguration());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withConnectionConfiguration(fullConfig);
final Supplier<StandardCheckConnectionInput> inputSupplier = () -> input;
final TemporalAttemptExecution<StandardCheckConnectionInput, StandardCheckConnectionOutput> temporalAttemptExecution =
new TemporalAttemptExecution<>(

View File

@@ -4,8 +4,10 @@
package io.airbyte.workers.temporal;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
@@ -65,17 +67,25 @@ public interface DiscoverCatalogWorkflow {
class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity {
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
public DiscoverCatalogActivityImpl(ProcessFactory processFactory, Path workspaceRoot) {
public DiscoverCatalogActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
}
public AirbyteCatalog run(JobRunConfig jobRunConfig,
IntegrationLauncherConfig launcherConfig,
StandardDiscoverCatalogInput config) {
final Supplier<StandardDiscoverCatalogInput> inputSupplier = () -> config;
final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration());
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
.withConnectionConfiguration(fullConfig);
final Supplier<StandardDiscoverCatalogInput> inputSupplier = () -> input;
final TemporalAttemptExecution<StandardDiscoverCatalogInput, AirbyteCatalog> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot,

View File

@@ -19,6 +19,7 @@ import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DbtTransformationRunner;
@@ -126,16 +127,21 @@ public interface SyncWorkflow {
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationActivityImpl.class);
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
public ReplicationActivityImpl(ProcessFactory processFactory, Path workspaceRoot) {
this(processFactory, workspaceRoot, new AirbyteConfigValidator());
public ReplicationActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot) {
this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator());
}
@VisibleForTesting
ReplicationActivityImpl(ProcessFactory processFactory, Path workspaceRoot, AirbyteConfigValidator validator) {
ReplicationActivityImpl(ProcessFactory processFactory,
SecretsHydrator secretsHydrator,
Path workspaceRoot,
AirbyteConfigValidator validator) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.validator = validator;
}
@@ -146,9 +152,16 @@ public interface SyncWorkflow {
IntegrationLauncherConfig destinationLauncherConfig,
StandardSyncInput syncInput) {
final var fullSourceConfig = secretsHydrator.hydrate(syncInput.getSourceConfiguration());
final var fullDestinationConfig = secretsHydrator.hydrate(syncInput.getDestinationConfiguration());
final var fullSyncInput = Jsons.clone(syncInput)
.withSourceConfiguration(fullSourceConfig)
.withDestinationConfiguration(fullDestinationConfig);
final Supplier<StandardSyncInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.STANDARD_SYNC_INPUT, Jsons.jsonNode(syncInput));
return syncInput;
validator.ensureAsRuntime(ConfigSchema.STANDARD_SYNC_INPUT, Jsons.jsonNode(fullSyncInput));
return fullSyncInput;
};
final TemporalAttemptExecution<StandardSyncInput, ReplicationOutput> temporalAttempt = new TemporalAttemptExecution<>(
@@ -237,16 +250,21 @@ public interface SyncWorkflow {
private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationActivityImpl.class);
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
public NormalizationActivityImpl(ProcessFactory processFactory, Path workspaceRoot) {
this(processFactory, workspaceRoot, new AirbyteConfigValidator());
public NormalizationActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot) {
this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator());
}
@VisibleForTesting
NormalizationActivityImpl(ProcessFactory processFactory, Path workspaceRoot, AirbyteConfigValidator validator) {
NormalizationActivityImpl(ProcessFactory processFactory,
SecretsHydrator secretsHydrator,
Path workspaceRoot,
AirbyteConfigValidator validator) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.validator = validator;
}
@@ -256,9 +274,12 @@ public interface SyncWorkflow {
IntegrationLauncherConfig destinationLauncherConfig,
NormalizationInput input) {
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);
final Supplier<NormalizationInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(input));
return input;
validator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};
final TemporalAttemptExecution<NormalizationInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
@@ -299,16 +320,21 @@ public interface SyncWorkflow {
private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationActivityImpl.class);
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
public DbtTransformationActivityImpl(ProcessFactory processFactory, Path workspaceRoot) {
this(processFactory, workspaceRoot, new AirbyteConfigValidator());
public DbtTransformationActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot) {
this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator());
}
@VisibleForTesting
DbtTransformationActivityImpl(ProcessFactory processFactory, Path workspaceRoot, AirbyteConfigValidator validator) {
DbtTransformationActivityImpl(ProcessFactory processFactory,
SecretsHydrator secretsHydrator,
Path workspaceRoot,
AirbyteConfigValidator validator) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.validator = validator;
}
@@ -319,9 +345,12 @@ public interface SyncWorkflow {
ResourceRequirements resourceRequirements,
OperatorDbtInput input) {
final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);
final Supplier<OperatorDbtInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.OPERATOR_DBT_INPUT, Jsons.jsonNode(input));
return input;
validator.ensureAsRuntime(ConfigSchema.OPERATOR_DBT_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};
final TemporalAttemptExecution<OperatorDbtInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(

View File

@@ -66,6 +66,7 @@ services:
- MAX_SYNC_JOB_ATTEMPTS=${MAX_SYNC_JOB_ATTEMPTS}
- MAX_SYNC_TIMEOUT_DAYS=${MAX_SYNC_TIMEOUT_DAYS}
- INTERNAL_API_HOST=${INTERNAL_API_HOST}
- SECRET_PERSISTENCE=${SECRET_PERSISTENCE}
volumes:
- workspace:${WORKSPACE_ROOT}
- ${LOCAL_ROOT}:${LOCAL_ROOT}
@@ -108,6 +109,7 @@ services:
- MAX_SYNC_JOB_ATTEMPTS=${MAX_SYNC_JOB_ATTEMPTS}
- MAX_SYNC_TIMEOUT_DAYS=${MAX_SYNC_TIMEOUT_DAYS}
- INTERNAL_API_HOST=${INTERNAL_API_HOST}
- SECRET_PERSISTENCE=${SECRET_PERSISTENCE}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- workspace:${WORKSPACE_ROOT}
@@ -143,6 +145,7 @@ services:
- RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT}
- RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST}
- RESOURCE_MEMORY_LIMIT=${RESOURCE_MEMORY_LIMIT}
- SECRET_PERSISTENCE=${SECRET_PERSISTENCE}
ports:
- 8001:8001
volumes:

View File

@@ -21,4 +21,4 @@ echo "Running config persistence integration tests..."
SUB_BUILD=PLATFORM USE_EXTERNAL_DEPLOYMENT=true \
SECRET_STORE_GCP_CREDENTIALS=${SECRET_STORE_GCP_CREDENTIALS} \
SECRET_STORE_GCP_PROJECT_ID=${SECRET_STORE_GCP_PROJECT_ID} \
SECRET_STORE_FOR_CONFIGS=${SECRET_STORE_FOR_CONFIGS} ./gradlew :airbyte-config:persistence:integrationTest --rerun-tasks --scan
./gradlew :airbyte-config:persistence:integrationTest --rerun-tasks --scan