Validate protocol version on connector update (#18639)
* Add helper class to check protocol version range * Check ProtocolVersion when modifying a destination definition * Check ProtocolVersion when modifying a source definition * Format * Add UnsupportedProtocolVersion exception * Rewrite AirbyteProtocolVersionRange as a record * Format * Rename exception
This commit is contained in:
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.server.errors;
|
||||
|
||||
import io.airbyte.commons.version.Version;
|
||||
|
||||
public class UnsupportedProtocolVersionException extends KnownException {
|
||||
|
||||
public UnsupportedProtocolVersionException(final Version current, final Version minSupported, final Version maxSupported) {
|
||||
this(current.serialize(), minSupported, maxSupported);
|
||||
}
|
||||
|
||||
public UnsupportedProtocolVersionException(final String current, final Version minSupported, final Version maxSupported) {
|
||||
super(String.format("Airbyte Protocol Version %s is not supported. (Must be within [%s:%s])",
|
||||
current, minSupported.serialize(), maxSupported.serialize()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHttpCode() {
|
||||
return 400;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,8 +24,11 @@ import io.airbyte.commons.docker.DockerUtils;
|
||||
import io.airbyte.commons.resources.MoreResources;
|
||||
import io.airbyte.commons.util.MoreLists;
|
||||
import io.airbyte.commons.version.AirbyteProtocolVersion;
|
||||
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
|
||||
import io.airbyte.commons.version.Version;
|
||||
import io.airbyte.config.ActorDefinitionResourceRequirements;
|
||||
import io.airbyte.config.Configs;
|
||||
import io.airbyte.config.EnvConfigs;
|
||||
import io.airbyte.config.StandardDestinationDefinition;
|
||||
import io.airbyte.config.persistence.ConfigNotFoundException;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
@@ -34,6 +37,7 @@ import io.airbyte.server.converters.ApiPojoConverters;
|
||||
import io.airbyte.server.converters.SpecFetcher;
|
||||
import io.airbyte.server.errors.IdNotFoundKnownException;
|
||||
import io.airbyte.server.errors.InternalServerKnownException;
|
||||
import io.airbyte.server.errors.UnsupportedProtocolVersionException;
|
||||
import io.airbyte.server.scheduler.SynchronousResponse;
|
||||
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
|
||||
import io.airbyte.server.services.AirbyteGithubStore;
|
||||
@@ -56,6 +60,7 @@ public class DestinationDefinitionsHandler {
|
||||
private final SynchronousSchedulerClient schedulerSynchronousClient;
|
||||
private final AirbyteGithubStore githubStore;
|
||||
private final DestinationHandler destinationHandler;
|
||||
private final AirbyteProtocolVersionRange protocolVersionRange;
|
||||
|
||||
public DestinationDefinitionsHandler(final ConfigRepository configRepository,
|
||||
final SynchronousSchedulerClient schedulerSynchronousClient,
|
||||
@@ -74,6 +79,10 @@ public class DestinationDefinitionsHandler {
|
||||
this.schedulerSynchronousClient = schedulerSynchronousClient;
|
||||
this.githubStore = githubStore;
|
||||
this.destinationHandler = destinationHandler;
|
||||
|
||||
// TODO inject protocol min and max once this handler is being converted to micronaut
|
||||
final Configs configs = new EnvConfigs();
|
||||
protocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@@ -179,6 +188,10 @@ public class DestinationDefinitionsHandler {
|
||||
final StandardDestinationDefinition destinationDefinition = destinationDefinitionFromCreate(destinationDefCreate)
|
||||
.withPublic(false)
|
||||
.withCustom(false);
|
||||
if (!protocolVersionRange.isSupported(new Version(destinationDefinition.getProtocolVersion()))) {
|
||||
throw new UnsupportedProtocolVersionException(destinationDefinition.getProtocolVersion(), protocolVersionRange.min(),
|
||||
protocolVersionRange.max());
|
||||
}
|
||||
configRepository.writeStandardDestinationDefinition(destinationDefinition);
|
||||
|
||||
return buildDestinationDefinitionRead(destinationDefinition);
|
||||
@@ -190,6 +203,10 @@ public class DestinationDefinitionsHandler {
|
||||
customDestinationDefinitionCreate.getDestinationDefinition())
|
||||
.withPublic(false)
|
||||
.withCustom(true);
|
||||
if (!protocolVersionRange.isSupported(new Version(destinationDefinition.getProtocolVersion()))) {
|
||||
throw new UnsupportedProtocolVersionException(destinationDefinition.getProtocolVersion(), protocolVersionRange.min(),
|
||||
protocolVersionRange.max());
|
||||
}
|
||||
configRepository.writeCustomDestinationDefinition(destinationDefinition, customDestinationDefinitionCreate.getWorkspaceId());
|
||||
|
||||
return buildDestinationDefinitionRead(destinationDefinition);
|
||||
@@ -235,6 +252,9 @@ public class DestinationDefinitionsHandler {
|
||||
: currentDestination.getResourceRequirements();
|
||||
|
||||
final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());
|
||||
if (!protocolVersionRange.isSupported(airbyteProtocolVersion)) {
|
||||
throw new UnsupportedProtocolVersionException(airbyteProtocolVersion, protocolVersionRange.min(), protocolVersionRange.max());
|
||||
}
|
||||
|
||||
final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
|
||||
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
|
||||
|
||||
@@ -25,8 +25,11 @@ import io.airbyte.commons.docker.DockerUtils;
|
||||
import io.airbyte.commons.resources.MoreResources;
|
||||
import io.airbyte.commons.util.MoreLists;
|
||||
import io.airbyte.commons.version.AirbyteProtocolVersion;
|
||||
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
|
||||
import io.airbyte.commons.version.Version;
|
||||
import io.airbyte.config.ActorDefinitionResourceRequirements;
|
||||
import io.airbyte.config.Configs;
|
||||
import io.airbyte.config.EnvConfigs;
|
||||
import io.airbyte.config.StandardSourceDefinition;
|
||||
import io.airbyte.config.persistence.ConfigNotFoundException;
|
||||
import io.airbyte.config.persistence.ConfigRepository;
|
||||
@@ -35,6 +38,7 @@ import io.airbyte.server.converters.ApiPojoConverters;
|
||||
import io.airbyte.server.converters.SpecFetcher;
|
||||
import io.airbyte.server.errors.IdNotFoundKnownException;
|
||||
import io.airbyte.server.errors.InternalServerKnownException;
|
||||
import io.airbyte.server.errors.UnsupportedProtocolVersionException;
|
||||
import io.airbyte.server.scheduler.SynchronousResponse;
|
||||
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
|
||||
import io.airbyte.server.services.AirbyteGithubStore;
|
||||
@@ -57,6 +61,7 @@ public class SourceDefinitionsHandler {
|
||||
private final AirbyteGithubStore githubStore;
|
||||
private final SynchronousSchedulerClient schedulerSynchronousClient;
|
||||
private final SourceHandler sourceHandler;
|
||||
private final AirbyteProtocolVersionRange protocolVersionRange;
|
||||
|
||||
public SourceDefinitionsHandler(final ConfigRepository configRepository,
|
||||
final SynchronousSchedulerClient schedulerSynchronousClient,
|
||||
@@ -74,6 +79,10 @@ public class SourceDefinitionsHandler {
|
||||
this.schedulerSynchronousClient = schedulerSynchronousClient;
|
||||
this.githubStore = githubStore;
|
||||
this.sourceHandler = sourceHandler;
|
||||
|
||||
// TODO inject protocol min and max once this handler is being converted to micronaut
|
||||
final Configs configs = new EnvConfigs();
|
||||
protocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@@ -185,6 +194,9 @@ public class SourceDefinitionsHandler {
|
||||
final StandardSourceDefinition sourceDefinition = sourceDefinitionFromCreate(sourceDefinitionCreate)
|
||||
.withPublic(false)
|
||||
.withCustom(false);
|
||||
if (!protocolVersionRange.isSupported(new Version(sourceDefinition.getProtocolVersion()))) {
|
||||
throw new UnsupportedProtocolVersionException(sourceDefinition.getProtocolVersion(), protocolVersionRange.min(), protocolVersionRange.max());
|
||||
}
|
||||
configRepository.writeStandardSourceDefinition(sourceDefinition);
|
||||
|
||||
return buildSourceDefinitionRead(sourceDefinition);
|
||||
@@ -195,6 +207,9 @@ public class SourceDefinitionsHandler {
|
||||
final StandardSourceDefinition sourceDefinition = sourceDefinitionFromCreate(customSourceDefinitionCreate.getSourceDefinition())
|
||||
.withPublic(false)
|
||||
.withCustom(true);
|
||||
if (!protocolVersionRange.isSupported(new Version(sourceDefinition.getProtocolVersion()))) {
|
||||
throw new UnsupportedProtocolVersionException(sourceDefinition.getProtocolVersion(), protocolVersionRange.min(), protocolVersionRange.max());
|
||||
}
|
||||
configRepository.writeCustomSourceDefinition(sourceDefinition, customSourceDefinitionCreate.getWorkspaceId());
|
||||
|
||||
return buildSourceDefinitionRead(sourceDefinition);
|
||||
@@ -238,6 +253,9 @@ public class SourceDefinitionsHandler {
|
||||
: currentSourceDefinition.getResourceRequirements();
|
||||
|
||||
final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());
|
||||
if (!protocolVersionRange.isSupported(airbyteProtocolVersion)) {
|
||||
throw new UnsupportedProtocolVersionException(airbyteProtocolVersion, protocolVersionRange.min(), protocolVersionRange.max());
|
||||
}
|
||||
|
||||
final StandardSourceDefinition newSource = new StandardSourceDefinition()
|
||||
.withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId())
|
||||
|
||||
Reference in New Issue
Block a user