diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 8676cd8c94c..42d7e14c6a7 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -91,9 +91,6 @@ public class DefaultJobPersistence implements JobPersistence { private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS; private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class); - private static final Set SYSTEM_SCHEMA = Set - .of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal", - "catalog_history"); public static final String ATTEMPT_NUMBER = "attempt_number"; private static final String JOB_ID = "job_id"; private static final String WHERE = "WHERE "; @@ -797,18 +794,6 @@ public class DefaultJobPersistence implements JobPersistence { setMetadata(SECRET_MIGRATION_STATUS, "true"); } - private final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration"; - - @Override - public boolean isSchedulerMigrated() throws IOException { - return getMetadata(SCHEDULER_MIGRATION_STATUS).count() == 1; - } - - @Override - public void setSchedulerMigrationDone() throws IOException { - setMetadata(SCHEDULER_MIGRATION_STATUS, "true"); - } - @Override public Optional getVersion() throws IOException { return getMetadata(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME).findFirst(); @@ -915,27 +900,6 @@ public class DefaultJobPersistence implements JobPersistence { return exportDatabase(DEFAULT_SCHEMA); } - /** - * This is different from {@link #exportDatabase()} cause it exports all the tables in all the - * schemas available - */ - @Override - public Map> dump() throws IOException { - final Map> result = new HashMap<>(); - for (final String schema : listSchemas()) { - final List tables = listAllTables(schema); - - for (final String table : tables) { - if (result.containsKey(table)) { - throw new RuntimeException("Multiple tables found with the same name " + table); - } - result.put(table.toUpperCase(), exportTable(schema, table)); - } - } - - return result; - } - private Map> exportDatabase(final String schema) throws IOException { final List tables = listTables(schema); final Map> result = new HashMap<>(); @@ -982,25 +946,6 @@ public class DefaultJobPersistence implements JobPersistence { } } - private List listAllTables(final String schema) throws IOException { - if (schema != null) { - return jobDatabase.query(context -> context.meta().getSchemas(schema).stream() - .flatMap(s -> context.meta(s).getTables().stream()) - .map(Named::getName) - .collect(Collectors.toList())); - } else { - return List.of(); - } - } - - private List listSchemas() throws IOException { - return jobDatabase.query(context -> context.meta().getSchemas().stream() - .map(Named::getName) - .filter(c -> !SYSTEM_SCHEMA.contains(c)) - .collect(Collectors.toList())); - - } - private Stream exportTable(final String schema, final String tableName) throws IOException { final Table tableSql = getTable(schema, tableName); try (final Stream records = jobDatabase.query(ctx -> ctx.select(DSL.asterisk()).from(tableSql).fetchStream())) { diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index 91f3905ca6c..003e0387c6e 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -280,8 +280,6 @@ public interface JobPersistence { */ Map> exportDatabase() throws IOException; - Map> dump() throws IOException; - /** * Import all SQL tables from streams of JsonNode objects. * @@ -306,22 +304,6 @@ public interface JobPersistence { */ void setSecretMigrationDone() throws IOException; - /** - * Check if the scheduler has been migrated to temporal. - * - * TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next - * "major" version bump as it will no longer be needed. - */ - boolean isSchedulerMigrated() throws IOException; - - /** - * Set that the scheduler migration has been performed. - * - * TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next - * "major" version bump as it will no longer be needed. - */ - void setSchedulerMigrationDone() throws IOException; - List getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException; } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 633bffb9571..74da5251d22 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -566,15 +566,6 @@ class DefaultJobPersistenceTest { assertTrue(isMigrated); } - @Test - void testSchedulerMigrationMetadata() throws IOException { - boolean isMigrated = jobPersistence.isSchedulerMigrated(); - assertFalse(isMigrated); - jobPersistence.setSchedulerMigrationDone(); - isMigrated = jobPersistence.isSchedulerMigrated(); - assertTrue(isMigrated); - } - @Test void testAirbyteProtocolVersionMaxMetadata() throws IOException { assertTrue(jobPersistence.getAirbyteProtocolVersionMax().isEmpty()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 6d2dd863338..c18b5eb84f6 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -4,7 +4,6 @@ package io.airbyte.server; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; @@ -18,10 +17,7 @@ import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; -import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.Status; import io.airbyte.config.helpers.LogClientSingleton; -import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; @@ -70,16 +66,12 @@ import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient; import io.airbyte.server.scheduler.EventRunner; import io.airbyte.server.scheduler.TemporalEventRunner; import io.airbyte.validation.json.JsonSchemaValidator; -import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.temporal.serviceclient.WorkflowServiceStubs; -import java.io.IOException; import java.net.http.HttpClient; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; import javax.sql.DataSource; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -256,14 +248,6 @@ public class ServerApp implements ServerRunnable { final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); final EventRunner eventRunner = new TemporalEventRunner(temporalClient); - // It is important that the migration to the temporal scheduler is performed before the server - // accepts any requests. - // This is why this migration is performed here instead of in the bootloader - so that the server - // blocks on this. - // TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next - // "major" version bump as it will no longer be needed. - migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner); - final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence); final JsonSchemaValidator schemaValidator = new JsonSchemaValidator(); @@ -370,28 +354,6 @@ public class ServerApp implements ServerRunnable { workspacesHandler); } - @VisibleForTesting - static void migrateExistingConnectionsToTemporalScheduler(final ConfigRepository configRepository, - final JobPersistence jobPersistence, - final EventRunner eventRunner) - throws JsonValidationException, ConfigNotFoundException, IOException { - // Skip the migration if it was already performed, to save on resources/startup time - if (jobPersistence.isSchedulerMigrated()) { - LOGGER.info("Migration to temporal scheduler has already been performed"); - return; - } - - LOGGER.info("Start migration to the new scheduler..."); - final Set connectionIds = - configRepository.listStandardSyncs().stream() - .filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE) - .map(StandardSync::getConnectionId) - .collect(Collectors.toSet()); - eventRunner.migrateSyncIfNeeded(connectionIds); - jobPersistence.setSchedulerMigrationDone(); - LOGGER.info("Done migrating to the new scheduler..."); - } - public static void main(final String[] args) { try { final Configs configs = new EnvConfigs(); diff --git a/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java b/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java deleted file mode 100644 index 11e31422b47..00000000000 --- a/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server; - -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.Status; -import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.server.scheduler.EventRunner; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class ServerAppTest { - - @Mock - private ConfigRepository mConfigRepository; - - @Mock - private JobPersistence mJobPersistence; - - @Mock - private EventRunner mEventRunner; - - @Test - void testMigrationAlreadyPerformed() throws Exception { - when(mJobPersistence.isSchedulerMigrated()).thenReturn(true); - - ServerApp.migrateExistingConnectionsToTemporalScheduler(mConfigRepository, mJobPersistence, mEventRunner); - - verifyNoMoreInteractions(mJobPersistence); - verifyNoMoreInteractions(mConfigRepository); - verifyNoMoreInteractions(mEventRunner); - } - - @Test - void testPerformMigration() throws Exception { - when(mJobPersistence.isSchedulerMigrated()).thenReturn(false); - - final StandardSync activeConnection = new StandardSync().withStatus(Status.ACTIVE).withConnectionId(UUID.randomUUID()); - final StandardSync inactiveConnection = new StandardSync().withStatus(Status.INACTIVE).withConnectionId(UUID.randomUUID()); - final StandardSync deprecatedConnection = new StandardSync().withStatus(Status.DEPRECATED).withConnectionId(UUID.randomUUID()); - when(mConfigRepository.listStandardSyncs()).thenReturn(List.of(activeConnection, inactiveConnection, deprecatedConnection)); - - ServerApp.migrateExistingConnectionsToTemporalScheduler(mConfigRepository, mJobPersistence, mEventRunner); - - verify(mEventRunner).migrateSyncIfNeeded(Set.of(activeConnection.getConnectionId(), inactiveConnection.getConnectionId())); - verify(mJobPersistence).setSchedulerMigrationDone(); - verifyNoMoreInteractions(mJobPersistence); - verifyNoMoreInteractions(mConfigRepository); - verifyNoMoreInteractions(mEventRunner); - } - -}