diff --git a/.env b/.env index 66854ce51dc..f035e06dfd2 100644 --- a/.env +++ b/.env @@ -93,3 +93,4 @@ MAX_DISCOVER_WORKERS=5 NEW_SCHEDULER=false AUTO_DISABLE_FAILING_CONNECTIONS=false EXPOSE_SECRETS_IN_EXPORT=false +MIGRATE_SECRET_STORE=false diff --git a/airbyte-bootloader/build.gradle b/airbyte-bootloader/build.gradle index 499b05531c0..4fb63889cba 100644 --- a/airbyte-bootloader/build.gradle +++ b/airbyte-bootloader/build.gradle @@ -10,8 +10,8 @@ dependencies { implementation project(':airbyte-config:persistence') implementation project(':airbyte-db:lib') implementation project(":airbyte-json-validation") + implementation project(':airbyte-protocol:models') implementation project(':airbyte-scheduler:persistence') - implementation project(':airbyte-scheduler:models') implementation 'io.temporal:temporal-sdk:1.8.1' implementation "org.flywaydb:flyway-core:7.14.0" diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java index 3b495c40f90..4c5d3d67fde 100644 --- a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/BootloaderApp.java @@ -17,20 +17,16 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.db.Database; import io.airbyte.db.instance.DatabaseMigrator; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; -import io.airbyte.scheduler.models.Job; -import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.validation.json.JsonValidationException; -import io.temporal.client.WorkflowClient; -import io.temporal.serviceclient.WorkflowServiceStubs; -import io.temporal.serviceclient.WorkflowServiceStubsOptions; import java.io.IOException; import java.util.Optional; import java.util.UUID; @@ -55,13 +51,15 @@ public class BootloaderApp { private static final AirbyteVersion VERSION_BREAK = new AirbyteVersion("0.32.0-alpha"); private final Configs configs; - private Runnable postLoadExecution; + private final Runnable postLoadExecution; private final FeatureFlags featureFlags; + private final SecretMigrator secretMigrator; + private ConfigPersistence configPersistence; + private Database configDatabase; @VisibleForTesting - public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) { - this.configs = configs; - this.featureFlags = featureFlags; + public BootloaderApp(final Configs configs, final FeatureFlags featureFlags, final SecretMigrator secretMigrator) { + this(configs, () -> {}, featureFlags, secretMigrator); } /** @@ -72,41 +70,54 @@ public class BootloaderApp { * @param configs * @param postLoadExecution */ - public BootloaderApp(final Configs configs, final Runnable postLoadExecution, final FeatureFlags featureFlags) { + public BootloaderApp(final Configs configs, + final Runnable postLoadExecution, + final FeatureFlags featureFlags, + final SecretMigrator secretMigrator) { this.configs = configs; this.postLoadExecution = postLoadExecution; this.featureFlags = featureFlags; + this.secretMigrator = secretMigrator; + + initPersistences(); + } + + public BootloaderApp(final Configs configs, final FeatureFlags featureFlags) { + this.configs = configs; + this.featureFlags = featureFlags; + + try { + initPersistences(); + final Optional secretPersistence = SecretPersistence.getLongLived(configs); + secretMigrator = new SecretMigrator(configPersistence, secretPersistence); + + postLoadExecution = () -> { + try { + configPersistence.loadData(YamlSeedConfigPersistence.getDefault()); + + if (featureFlags.runSecretMigration()) { + secretMigrator.migrateSecrets(); + } + LOGGER.info("Loaded seed data.."); + } catch (final IOException | JsonValidationException e) { + throw new RuntimeException(e); + } + }; + + } catch (final IOException e) { + throw new RuntimeException(e); + } } public BootloaderApp() { - configs = new EnvConfigs(); - featureFlags = new EnvVariableFeatureFlags(); - postLoadExecution = () -> { - try { - final Database configDatabase = - new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) - .getAndInitialize(); - final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() - .maskSecrets(!featureFlags.exposeSecretsInExport()) - .copySecrets(true) - .build(); - final ConfigPersistence configPersistence = - DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); - configPersistence.loadData(YamlSeedConfigPersistence.getDefault()); - LOGGER.info("Loaded seed data.."); - } catch (final IOException e) { - e.printStackTrace(); - } - }; + this(new EnvConfigs(), new EnvVariableFeatureFlags()); } public void load() throws Exception { LOGGER.info("Setting up config database and default workspace.."); try ( - final Database configDatabase = - new ConfigsDatabaseInstance(configs.getConfigDatabaseUser(), configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) - .getAndInitialize(); + final Database jobDatabase = new JobsDatabaseInstance(configs.getDatabaseUser(), configs.getDatabasePassword(), configs.getDatabaseUrl()).getAndInitialize()) { LOGGER.info("Created initial jobs and configs database..."); @@ -118,11 +129,6 @@ public class BootloaderApp { runFlywayMigration(configs, configDatabase, jobDatabase); LOGGER.info("Ran Flyway migrations..."); - final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() - .maskSecrets(!featureFlags.exposeSecretsInExport()) - .copySecrets(false) - .build(); - final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase); @@ -136,14 +142,30 @@ public class BootloaderApp { LOGGER.info("Set version to {}", currAirbyteVersion); } - if (postLoadExecution != null) { - postLoadExecution.run(); - LOGGER.info("Finished running post load Execution.."); - } + postLoadExecution.run(); + LOGGER.info("Finished running post load Execution.."); LOGGER.info("Finished bootstrapping Airbyte environment.."); } + private void initPersistences() { + try { + configDatabase = new ConfigsDatabaseInstance( + configs.getConfigDatabaseUser(), + configs.getConfigDatabasePassword(), + configs.getConfigDatabaseUrl()).getAndInitialize(); + + final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() + .maskSecrets(true) + .copySecrets(true) + .build(); + + configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase, jsonSecretsProcessor); + } catch (final IOException e) { + e.printStackTrace(); + } + } + public static void main(final String[] args) throws Exception { final var bootloader = new BootloaderApp(); bootloader.load(); @@ -230,16 +252,4 @@ public class BootloaderApp { } } - private static void cleanupZombies(final JobPersistence jobPersistence) throws IOException { - final Configs configs = new EnvConfigs(); - final WorkflowClient wfClient = - WorkflowClient.newInstance(WorkflowServiceStubs.newInstance( - WorkflowServiceStubsOptions.newBuilder().setTarget(configs.getTemporalHost()).build())); - for (final Job zombieJob : jobPersistence.listJobsWithStatus(JobStatus.RUNNING)) { - LOGGER.info("Kill zombie job {} for connection {}", zombieJob.getId(), zombieJob.getScope()); - wfClient.newUntypedWorkflowStub("sync_" + zombieJob.getId()) - .terminate("Zombie"); - } - } - } diff --git a/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java new file mode 100644 index 00000000000..b18edc29a15 --- /dev/null +++ b/airbyte-bootloader/src/main/java/io/airbyte/bootloader/SecretMigrator.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import static io.airbyte.config.persistence.split_secrets.SecretsHelpers.COORDINATE_FIELD; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.json.JsonPaths; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.split_secrets.SecretCoordinate; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.config.persistence.split_secrets.SecretsHelpers; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; + +@AllArgsConstructor +@Slf4j +public class SecretMigrator { + + private final ConfigPersistence configPersistence; + private final Optional secretPersistence; + + @Value + static class ConnectorConfiguration { + + private final UUID workspace; + private final JsonNode configuration; + private final JsonNode spec; + + } + + /** + * Perform a secret migration. It will load all the actor specs extract the secret JsonPath from it. + * Then for all the secret that are stored in a plain text format, it will save the plain text in + * the secret manager and store the coordinate in the config DB. + */ + public void migrateSecrets() throws JsonValidationException, IOException { + if (secretPersistence.isEmpty()) { + log.info("No secret persistence is provided, the migration won't be run "); + + return; + } + final List standardSourceDefinitions = + configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); + + final Map definitionIdToSourceSpecs = standardSourceDefinitions + .stream().collect(Collectors.toMap(StandardSourceDefinition::getSourceDefinitionId, + def -> def.getSpec().getConnectionSpecification())); + + final List sources = configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); + + migrateSources(sources, definitionIdToSourceSpecs); + + final List standardDestinationDefinitions = + configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, + StandardDestinationDefinition.class); + + final Map definitionIdToDestinationSpecs = standardDestinationDefinitions.stream() + .collect(Collectors.toMap(StandardDestinationDefinition::getDestinationDefinitionId, + def -> def.getSpec().getConnectionSpecification())); + + final List destinations = configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); + + migrateDestinations(destinations, definitionIdToDestinationSpecs); + } + + /** + * This is migrating the secrets for the source actors + */ + @VisibleForTesting + void migrateSources(final List sources, final Map definitionIdToSourceSpecs) + throws JsonValidationException, IOException { + log.info("Migrating Sources"); + final List sourceConnections = sources.stream() + .map(source -> { + final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration( + source.getWorkspaceId(), + source.getConfiguration(), + definitionIdToSourceSpecs.get(source.getSourceDefinitionId())), + () -> UUID.randomUUID()); + source.setConfiguration(migratedConfig); + return source; + }) + .toList(); + + for (final SourceConnection source : sourceConnections) { + configPersistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source); + } + } + + /** + * This is migrating the secrets for the destination actors + */ + @VisibleForTesting + void migrateDestinations(final List destinations, final Map definitionIdToDestinationSpecs) + throws JsonValidationException, IOException { + log.info("Migration Destinations"); + + final List destinationConnections = destinations.stream().map(destination -> { + final JsonNode migratedConfig = migrateConfiguration(new ConnectorConfiguration( + destination.getWorkspaceId(), + destination.getConfiguration(), + definitionIdToDestinationSpecs.get(destination.getDestinationDefinitionId())), + () -> UUID.randomUUID()); + destination.setConfiguration(migratedConfig); + return destination; + }) + .toList(); + for (final DestinationConnection destination : destinationConnections) { + configPersistence.writeConfig(ConfigSchema.DESTINATION_CONNECTION, destination.getDestinationId().toString(), destination); + } + } + + /** + * This is a generic method to migrate an actor configuration It will extract the secret path form + * the provided spec and then replace them by coordinates in the actor configuration + */ + @VisibleForTesting + JsonNode migrateConfiguration(final ConnectorConfiguration connectorConfiguration, final Supplier uuidProvider) { + if (connectorConfiguration.getSpec() == null) { + throw new IllegalStateException("No connector definition to match the connector"); + } + + final AtomicReference connectorConfigurationJson = new AtomicReference<>(connectorConfiguration.getConfiguration()); + final List uniqSecretPaths = getSecretPath(connectorConfiguration.getSpec()) + .stream() + .flatMap(secretPath -> getAllExplodedPath(connectorConfigurationJson.get(), secretPath).stream()) + .toList(); + + final UUID workspaceId = connectorConfiguration.getWorkspace(); + uniqSecretPaths.forEach(secretPath -> { + final Optional secretValue = getValueForPath(connectorConfigurationJson.get(), secretPath); + if (secretValue.isEmpty()) { + throw new IllegalStateException("Missing secret for the path: " + secretPath); + } + + // Only migrate plain text. + if (secretValue.get().isTextual()) { + final JsonNode stringSecretValue = secretValue.get(); + + final SecretCoordinate coordinate = + new SecretCoordinate(SecretsHelpers.getCoordinatorBase("airbyte_workspace_", workspaceId, uuidProvider), 1); + secretPersistence.get().write(coordinate, stringSecretValue.textValue()); + connectorConfigurationJson.set(replaceAtJsonNode(connectorConfigurationJson.get(), secretPath, + Jsons.jsonNode(Map.of(COORDINATE_FIELD, coordinate.getFullCoordinate())))); + } else { + log.error("Not migrating already migrated secrets"); + } + + }); + + return connectorConfigurationJson.get(); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + JsonNode replaceAtJsonNode(final JsonNode connectorConfigurationJson, final String secretPath, final JsonNode replacement) { + return JsonPaths.replaceAtJsonNode(connectorConfigurationJson, secretPath, replacement); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + List getSecretPath(final JsonNode specs) { + return SecretsHelpers.getSortedSecretPaths(specs); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + List getAllExplodedPath(final JsonNode node, final String path) { + return JsonPaths.getPaths(node, path); + } + + /** + * Wrapper to help to mock static methods + */ + @VisibleForTesting + Optional getValueForPath(final JsonNode node, final String path) { + return JsonPaths.getSingleValue(node, path); + } + +} diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index 243bbb8227b..62e8e969b08 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -4,6 +4,7 @@ package io.airbyte.bootloader; +import static io.airbyte.config.Configs.SecretPersistenceType.TESTING_CONFIG_DB_TABLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -11,15 +12,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.ObjectMapper; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.init.YamlSeedConfigPersistence; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.DatabaseConfigPersistence; +import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import lombok.val; import org.junit.jupiter.api.Test; @@ -57,6 +69,8 @@ public class BootloaderAppTest { val mockedFeatureFlags = mock(FeatureFlags.class); when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false); + val mockedSecretMigrator = mock(SecretMigrator.class); + // Although we are able to inject mocked configs into the Bootloader, a particular migration in the // configs database // requires the env var to be set. Flyway prevents injection, so we dynamically set this instead. @@ -64,7 +78,7 @@ public class BootloaderAppTest { environmentVariables.set("DATABASE_PASSWORD", "docker"); environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); - val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags); + val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator); bootloader.load(); val jobDatabase = new JobsDatabaseInstance( @@ -88,6 +102,90 @@ public class BootloaderAppTest { assertNotEquals(Optional.empty(), jobsPersistence.getDeployment().get()); } + @Test + void testBootloaderAppRunSecretMigration() throws Exception { + val container = new PostgreSQLContainer<>("postgres:13-alpine") + .withDatabaseName("public") + .withUsername("docker") + .withPassword("docker"); + container.start(); + val version = "0.33.0-alpha"; + + val mockedConfigs = mock(Configs.class); + when(mockedConfigs.getConfigDatabaseUrl()).thenReturn(container.getJdbcUrl()); + when(mockedConfigs.getConfigDatabaseUser()).thenReturn(container.getUsername()); + when(mockedConfigs.getConfigDatabasePassword()).thenReturn(container.getPassword()); + when(mockedConfigs.getDatabaseUrl()).thenReturn(container.getJdbcUrl()); + when(mockedConfigs.getDatabaseUser()).thenReturn(container.getUsername()); + when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword()); + when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(version)); + when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true); + when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE); + + val mockedFeatureFlags = mock(FeatureFlags.class); + when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false); + + val mockedSecretMigrator = mock(SecretMigrator.class); + + // Although we are able to inject mocked configs into the Bootloader, a particular migration in the + // configs database + // requires the env var to be set. Flyway prevents injection, so we dynamically set this instead. + environmentVariables.set("DATABASE_USER", "docker"); + environmentVariables.set("DATABASE_PASSWORD", "docker"); + environmentVariables.set("DATABASE_URL", container.getJdbcUrl()); + + val initBootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator); + initBootloader.load(); + + final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder() + .copySecrets(true) + .maskSecrets(true) + .build(); + final Database configDatabase = Databases.createPostgresDatabase(container.getUsername(), container.getPassword(), container.getJdbcUrl()); + final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor); + final ConfigPersistence localSchema = YamlSeedConfigPersistence.getDefault(); + final ConfigRepository configRepository = new ConfigRepository(configPersistence, configDatabase); + configRepository.loadDataNoSecrets(localSchema); + + final String sourceSpecs = """ + { + "account_id": "1234567891234567", + "start_date": "2022-04-01T00:00:00Z", + "access_token": "nonhiddensecret", + "include_deleted": false, + "fetch_thumbnail_images": false + } + + """; + + final ObjectMapper mapper = new ObjectMapper(); + + final UUID workspaceId = UUID.randomUUID(); + configRepository.writeStandardWorkspace(new StandardWorkspace() + .withWorkspaceId(workspaceId) + .withName("wName") + .withSlug("wSlug") + .withEmail("email@mail.com") + .withTombstone(false) + .withInitialSetupComplete(false)); + final UUID sourceId = UUID.randomUUID(); + configRepository.writeSourceConnectionNoSecrets(new SourceConnection() + .withSourceDefinitionId(UUID.fromString("e7778cfc-e97c-4458-9ecb-b4f2bba8946c")) // Facebook Marketing + .withSourceId(sourceId) + .withName("test source") + .withWorkspaceId(workspaceId) + .withConfiguration(mapper.readTree(sourceSpecs))); + + when(mockedFeatureFlags.runSecretMigration()).thenReturn(true); + val bootloader = new BootloaderApp(mockedConfigs, mockedFeatureFlags); + bootloader.load(); + + final SourceConnection sourceConnection = configRepository.getSourceConnection(sourceId); + + assertFalse(sourceConnection.getConfiguration().toString().contains("nonhiddensecret")); + assertTrue(sourceConnection.getConfiguration().toString().contains("_secret")); + } + @Test void testIsLegalUpgradePredicate() { // starting from no previous version is always legal. @@ -134,7 +232,9 @@ public class BootloaderAppTest { val mockedFeatureFlags = mock(FeatureFlags.class); when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false); - new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags).load(); + val mockedSecretMigrator = mock(SecretMigrator.class); + + new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags, mockedSecretMigrator).load(); assertTrue(testTriggered.get()); } diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/SecretMigratorTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/SecretMigratorTest.java new file mode 100644 index 00000000000..7d8998e37dc --- /dev/null +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/SecretMigratorTest.java @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.bootloader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.bootloader.SecretMigrator.ConnectorConfiguration; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.split_secrets.SecretCoordinate; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class SecretMigratorTest { + + private final UUID workspaceId = UUID.randomUUID(); + + @Mock + private ConfigPersistence configPersistence; + + @Mock + private SecretPersistence secretPersistence; + + private SecretMigrator secretMigrator; + + @BeforeEach + void setup() { + secretMigrator = Mockito.spy(new SecretMigrator(configPersistence, Optional.of(secretPersistence))); + } + + @Test + public void testMigrateSecret() throws JsonValidationException, IOException { + final JsonNode sourceSpec = Jsons.jsonNode("sourceSpec"); + final UUID sourceDefinitionId = UUID.randomUUID(); + final StandardSourceDefinition standardSourceDefinition = new StandardSourceDefinition() + .withSourceDefinitionId(sourceDefinitionId) + .withSpec( + new ConnectorSpecification() + .withConnectionSpecification(sourceSpec)); + final Map standardSourceDefinitions = new HashMap<>(); + standardSourceDefinitions.put(sourceDefinitionId, standardSourceDefinition.getSpec().getConnectionSpecification()); + Mockito.when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(Lists.newArrayList(standardSourceDefinition)); + + final JsonNode sourceConfiguration = Jsons.jsonNode("sourceConfiguration"); + final SourceConnection sourceConnection = new SourceConnection() + .withSourceId(UUID.randomUUID()) + .withSourceDefinitionId(sourceDefinitionId) + .withConfiguration(sourceConfiguration) + .withWorkspaceId(workspaceId); + final List sourceConnections = Lists.newArrayList(sourceConnection); + Mockito.when(configPersistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class)) + .thenReturn(sourceConnections); + + final JsonNode destinationSpec = Jsons.jsonNode("destinationSpec"); + final UUID destinationDefinitionId = UUID.randomUUID(); + final StandardDestinationDefinition standardDestinationDefinition = new StandardDestinationDefinition() + .withDestinationDefinitionId(destinationDefinitionId) + .withSpec( + new ConnectorSpecification() + .withConnectionSpecification(destinationSpec)); + final Map standardDestinationDefinitions = new HashMap<>(); + standardDestinationDefinitions.put(destinationDefinitionId, standardDestinationDefinition.getSpec().getConnectionSpecification()); + Mockito.when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) + .thenReturn(Lists.newArrayList(standardDestinationDefinition)); + + final JsonNode destinationConfiguration = Jsons.jsonNode("destinationConfiguration"); + final DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(UUID.randomUUID()) + .withDestinationDefinitionId(destinationDefinitionId) + .withConfiguration(destinationConfiguration) + .withWorkspaceId(workspaceId); + final List destinationConnections = Lists.newArrayList(destinationConnection); + Mockito.when(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class)) + .thenReturn(destinationConnections); + + // Mockito.doNothing().when(secretMigrator).migrateDestinations(Mockito.any(), Mockito.any()); + + final String path = "Mocked static call source"; + Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getSecretPath(sourceSpec); + Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getAllExplodedPath(sourceConfiguration, path); + final String sourceSecret = "sourceSecret"; + Mockito.doReturn(Optional.of(Jsons.jsonNode(sourceSecret))).when(secretMigrator).getValueForPath(sourceConfiguration, path); + Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getSecretPath(destinationSpec); + Mockito.doReturn(Lists.newArrayList(path)).when(secretMigrator).getAllExplodedPath(destinationConfiguration, path); + final String destinationSecret = "destinationSecret"; + Mockito.doReturn(Optional.of(Jsons.jsonNode(destinationSecret))).when(secretMigrator).getValueForPath(destinationConfiguration, path); + + Mockito.doReturn(Jsons.jsonNode("sanitized")).when(secretMigrator).replaceAtJsonNode(Mockito.any(), Mockito.any(), Mockito.any()); + secretMigrator.migrateSecrets(); + + Mockito.verify(secretMigrator).migrateSources(sourceConnections, standardSourceDefinitions); + Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.eq(sourceSecret)); + secretPersistence.write(Mockito.any(), Mockito.any()); + Mockito.verify(secretMigrator).migrateDestinations(destinationConnections, standardDestinationDefinitions); + Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.eq(destinationSecret)); + } + + @Test + void testSourceMigration() throws JsonValidationException, IOException { + final UUID definitionId1 = UUID.randomUUID(); + final UUID definitionId2 = UUID.randomUUID(); + final UUID sourceId1 = UUID.randomUUID(); + final UUID sourceId2 = UUID.randomUUID(); + final JsonNode sourceConfiguration1 = Jsons.jsonNode("conf1"); + final JsonNode sourceConfiguration2 = Jsons.jsonNode("conf2"); + final JsonNode sourceDefinition1 = Jsons.jsonNode("def1"); + final JsonNode sourceDefinition2 = Jsons.jsonNode("def2"); + final SourceConnection sourceConnection1 = new SourceConnection() + .withSourceId(sourceId1) + .withSourceDefinitionId(definitionId1) + .withConfiguration(sourceConfiguration1); + final SourceConnection sourceConnection2 = new SourceConnection() + .withSourceId(sourceId2) + .withSourceDefinitionId(definitionId2) + .withConfiguration(sourceConfiguration2); + + final List sources = Lists.newArrayList(sourceConnection1, sourceConnection2); + final Map definitionIdToDestinationSpecs = new HashMap<>(); + definitionIdToDestinationSpecs.put(definitionId1, sourceDefinition1); + definitionIdToDestinationSpecs.put(definitionId2, sourceDefinition2); + + Mockito.doReturn(Jsons.emptyObject()).when(secretMigrator).migrateConfiguration( + Mockito.any(), + Mockito.any()); + + secretMigrator.migrateSources(sources, definitionIdToDestinationSpecs); + + Mockito.verify(configPersistence).writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceId1.toString(), sourceConnection1); + Mockito.verify(configPersistence).writeConfig(ConfigSchema.SOURCE_CONNECTION, sourceId2.toString(), sourceConnection2); + } + + @Test + void testDestinationMigration() throws JsonValidationException, IOException { + final UUID definitionId1 = UUID.randomUUID(); + final UUID definitionId2 = UUID.randomUUID(); + final UUID destinationId1 = UUID.randomUUID(); + final UUID destinationId2 = UUID.randomUUID(); + final JsonNode destinationConfiguration1 = Jsons.jsonNode("conf1"); + final JsonNode destinationConfiguration2 = Jsons.jsonNode("conf2"); + final JsonNode destinationDefinition1 = Jsons.jsonNode("def1"); + final JsonNode destinationDefinition2 = Jsons.jsonNode("def2"); + final DestinationConnection destinationConnection1 = new DestinationConnection() + .withDestinationId(destinationId1) + .withDestinationDefinitionId(definitionId1) + .withConfiguration(destinationConfiguration1); + final DestinationConnection destinationConnection2 = new DestinationConnection() + .withDestinationId(destinationId2) + .withDestinationDefinitionId(definitionId2) + .withConfiguration(destinationConfiguration2); + + final List destinations = Lists.newArrayList(destinationConnection1, destinationConnection2); + final Map definitionIdToDestinationSpecs = new HashMap<>(); + definitionIdToDestinationSpecs.put(definitionId1, destinationDefinition1); + definitionIdToDestinationSpecs.put(definitionId2, destinationDefinition2); + + Mockito.doReturn(Jsons.emptyObject()).when(secretMigrator).migrateConfiguration( + Mockito.any(), + Mockito.any()); + + secretMigrator.migrateDestinations(destinations, definitionIdToDestinationSpecs); + + Mockito.verify(configPersistence).writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId1.toString(), destinationConnection1); + Mockito.verify(configPersistence).writeConfig(ConfigSchema.DESTINATION_CONNECTION, destinationId2.toString(), destinationConnection2); + } + + @Test + void testMigrateConfigurationWithoutSpecs() { + final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(null, null, null); + + Assertions.assertThrows(IllegalStateException.class, () -> secretMigrator.migrateConfiguration(connectorConfiguration, null)); + } + + @Test + void testMissingSecret() { + final List secretPathList = Lists.newArrayList("secretPath"); + + Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any()); + Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0))); + Mockito.doReturn(Optional.empty()).when(secretMigrator).getValueForPath(Mockito.any(), Mockito.eq(secretPathList.get(0))); + + final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject()); + Assertions.assertThrows(IllegalStateException.class, () -> secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID())); + } + + @Test + void testMigrateConfiguration() { + final List secretPathList = Lists.newArrayList("$.secretPath"); + + Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any()); + Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0))); + Mockito.doReturn(Optional.of(Jsons.jsonNode(secretPathList.get(0)))).when(secretMigrator).getValueForPath(Mockito.any(), + Mockito.eq(secretPathList.get(0))); + + final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject()); + + secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID()); + Mockito.verify(secretPersistence).write(Mockito.any(), Mockito.any()); + } + + @Test + void testMigrateConfigurationAlreadyInSecretManager() { + final List secretPathList = Lists.newArrayList("$.secretPath"); + + Mockito.doReturn(secretPathList).when(secretMigrator).getSecretPath(Mockito.any()); + Mockito.doReturn(secretPathList).when(secretMigrator).getAllExplodedPath(Mockito.any(), Mockito.eq(secretPathList.get(0))); + + final SecretCoordinate fakeCoordinate = new SecretCoordinate("fake", 1); + Mockito.doReturn(Optional.of(Jsons.jsonNode(fakeCoordinate))).when(secretMigrator).getValueForPath(Mockito.any(), + Mockito.eq(secretPathList.get(0))); + + final ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(UUID.randomUUID(), Jsons.emptyObject(), Jsons.emptyObject()); + + secretMigrator.migrateConfiguration(connectorConfiguration, () -> UUID.randomUUID()); + Mockito.verify(secretPersistence, Mockito.times(0)).write(Mockito.any(), Mockito.any()); + } + +} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index a0963e0765f..8b30b3fc9dc 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -28,4 +28,9 @@ public class EnvVariableFeatureFlags implements FeatureFlags { return Boolean.parseBoolean(System.getenv("EXPOSE_SECRETS_IN_EXPORT")); } + @Override + public boolean runSecretMigration() { + return Boolean.parseBoolean(System.getenv("MIGRATE_SECRET_STORE")); + } + } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 78516bcb00a..fa0ea3dbd4b 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -16,4 +16,6 @@ public interface FeatureFlags { boolean exposeSecretsInExport(); + boolean runSecretMigration(); + } diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 4730203c20d..3234c416529 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -714,6 +714,15 @@ type: "string" description: "Api key of bamboo hr" airbyte_secret: true + custom_reports_fields: + type: "string" + default: "" + description: "Comma-separated list of fields to include in custom reports." + custom_reports_include_default_fields: + type: "boolean" + default: true + description: "If true, the custom reports endpoint will include the default\ + \ fields defined here: https://documentation.bamboohr.com/docs/list-of-field-names." supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] @@ -1409,7 +1418,7 @@ supported_destination_sync_modes: [] - dockerImage: "airbyte/source-confluence:0.1.1" spec: - documentationUrl: "https://docs.airbyte.io/integrations/sources/confluence" + documentationUrl: "https://docsurl.com" connectionSpecification: $schema: "http://json-schema.org/draft-07/schema#" title: "Confluence Spec" @@ -2372,8 +2381,7 @@ title: "Domain" examples: - "myaccount.freshdesk.com" - pattern: - - "^[a-zA-Z0-9._-]*\\.freshdesk\\.com$" + pattern: "^[a-zA-Z0-9._-]*\\.freshdesk\\.com$" api_key: type: "string" title: "API Key" @@ -4054,6 +4062,7 @@ - "OAUTHBEARER" - "SCRAM-SHA-256" - "SCRAM-SHA-512" + - "PLAIN" sasl_jaas_config: title: "SASL JAAS Config" description: "The JAAS login context parameters for SASL connections\ @@ -4178,7 +4187,7 @@ - "append" - dockerImage: "airbyte/source-lemlist:0.1.0" spec: - documentationUrl: "https://docs.airbyte.io/integrations/sources/source-lemlist" + documentationUrl: "https://docsurl.com" connectionSpecification: $schema: "http://json-schema.org/draft-07/schema#" title: "Lemlist Spec" @@ -8474,6 +8483,15 @@ description: "Access Token for making authenticated requests. See the docs\ \ for information on how to generate this key." + survey_ids: + type: "array" + items: + type: "string" + pattern: "^[0-9]{8,9}$" + title: "Survey Monkey survey IDs" + description: "IDs of the surveys from which you'd like to replicate data.\ + \ If left empty, data from all boards to which you have access will be\ + \ replicated." supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] @@ -9691,11 +9709,13 @@ additionalProperties: true properties: api_token: + title: "API Token" type: "string" description: "Kustomer API Token. See the docs on how to obtain this" airbyte_secret: true start_date: + title: "Start Date" type: "string" description: "The date from which you'd like to replicate the data" examples: diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretCoordinate.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretCoordinate.java index 969f46c6430..4ae410d95ee 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretCoordinate.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretCoordinate.java @@ -6,6 +6,7 @@ package io.airbyte.config.persistence.split_secrets; import com.google.api.client.util.Preconditions; import java.util.Objects; +import lombok.ToString; /** * A secret coordinate represents a specific secret at a specific version stored within a @@ -25,6 +26,7 @@ import java.util.Objects; * This coordinate system was designed to work well with Google Secrets Manager but should work with * other secret storage backends as well. */ +@ToString public class SecretCoordinate { private final String coordinateBase; diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretsHelpers.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretsHelpers.java index 63134333fce..e7d5fa5b81c 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretsHelpers.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/split_secrets/SecretsHelpers.java @@ -304,6 +304,10 @@ public class SecretsHelpers { return getSecretCoordinate("airbyte_workspace_", newSecret, secretReader, workspaceId, uuidSupplier, oldSecretFullCoordinate); } + public static String getCoordinatorBase(final String secretBasePrefix, final UUID secretBaseId, final Supplier uuidSupplier) { + return secretBasePrefix + secretBaseId + "_secret_" + uuidSupplier.get(); + } + private static SecretCoordinate getSecretCoordinate(final String secretBasePrefix, final String newSecret, final ReadOnlySecretPersistence secretReader, @@ -329,7 +333,7 @@ public class SecretsHelpers { if (coordinateBase == null) { // IMPORTANT: format of this cannot be changed without introducing migrations for secrets // persistences - coordinateBase = secretBasePrefix + secretBaseId + "_secret_" + uuidSupplier.get(); + coordinateBase = getCoordinatorBase(secretBasePrefix, secretBaseId, uuidSupplier); } if (version == null) {