diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index cf605d97da5..791de533546 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2935,6 +2935,8 @@ components: $ref: "#/components/schemas/CatalogDiff" breakingChange: type: boolean + connectionStatus: + $ref: "#/components/schemas/ConnectionStatus" SourceSearch: type: object properties: diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index e74c9109441..f4f3fd9eaf3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -7,6 +7,7 @@ package io.airbyte.server; import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.temporal.ConnectionManagerUtils; @@ -213,6 +214,8 @@ public class ServerApp implements ServerRunnable { final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); + final EnvVariableFeatureFlags envVariableFeatureFlags = new EnvVariableFeatureFlags(); + final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); final JobErrorReporter jobErrorReporter = @@ -286,7 +289,8 @@ public class ServerApp implements ServerRunnable { configs.getWorkerEnvironment(), configs.getLogConfigs(), eventRunner, - connectionsHandler); + connectionsHandler, + envVariableFeatureFlags); final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 49a4a75940e..7621cb4baf9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -15,6 +15,8 @@ import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; import io.airbyte.api.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.model.generated.ConnectionRead; +import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; @@ -27,6 +29,7 @@ import io.airbyte.api.model.generated.JobConfigType; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.LogRead; +import io.airbyte.api.model.generated.NonBreakingChangesPreference; import io.airbyte.api.model.generated.SourceCoreConfig; import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; @@ -39,6 +42,7 @@ import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.ErrorCode; import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult; @@ -95,6 +99,7 @@ public class SchedulerHandler { private final JobPersistence jobPersistence; private final JobConverter jobConverter; private final EventRunner eventRunner; + private final EnvVariableFeatureFlags envVariableFeatureFlags; public SchedulerHandler(final ConfigRepository configRepository, final SecretsRepositoryReader secretsRepositoryReader, @@ -104,7 +109,8 @@ public class SchedulerHandler { final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, final EventRunner eventRunner, - final ConnectionsHandler connectionsHandler) { + final ConnectionsHandler connectionsHandler, + final EnvVariableFeatureFlags envVariableFeatureFlags) { this( configRepository, secretsRepositoryWriter, @@ -114,7 +120,8 @@ public class SchedulerHandler { jobPersistence, eventRunner, new JobConverter(workerEnvironment, logConfigs), - connectionsHandler); + connectionsHandler, + envVariableFeatureFlags); } @VisibleForTesting @@ -126,7 +133,8 @@ public class SchedulerHandler { final JobPersistence jobPersistence, final EventRunner eventRunner, final JobConverter jobConverter, - final ConnectionsHandler connectionsHandler) { + final ConnectionsHandler connectionsHandler, + final EnvVariableFeatureFlags envVariableFeatureFlags) { this.configRepository = configRepository; this.secretsRepositoryWriter = secretsRepositoryWriter; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -136,6 +144,7 @@ public class SchedulerHandler { this.eventRunner = eventRunner; this.jobConverter = jobConverter; this.connectionsHandler = connectionsHandler; + this.envVariableFeatureFlags = envVariableFeatureFlags; } public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody) @@ -360,18 +369,34 @@ public class SchedulerHandler { throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); + final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = - connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); + connectionRead.getSyncCatalog(); CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), CatalogConverter.toProtocol(currentAirbyteCatalog)); boolean containsBreakingChange = containsBreakingChange(diff); ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); + ConnectionStatus connectionStatus; + if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { + connectionStatus = ConnectionStatus.INACTIVE; + } else { + connectionStatus = ConnectionStatus.ACTIVE; + } + updateObject.status(connectionStatus); connectionsHandler.updateConnection(updateObject); - discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange); + discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus); } + private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) { + if (!envVariableFeatureFlags.autoDetectSchema()) { + return false; + } + + return containsBreakingChange || (preference == NonBreakingChangesPreference.DISABLE && !diff.getTransforms().isEmpty()); + } + private CheckConnectionRead reportConnectionStatus(final SynchronousResponse response) { final CheckConnectionRead checkConnectionRead = new CheckConnectionRead() .jobInfo(jobConverter.getSynchronousJobRead(response)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index a76d0e6bf61..fa4822a6da6 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -353,6 +353,7 @@ public class WebBackendConnectionsHandler { */ diff = refreshedCatalog.get().getCatalogDiff(); connection.setBreakingChange(refreshedCatalog.get().getBreakingChange()); + connection.setStatus(refreshedCatalog.get().getConnectionStatus()); } else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // reconstructs a full picture of the full schema at the time the catalog was configured. syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 2eb3e9be56d..f06b4d6a8d6 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -25,6 +25,7 @@ import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; +import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; @@ -34,6 +35,7 @@ import io.airbyte.api.model.generated.DestinationUpdate; import io.airbyte.api.model.generated.FieldTransform; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; +import io.airbyte.api.model.generated.NonBreakingChangesPreference; import io.airbyte.api.model.generated.SourceCoreConfig; import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; @@ -45,6 +47,7 @@ import io.airbyte.api.model.generated.StreamTransform; import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.temporal.ErrorCode; @@ -106,9 +109,11 @@ class SchedulerHandlerTest { private static final String DESTINATION_PROTOCOL_VERSION = "0.7.9"; private static final String NAME = "name"; private static final String DOGS = "dogs"; + private static final String SHOES = "shoes"; + private static final String SKU = "sku"; - private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", - Field.of("sku", JsonSchemaType.STRING)); + private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog(SHOES, + Field.of(SKU, JsonSchemaType.STRING)); private static final SourceConnection SOURCE = new SourceConnection() .withName("my postgres db") @@ -145,6 +150,7 @@ class SchedulerHandlerTest { private EventRunner eventRunner; private JobConverter jobConverter; private ConnectionsHandler connectionsHandler; + private EnvVariableFeatureFlags envVariableFeatureFlags; @BeforeEach void setup() { @@ -162,6 +168,7 @@ class SchedulerHandlerTest { jobPersistence = mock(JobPersistence.class); eventRunner = mock(EventRunner.class); connectionsHandler = mock(ConnectionsHandler.class); + envVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY)); @@ -174,7 +181,8 @@ class SchedulerHandlerTest { jobPersistence, eventRunner, jobConverter, - connectionsHandler); + connectionsHandler, + envVariableFeatureFlags); } @Test @@ -555,7 +563,7 @@ class SchedulerHandlerTest { when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); @@ -577,6 +585,110 @@ class SchedulerHandlerTest { assertEquals(actual.getCatalog(), expectedActorCatalog); } + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceNoFeatureFlag() + throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(false); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( + NonBreakingChangesPreference.DISABLE); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); + } + + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceFeatureFlag() + throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( + NonBreakingChangesPreference.DISABLE); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); + } + @Test void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); @@ -603,7 +715,7 @@ class SchedulerHandlerTest { when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); @@ -619,14 +731,119 @@ class SchedulerHandlerTest { final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate().connectionId(connectionId).breakingChange(true); + final ConnectionUpdate expectedConnectionUpdate = + new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.ACTIVE); final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); verify(connectionsHandler).updateConnection(expectedConnectionUpdate); } + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) + .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)).addUpdateStreamItem(new FieldTransform().transformType( + FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); + final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + final ConnectionUpdate expectedConnectionUpdate = + new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.INACTIVE); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); + verify(connectionsHandler).updateConnection(expectedConnectionUpdate); + } + + @Test + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceFeatureFlagNoDiff() + throws IOException, JsonValidationException, ConfigNotFoundException { + final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); + final UUID connectionId = UUID.randomUUID(); + final UUID discoveredCatalogId = UUID.randomUUID(); + final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; + final SourceDiscoverSchemaRequestBody request = + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + final CatalogDiff catalogDiff = new CatalogDiff(); + when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); + when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withSourceDefinitionId(source.getSourceDefinitionId())); + when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); + when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) + .thenReturn(discoverResponse); + + when(discoverResponse.isSuccess()).thenReturn(true); + when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); + + final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); + + final ConnectionRead connectionRead = + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( + NonBreakingChangesPreference.DISABLE); + when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); + when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); + + final ActorCatalog actorCatalog = new ActorCatalog() + .withCatalog(Jsons.jsonNode(airbyteCatalog)) + .withCatalogHash("") + .withId(discoveredCatalogId); + when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); + + final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class); + final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); + + final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); + assertEquals(actual.getCatalogDiff(), catalogDiff); + assertEquals(actual.getCatalog(), expectedActorCatalog); + assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); + } + @Test void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException { final SourceConnection source = new SourceConnection() diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 78b92113529..b390d7a4c3f 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -76,6 +76,7 @@ import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.Status; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; @@ -173,9 +174,10 @@ class WebBackendConnectionsHandlerTest { final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition); final StandardSync standardSync = - ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false); + ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false, Status.ACTIVE); final StandardSync brokenStandardSync = - ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true); + ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true, Status.INACTIVE); + when(configRepository.listWorkspaceStandardSyncs(sourceRead.getWorkspaceId(), false)) .thenReturn(Collections.singletonList(standardSync)); when(configRepository.getSourceAndDefinitionsFromSourceIds(Collections.singletonList(source.getSourceId()))) @@ -277,7 +279,7 @@ class WebBackendConnectionsHandlerTest { .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) .updateStream(null)))); - expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(connectionRead, sourceRead, destinationRead, + expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(brokenConnectionRead, sourceRead, destinationRead, new OperationReadList().operations(expected.getOperations()), SchemaChange.BREAKING, now, modifiedCatalog, null) .catalogDiff(new CatalogDiff().transforms(List.of( new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) @@ -418,7 +420,7 @@ class WebBackendConnectionsHandlerTest { when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) - .breakingChange(false); + .breakingChange(false).connectionStatus(ConnectionStatus.ACTIVE); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, @@ -435,10 +437,10 @@ class WebBackendConnectionsHandlerTest { when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) - .breakingChange(true); + .breakingChange(true).connectionStatus(ConnectionStatus.INACTIVE); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); - final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, + final WebBackendConnectionRead result = testWebBackendGetConnection(true, brokenConnectionRead, operationReadList); assertEquals(expectedWithNewSchemaAndBreakingChange, result); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index ab742a079b3..908a5749e72 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -33,6 +33,7 @@ import io.airbyte.config.Schedule; import io.airbyte.config.Schedule.TimeUnit; import io.airbyte.config.ScheduleData; import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.Status; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -107,7 +108,10 @@ public class ConnectionHelpers { .withManual(true); } - public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, final UUID destinationId, final boolean isBroken) { + public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, + final UUID destinationId, + final boolean isBroken, + final Status status) { final UUID connectionId = UUID.randomUUID(); return new StandardSync() @@ -116,7 +120,7 @@ public class ConnectionHelpers { .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) .withNamespaceFormat(null) .withPrefix(STANDARD_SYNC_PREFIX) - .withStatus(StandardSync.Status.ACTIVE) + .withStatus(status) .withCatalog(generateBasicConfiguredAirbyteCatalog()) .withSourceCatalogId(UUID.randomUUID()) .withSourceId(sourceId) @@ -166,7 +170,6 @@ public class ConnectionHelpers { .namespaceDefinition(io.airbyte.api.model.generated.NamespaceDefinitionType.SOURCE) .namespaceFormat(null) .prefix("presto_to_hudi") - .status(ConnectionStatus.ACTIVE) .schedule(generateBasicConnectionSchedule()) .scheduleType(ConnectionScheduleType.BASIC) .scheduleData(generateBasicConnectionScheduleData()) @@ -199,6 +202,14 @@ public class ConnectionHelpers { .units(standardSync.getSchedule().getUnits())); } + if (standardSync.getStatus() == Status.INACTIVE) { + connectionRead.setStatus(ConnectionStatus.INACTIVE); + } else if (standardSync.getStatus() == Status.ACTIVE) { + connectionRead.setStatus(ConnectionStatus.ACTIVE); + } else if (standardSync.getStatus() == Status.DEPRECATED) { + connectionRead.setStatus(ConnectionStatus.DEPRECATED); + } + return connectionRead; } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 24555ce4519..b1b26c98dd6 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -11891,6 +11891,7 @@ if oauth parameters were contained inside the top level, rootObject=[] If they w
catalogId (optional)
UUID format: uuid
catalogDiff (optional)
breakingChange (optional)
+
connectionStatus (optional)
diff --git a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml index 84c985df000..81d2de72e78 100644 --- a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml +++ b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml @@ -2493,6 +2493,8 @@ components: $ref: "#/components/schemas/CatalogDiff" breakingChange: type: boolean + connectionStatus: + $ref: "#/components/schemas/ConnectionStatus" required: - jobInfo type: object