1
0
mirror of synced 2026-01-06 06:04:16 -05:00

Migrate secret from a non secret store to a secret store (#12088)

Introduce a migration to a secret manager

If a secret manager is specify, it will go though all the config, save the secret in the configured secret store. If the secret is already in a store, it will not migrate the secret to the secret store.
This commit is contained in:
Benoit Moriceau
2022-05-04 16:40:33 -07:00
committed by GitHub
parent 3c1eab3e27
commit 07359ffd77
11 changed files with 651 additions and 61 deletions

1
.env
View File

@@ -93,3 +93,4 @@ MAX_DISCOVER_WORKERS=5
NEW_SCHEDULER=false
AUTO_DISABLE_FAILING_CONNECTIONS=false
EXPOSE_SECRETS_IN_EXPORT=false
MIGRATE_SECRET_STORE=false

View File

@@ -10,8 +10,8 @@ dependencies {
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db:lib')
implementation project(":airbyte-json-validation")
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:persistence')
implementation project(':airbyte-scheduler:models')
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation "org.flywaydb:flyway-core:7.14.0"

View File

@@ -17,20 +17,16 @@ import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
@@ -55,13 +51,15 @@ public class BootloaderApp {
private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha");
private final Configs configs;
private Runnable postLoadExecution;
private final Runnable postLoadExecution;
private final FeatureFlags featureFlags;
private final SecretMigrator secretMigrator;
private ConfigPersistence configPersistence;
private Database configDatabase;
@VisibleForTesting
public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) {
this.configs = configs;
this.featureFlags = featureFlags;
public BootloaderApp(final Configs configs, final FeatureFlags featureFlags, final SecretMigrator secretMigrator) {
this(configs, () -> {}, featureFlags, secretMigrator);
}
/**
@@ -72,41 +70,54 @@ public class BootloaderApp {
* @param configs
* @param postLoadExecution
*/
public BootloaderApp(final Configs configs, final Runnable postLoadExecution, final FeatureFlags featureFlags) {
public BootloaderApp(final Configs configs,
final Runnable postLoadExecution,
final FeatureFlags featureFlags,
final SecretMigrator secretMigrator) {
this.configs = configs;
this.postLoadExecution = postLoadExecution;
this.featureFlags = featureFlags;
this.secretMigrator = secretMigrator;
initPersistences();
}
public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) {
this.configs = configs;
this.featureFlags = featureFlags;
try {
initPersistences();
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
secretMigrator = new SecretMigrator(configPersistence, secretPersistence);
postLoadExecution = () -> {
try {
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
if (featureFlags.runSecretMigration()) {
secretMigrator.migrateSecrets();
}
LOGGER.info("Loaded seed data..");
} catch (final IOException | JsonValidationException e) {
throw new RuntimeException(e);
}
};
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
public BootloaderApp() {
configs = new EnvConfigs();
featureFlags = new EnvVariableFeatureFlags();
postLoadExecution = () -> {
try {
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(true)
.build();
final ConfigPersistence configPersistence =
DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
configPersistence.loadData(YamlSeedConfigPersistence.getDefault());
LOGGER.info("Loaded seed data..");
} catch (final IOException e) {
e.printStackTrace();
}
};
this(new EnvConfigs(), new EnvVariableFeatureFlags());
}
public void load() throws Exception {
LOGGER.info("Setting up config database and default workspace..");
try (
final Database configDatabase =
new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl())
.getAndInitialize();
final Database jobDatabase =
new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).getAndInitialize()) {
LOGGER.info("Created initial jobs and configs database...");
@@ -118,11 +129,6 @@ public class BootloaderApp {
runFlywayMigration(configs, configDatabase, jobDatabase);
LOGGER.info("Ran Flyway migrations...");
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
.copySecrets(false)
.build();
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, configDatabase);
@@ -136,14 +142,30 @@ public class BootloaderApp {
LOGGER.info("Set version to {}", currAirbyteVersion);
}
if (postLoadExecution != null) {
postLoadExecution.run();
LOGGER.info("Finished running post load Execution..");
}
postLoadExecution.run();
LOGGER.info("Finished running post load Execution..");
LOGGER.info("Finished bootstrapping Airbyte environment..");
}
private void initPersistences() {
try {
configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl()).getAndInitialize();
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.maskSecrets(true)
.copySecrets(true)
.build();
configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor);
} catch (final IOException e) {
e.printStackTrace();
}
}
public static void main(final String[] args) throws Exception {
final var bootloader = new BootloaderApp();
bootloader.load();
@@ -230,16 +252,4 @@ public class BootloaderApp {
}
}
private static void cleanupZombies(final JobPersistence jobPersistence) throws IOException {
final Configs configs = new EnvConfigs();
final WorkflowClient wfClient =
WorkflowClient.newInstance(WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder().setTarget(configs.getTemporalHost()).build()));
for (final Job zombieJob : jobPersistence.listJobsWithStatus(JobStatus.RUNNING)) {
LOGGER.info("Kill zombie job {} for connection {}", zombieJob.getId(), zombieJob.getScope());
wfClient.newUntypedWorkflowStub("sync_" + zombieJob.getId())
.terminate("Zombie");
}
}
}

View File

@@ -0,0 +1,206 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.bootloader;
import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.JsonPaths;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@AllArgsConstructor
@Slf4j
public class SecretMigrator {
private final ConfigPersistence configPersistence;
private final Optional<SecretPersistence> secretPersistence;
@Value
static class ConnectorConfiguration {
private final UUID workspace;
private final JsonNode configuration;
private final JsonNode spec;
}
/**
* Perform a secret migration. It will load all the actor specs extract the secret JsonPath from it.
* Then for all the secret that are stored in a plain text format, it will save the plain text in
* the secret manager and store the coordinate in the config DB.
*/
public void migrateSecrets() throws JsonValidationException, IOException {
if (secretPersistence.isEmpty()) {
log.info("No secret persistence is provided, the migration won't be run ");
return;
}
final List<StandardSourceDefinition> standardSourceDefinitions =
configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class);
final Map<UUID, JsonNode> definitionIdToSourceSpecs = standardSourceDefinitions
.stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId,
def -> def.getSpec().getConnectionSpecification()));
final List<SourceConnection> sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
migrateSources(sources, definitionIdToSourceSpecs);
final List<StandardDestinationDefinition> standardDestinationDefinitions =
configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION,
StandardDestinationDefinition.class);
final Map<UUID, JsonNode> definitionIdToDestinationSpecs = standardDestinationDefinitions.stream()
.collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId,
def -> def.getSpec().getConnectionSpecification()));
final List<DestinationConnection> destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class);
migrateDestinations(destinations, definitionIdToDestinationSpecs);
}
/**
* This is migrating the secrets for the source actors
*/
@VisibleForTesting
void migrateSources(final List<SourceConnection> sources, final Map<UUID, JsonNode> definitionIdToSourceSpecs)
throws JsonValidationException, IOException {
log.info("Migrating Sources");
final List<SourceConnection> sourceConnections = sources.stream()
.map(source -> {
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
source.getWorkspaceId(),
source.getConfiguration(),
definitionIdToSourceSpecs.get(source.getSourceDefinitionId())),
() -> UUID.randomUUID());
source.setConfiguration(migratedConfig);
return source;
})
.toList();
for (final SourceConnection source : sourceConnections) {
configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
}
}
/**
* This is migrating the secrets for the destination actors
*/
@VisibleForTesting
void migrateDestinations(final List<DestinationConnection> destinations, final Map<UUID, JsonNode> definitionIdToDestinationSpecs)
throws JsonValidationException, IOException {
log.info("Migration Destinations");
final List<DestinationConnection> destinationConnections = destinations.stream().map(destination -> {
final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration(
destination.getWorkspaceId(),
destination.getConfiguration(),
definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())),
() -> UUID.randomUUID());
destination.setConfiguration(migratedConfig);
return destination;
})
.toList();
for (final DestinationConnection destination : destinationConnections) {
configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination);
}
}
/**
* This is a generic method to migrate an actor configuration It will extract the secret path form
* the provided spec and then replace them by coordinates in the actor configuration
*/
@VisibleForTesting
JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier<UUID> uuidProvider) {
if (connectorConfiguration.getSpec() == null) {
throw new IllegalStateException("No connector definition to match the connector");
}
final AtomicReference<JsonNode> connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration());
final List<String> uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec())
.stream()
.flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream())
.toList();
final UUID workspaceId = connectorConfiguration.getWorkspace();
uniqSecretPaths.forEach(secretPath -> {
final Optional<JsonNode> secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath);
if (secretValue.isEmpty()) {
throw new IllegalStateException("Missing secret for the path: " + secretPath);
}
// Only migrate plain text.
if (secretValue.get().isTextual()) {
final JsonNode stringSecretValue = secretValue.get();
final SecretCoordinate coordinate =
new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1);
secretPersistence.get().write(coordinate, stringSecretValue.textValue());
connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath,
Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate()))));
} else {
log.error("Not migrating already migrated secrets");
}
});
return connectorConfigurationJson.get();
}
/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) {
return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement);
}
/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
List<String> getSecretPath(final JsonNode specs) {
return SecretsHelpers.getSortedSecretPaths(specs);
}
/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
List<String> getAllExplodedPath(final JsonNode node, final String path) {
return JsonPaths.getPaths(node, path);
}
/**
* Wrapper to help to mock static methods
*/
@VisibleForTesting
Optional<JsonNode> getValueForPath(final JsonNode node, final String path) {
return JsonPaths.getSingleValue(node, path);
}
}

View File

@@ -4,6 +4,7 @@
package io.airbyte.bootloader;
import static io.airbyte.config.Configs.SecretPersistenceType.TESTING_CONFIG_DB_TABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -11,15 +12,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.init.YamlSeedConfigPersistence;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.val;
import org.junit.jupiter.api.Test;
@@ -57,6 +69,8 @@ public class BootloaderAppTest {
val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
val mockedSecretMigrator = mock(SecretMigrator.class);
// Although we are able to inject mocked configs into the Bootloader, a particular migration in the
// configs database
// requires the env var to be set. Flyway prevents injection, so we dynamically set this instead.
@@ -64,7 +78,7 @@ public class BootloaderAppTest {
environmentVariables.set("DATABASE_PASSWORD", "docker");
environmentVariables.set("DATABASE_URL", container.getJdbcUrl());
val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags);
val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator);
bootloader.load();
val jobDatabase = new JobsDatabaseInstance(
@@ -88,6 +102,90 @@ public class BootloaderAppTest {
assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get());
}
@Test
void testBootloaderAppRunSecretMigration() throws Exception {
val container = new PostgreSQLContainer<>("postgres:13-alpine")
.withDatabaseName("public")
.withUsername("docker")
.withPassword("docker");
container.start();
val version = "0.33.0-alpha";
val mockedConfigs = mock(Configs.class);
when(mockedConfigs.getConfigDatabaseUrl()).thenReturn(container.getJdbcUrl());
when(mockedConfigs.getConfigDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getConfigDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getDatabaseUrl()).thenReturn(container.getJdbcUrl());
when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername());
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(version));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE);
val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
val mockedSecretMigrator = mock(SecretMigrator.class);
// Although we are able to inject mocked configs into the Bootloader, a particular migration in the
// configs database
// requires the env var to be set. Flyway prevents injection, so we dynamically set this instead.
environmentVariables.set("DATABASE_USER", "docker");
environmentVariables.set("DATABASE_PASSWORD", "docker");
environmentVariables.set("DATABASE_URL", container.getJdbcUrl());
val initBootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator);
initBootloader.load();
final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.copySecrets(true)
.maskSecrets(true)
.build();
final Database configDatabase = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl());
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor);
final ConfigPersistence localSchema = YamlSeedConfigPersistence.getDefault();
final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase);
configRepository.loadDataNoSecrets(localSchema);
final String sourceSpecs = """
{
"account_id": "1234567891234567",
"start_date": "2022-04-01T00:00:00Z",
"access_token": "nonhiddensecret",
"include_deleted": false,
"fetch_thumbnail_images": false
}
""";
final ObjectMapper mapper = new ObjectMapper();
final UUID workspaceId = UUID.randomUUID();
configRepository.writeStandardWorkspace(new StandardWorkspace()
.withWorkspaceId(workspaceId)
.withName("wName")
.withSlug("wSlug")
.withEmail("email@mail.com")
.withTombstone(false)
.withInitialSetupComplete(false));
final UUID sourceId = UUID.randomUUID();
configRepository.writeSourceConnectionNoSecrets(new SourceConnection()
.withSourceDefinitionId(UUID.fromString("e7778cfc-e97c-4458-9ecb-b4f2bba8946c")) // Facebook Marketing
.withSourceId(sourceId)
.withName("test source")
.withWorkspaceId(workspaceId)
.withConfiguration(mapper.readTree(sourceSpecs)));
when(mockedFeatureFlags.runSecretMigration()).thenReturn(true);
val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags);
bootloader.load();
final SourceConnection sourceConnection = configRepository.getSourceConnection(sourceId);
assertFalse(sourceConnection.getConfiguration().toString().contains("nonhiddensecret"));
assertTrue(sourceConnection.getConfiguration().toString().contains("_secret"));
}
@Test
void testIsLegalUpgradePredicate() {
// starting from no previous version is always legal.
@@ -134,7 +232,9 @@ public class BootloaderAppTest {
val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags).load();
val mockedSecretMigrator = mock(SecretMigrator.class);
new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags, mockedSecretMigrator).load();
assertTrue(testTriggered.get());
}

View File

@@ -0,0 +1,240 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.bootloader;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.bootloader.SecretMigrator.ConnectorConfiguration;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretCoordinate;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class SecretMigratorTest {
private final UUID workspaceId = UUID.randomUUID();
@Mock
private ConfigPersistence configPersistence;
@Mock
private SecretPersistence secretPersistence;
private SecretMigrator secretMigrator;
@BeforeEach
void setup() {
secretMigrator = Mockito.spy(new SecretMigrator(configPersistence, Optional.of(secretPersistence)));
}
@Test
public void testMigrateSecret() throws JsonValidationException, IOException {
final JsonNode sourceSpec = Jsons.jsonNode("sourceSpec");
final UUID sourceDefinitionId = UUID.randomUUID();
final StandardSourceDefinition standardSourceDefinition = new StandardSourceDefinition()
.withSourceDefinitionId(sourceDefinitionId)
.withSpec(
new ConnectorSpecification()
.withConnectionSpecification(sourceSpec));
final Map<UUID, JsonNode> standardSourceDefinitions = new HashMap<>();
standardSourceDefinitions.put(sourceDefinitionId, standardSourceDefinition.getSpec().getConnectionSpecification());
Mockito.when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(Lists.newArrayList(standardSourceDefinition));
final JsonNode sourceConfiguration = Jsons.jsonNode("sourceConfiguration");
final SourceConnection sourceConnection = new SourceConnection()
.withSourceId(UUID.randomUUID())
.withSourceDefinitionId(sourceDefinitionId)
.withConfiguration(sourceConfiguration)
.withWorkspaceId(workspaceId);
final List<SourceConnection> sourceConnections = Lists.newArrayList(sourceConnection);
Mockito.when(configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class))
.thenReturn(sourceConnections);
final JsonNode destinationSpec = Jsons.jsonNode("destinationSpec");
final UUID destinationDefinitionId = UUID.randomUUID();
final StandardDestinationDefinition standardDestinationDefinition = new StandardDestinationDefinition()
.withDestinationDefinitionId(destinationDefinitionId)
.withSpec(
new ConnectorSpecification()
.withConnectionSpecification(destinationSpec));
final Map<UUID, JsonNode> standardDestinationDefinitions = new HashMap<>();
standardDestinationDefinitions.put(destinationDefinitionId, standardDestinationDefinition.getSpec().getConnectionSpecification());
Mockito.when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
.thenReturn(Lists.newArrayList(standardDestinationDefinition));
final JsonNode destinationConfiguration = Jsons.jsonNode("destinationConfiguration");
final DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(UUID.randomUUID())
.withDestinationDefinitionId(destinationDefinitionId)
.withConfiguration(destinationConfiguration)
.withWorkspaceId(workspaceId);
final List<DestinationConnection> destinationConnections = Lists.newArrayList(destinationConnection);
Mockito.when(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class))
.thenReturn(destinationConnections);
// Mockito.doNothing().when(secretMigrator).migrateDestinations(Mockito.any(), Mockito.any());
final String path = "Mocked static call source";
Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getSecretPath(sourceSpec);
Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getAllExplodedPath(sourceConfiguration, path);
final String sourceSecret = "sourceSecret";
Mockito.doReturn(Optional.of(Jsons.jsonNode(sourceSecret))).when(secretMigrator).getValueForPath(sourceConfiguration, path);
Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getSecretPath(destinationSpec);
Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getAllExplodedPath(destinationConfiguration, path);
final String destinationSecret = "destinationSecret";
Mockito.doReturn(Optional.of(Jsons.jsonNode(destinationSecret))).when(secretMigrator).getValueForPath(destinationConfiguration, path);
Mockito.doReturn(Jsons.jsonNode("sanitized")).when(secretMigrator).replaceAtJsonNode(Mockito.any(), Mockito.any(), Mockito.any());
secretMigrator.migrateSecrets();
Mockito.verify(secretMigrator).migrateSources(sourceConnections, standardSourceDefinitions);
Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.eq(sourceSecret));
secretPersistence.write(Mockito.any(), Mockito.any());
Mockito.verify(secretMigrator).migrateDestinations(destinationConnections, standardDestinationDefinitions);
Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.eq(destinationSecret));
}
@Test
void testSourceMigration() throws JsonValidationException, IOException {
final UUID definitionId1 = UUID.randomUUID();
final UUID definitionId2 = UUID.randomUUID();
final UUID sourceId1 = UUID.randomUUID();
final UUID sourceId2 = UUID.randomUUID();
final JsonNode sourceConfiguration1 = Jsons.jsonNode("conf1");
final JsonNode sourceConfiguration2 = Jsons.jsonNode("conf2");
final JsonNode sourceDefinition1 = Jsons.jsonNode("def1");
final JsonNode sourceDefinition2 = Jsons.jsonNode("def2");
final SourceConnection sourceConnection1 = new SourceConnection()
.withSourceId(sourceId1)
.withSourceDefinitionId(definitionId1)
.withConfiguration(sourceConfiguration1);
final SourceConnection sourceConnection2 = new SourceConnection()
.withSourceId(sourceId2)
.withSourceDefinitionId(definitionId2)
.withConfiguration(sourceConfiguration2);
final List<SourceConnection> sources = Lists.newArrayList(sourceConnection1, sourceConnection2);
final Map<UUID, JsonNode> definitionIdToDestinationSpecs = new HashMap<>();
definitionIdToDestinationSpecs.put(definitionId1, sourceDefinition1);
definitionIdToDestinationSpecs.put(definitionId2, sourceDefinition2);
Mockito.doReturn(Jsons.emptyObject()).when(secretMigrator).migrateConfiguration(
Mockito.any(),
Mockito.any());
secretMigrator.migrateSources(sources, definitionIdToDestinationSpecs);
Mockito.verify(configPersistence).writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceId1.toString(), sourceConnection1);
Mockito.verify(configPersistence).writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceId2.toString(), sourceConnection2);
}
@Test
void testDestinationMigration() throws JsonValidationException, IOException {
final UUID definitionId1 = UUID.randomUUID();
final UUID definitionId2 = UUID.randomUUID();
final UUID destinationId1 = UUID.randomUUID();
final UUID destinationId2 = UUID.randomUUID();
final JsonNode destinationConfiguration1 = Jsons.jsonNode("conf1");
final JsonNode destinationConfiguration2 = Jsons.jsonNode("conf2");
final JsonNode destinationDefinition1 = Jsons.jsonNode("def1");
final JsonNode destinationDefinition2 = Jsons.jsonNode("def2");
final DestinationConnection destinationConnection1 = new DestinationConnection()
.withDestinationId(destinationId1)
.withDestinationDefinitionId(definitionId1)
.withConfiguration(destinationConfiguration1);
final DestinationConnection destinationConnection2 = new DestinationConnection()
.withDestinationId(destinationId2)
.withDestinationDefinitionId(definitionId2)
.withConfiguration(destinationConfiguration2);
final List<DestinationConnection> destinations = Lists.newArrayList(destinationConnection1, destinationConnection2);
final Map<UUID, JsonNode> definitionIdToDestinationSpecs = new HashMap<>();
definitionIdToDestinationSpecs.put(definitionId1, destinationDefinition1);
definitionIdToDestinationSpecs.put(definitionId2, destinationDefinition2);
Mockito.doReturn(Jsons.emptyObject()).when(secretMigrator).migrateConfiguration(
Mockito.any(),
Mockito.any());
secretMigrator.migrateDestinations(destinations, definitionIdToDestinationSpecs);
Mockito.verify(configPersistence).writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId1.toString(), destinationConnection1);
Mockito.verify(configPersistence).writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId2.toString(), destinationConnection2);
}
@Test
void testMigrateConfigurationWithoutSpecs() {
final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(null, null, null);
Assertions.assertThrows(IllegalStateException.class, () -> secretMigrator.migrateConfiguration(connectorConfiguration, null));
}
@Test
void testMissingSecret() {
final List<String> secretPathList = Lists.newArrayList("secretPath");
Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any());
Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
Mockito.doReturn(Optional.empty()).when(secretMigrator).getValueForPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject());
Assertions.assertThrows(IllegalStateException.class, () -> secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID()));
}
@Test
void testMigrateConfiguration() {
final List<String> secretPathList = Lists.newArrayList("$.secretPath");
Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any());
Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
Mockito.doReturn(Optional.of(Jsons.jsonNode(secretPathList.get(0)))).when(secretMigrator).getValueForPath(Mockito.any(),
Mockito.eq(secretPathList.get(0)));
final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject());
secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID());
Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.any());
}
@Test
void testMigrateConfigurationAlreadyInSecretManager() {
final List<String> secretPathList = Lists.newArrayList("$.secretPath");
Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any());
Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0)));
final SecretCoordinate fakeCoordinate = new SecretCoordinate("fake", 1);
Mockito.doReturn(Optional.of(Jsons.jsonNode(fakeCoordinate))).when(secretMigrator).getValueForPath(Mockito.any(),
Mockito.eq(secretPathList.get(0)));
final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject());
secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID());
Mockito.verify(secretPersistence, Mockito.times(0)).write(Mockito.any(), Mockito.any());
}
}

View File

@@ -28,4 +28,9 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
return Boolean.parseBoolean(System.getenv("EXPOSE_SECRETS_IN_EXPORT"));
}
@Override
public boolean runSecretMigration() {
return Boolean.parseBoolean(System.getenv("MIGRATE_SECRET_STORE"));
}
}

View File

@@ -16,4 +16,6 @@ public interface FeatureFlags {
boolean exposeSecretsInExport();
boolean runSecretMigration();
}

View File

@@ -714,6 +714,15 @@
type: "string"
description: "Api key of bamboo hr"
airbyte_secret: true
custom_reports_fields:
type: "string"
default: ""
description: "Comma-separated list of fields to include in custom reports."
custom_reports_include_default_fields:
type: "boolean"
default: true
description: "If true, the custom reports endpoint will include the default\
\ fields defined here: https://documentation.bamboohr.com/docs/list-of-field-names."
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
@@ -1409,7 +1418,7 @@
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-confluence:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/confluence"
documentationUrl: "https://docsurl.com"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Confluence Spec"
@@ -2372,8 +2381,7 @@
title: "Domain"
examples:
- "myaccount.freshdesk.com"
pattern:
- "^[a-zA-Z0-9._-]*\\.freshdesk\\.com$"
pattern: "^[a-zA-Z0-9._-]*\\.freshdesk\\.com$"
api_key:
type: "string"
title: "API Key"
@@ -4054,6 +4062,7 @@
- "OAUTHBEARER"
- "SCRAM-SHA-256"
- "SCRAM-SHA-512"
- "PLAIN"
sasl_jaas_config:
title: "SASL JAAS Config"
description: "The JAAS login context parameters for SASL connections\
@@ -4178,7 +4187,7 @@
- "append"
- dockerImage: "airbyte/source-lemlist:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/source-lemlist"
documentationUrl: "https://docsurl.com"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Lemlist Spec"
@@ -8474,6 +8483,15 @@
description: "Access Token for making authenticated requests. See the <a\
\ href=\"https://docs.airbyte.io/integrations/sources/surveymonkey\">docs</a>\
\ for information on how to generate this key."
survey_ids:
type: "array"
items:
type: "string"
pattern: "^[0-9]{8,9}$"
title: "Survey Monkey survey IDs"
description: "IDs of the surveys from which you'd like to replicate data.\
\ If left empty, data from all boards to which you have access will be\
\ replicated."
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
@@ -9691,11 +9709,13 @@
additionalProperties: true
properties:
api_token:
title: "API Token"
type: "string"
description: "Kustomer API Token. See the <a href=\"https://developer.kustomer.com/kustomer-api-docs/reference/authentication\"\
>docs</a> on how to obtain this"
airbyte_secret: true
start_date:
title: "Start Date"
type: "string"
description: "The date from which you'd like to replicate the data"
examples:

View File

@@ -6,6 +6,7 @@ package io.airbyte.config.persistence.split_secrets;
import com.google.api.client.util.Preconditions;
import java.util.Objects;
import lombok.ToString;
/**
* A secret coordinate represents a specific secret at a specific version stored within a
@@ -25,6 +26,7 @@ import java.util.Objects;
* This coordinate system was designed to work well with Google Secrets Manager but should work with
* other secret storage backends as well.
*/
@ToString
public class SecretCoordinate {
private final String coordinateBase;

View File

@@ -304,6 +304,10 @@ public class SecretsHelpers {
return getSecretCoordinate("airbyte_workspace_", newSecret, secretReader, workspaceId, uuidSupplier, oldSecretFullCoordinate);
}
public static String getCoordinatorBase(final String secretBasePrefix, final UUID secretBaseId, final Supplier<UUID> uuidSupplier) {
return secretBasePrefix + secretBaseId + "_secret_" + uuidSupplier.get();
}
private static SecretCoordinate getSecretCoordinate(final String secretBasePrefix,
final String newSecret,
final ReadOnlySecretPersistence secretReader,
@@ -329,7 +333,7 @@ public class SecretsHelpers {
if (coordinateBase == null) {
// IMPORTANT: format of this cannot be changed without introducing migrations for secrets
// persistences
coordinateBase = secretBasePrefix + secretBaseId + "_secret_" + uuidSupplier.get();
coordinateBase = getCoordinatorBase(secretBasePrefix, secretBaseId, uuidSupplier);
}
if (version == null) {