1
0
mirror of synced 2025-12-25 02:09:19 -05:00

set up check/discover to pass around necessary parts

This commit is contained in:
jrhizor
2021-09-24 21:45:32 -07:00
parent d9fa73d0db
commit 489d2d5f5d
30 changed files with 234 additions and 61 deletions

View File

@@ -13,5 +13,12 @@ properties:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
spec:
description: Integration spec. Must be a valid JSON schema.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
workspaceId:
type: string
format: uuid
dockerImage:
type: string

View File

@@ -13,5 +13,12 @@ properties:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
spec:
description: Integration spec. Must be a valid JSON schema.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
workspaceId:
type: string
format: uuid
dockerImage:
type: string

View File

@@ -12,3 +12,10 @@ properties:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
spec:
description: Integration spec. Must be a valid JSON schema.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
workspaceId:
type: string
format: uuid

View File

@@ -12,3 +12,10 @@ properties:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
spec:
description: Integration spec. Must be a valid JSON schema.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
workspaceId:
type: string
format: uuid

View File

@@ -37,14 +37,13 @@ import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import io.airbyte.commons.lang.Exceptions;
import org.joda.time.Days;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.joda.time.Days;
/**
* Uses Google Secret Manager (https://cloud.google.com/secret-manager) as a K/V store to access
@@ -66,8 +65,8 @@ public class GoogleSecretManagerPersistence implements SecretPersistence {
private static final String LATEST = "latest";
private static final Duration EPHEMERAL_TTL = Duration.newBuilder()
.setSeconds(Days.days(5).toStandardSeconds().getSeconds())
.build();
.setSeconds(Days.days(5).toStandardSeconds().getSeconds())
.build();
private final String gcpProjectId;
private final Supplier<SecretManagerServiceClient> clientSupplier;
@@ -75,17 +74,17 @@ public class GoogleSecretManagerPersistence implements SecretPersistence {
private final @Nullable Duration ttl;
/**
* Creates a persistence with an infinite TTL for stored secrets.
* Used for source/destination config secret storage.
* Creates a persistence with an infinite TTL for stored secrets. Used for source/destination config
* secret storage.
*/
public static GoogleSecretManagerPersistence getLongLived(final String gcpProjectId, final String gcpCredentialsJson) {
return new GoogleSecretManagerPersistence(gcpProjectId, gcpCredentialsJson, null);
}
/**
* Creates a persistence with a relatively short TTL for stored secrets.
* Used for temporary operations such as check/discover operations where we need to use secret storage to
* communicate from the server to Temporal, but where we don't want to maintain the secrets indefinitely.
* Creates a persistence with a relatively short TTL for stored secrets. Used for temporary
* operations such as check/discover operations where we need to use secret storage to communicate
* from the server to Temporal, but where we don't want to maintain the secrets indefinitely.
*/
public static GoogleSecretManagerPersistence getEphemeral(final String gcpProjectId, final String gcpCredentialsJson) {
return new GoogleSecretManagerPersistence(gcpProjectId, gcpCredentialsJson, EPHEMERAL_TTL);
@@ -118,7 +117,7 @@ public class GoogleSecretManagerPersistence implements SecretPersistence {
final var secretBuilder = Secret.newBuilder().setReplication(replicationPolicy);
if(ttl != null) {
if (ttl != null) {
secretBuilder.setTtl(ttl);
}

View File

@@ -208,6 +208,7 @@ public class SchedulerApp {
.getInitialized();
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation();
final SecretPersistence secretPersistence = new LocalTestingSecretPersistence(configDatabase); // todo: feature flag on env var
final SecretPersistence ephemeralSecretPersistence = new LocalTestingSecretPersistence(configDatabase); // todo: feature flag on env var
final ConfigRepository configRepository = new ConfigRepository(configPersistence, Optional.of(secretPersistence));
final JobCleaner jobCleaner = new JobCleaner(
configs.getWorkspaceRetentionConfig(),
@@ -224,7 +225,7 @@ public class SchedulerApp {
configs.getAirbyteVersion(),
configRepository);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, ephemeralSecretPersistence, workspaceRoot);
LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient)

View File

@@ -52,12 +52,20 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl
private final JobTracker jobTracker;
private final OAuthConfigSupplier oAuthConfigSupplier;
private SpecFetcher specFetcher;
public DefaultSynchronousSchedulerClient(TemporalClient temporalClient, JobTracker jobTracker, OAuthConfigSupplier oAuthConfigSupplier) {
this.temporalClient = temporalClient;
this.jobTracker = jobTracker;
this.oAuthConfigSupplier = oAuthConfigSupplier;
}
// this isn't set in the constructor because spec fetcher depends on a temporal client...
// todo: remove this circular dependency
public void setSpecFetcher(SpecFetcher specFetcher) {
this.specFetcher = specFetcher;
}
@Override
public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJob(final SourceConnection source, final String dockerImage)
throws IOException {
@@ -65,9 +73,14 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl
source.getSourceDefinitionId(),
source.getWorkspaceId(),
source.getConfiguration());
final JsonNode spec = specFetcher.execute(dockerImage).getConnectionSpecification();
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(sourceConfiguration)
.withDockerImage(dockerImage);
.withDockerImage(dockerImage)
.withWorkspaceId(source.getWorkspaceId())
.withSpec(spec);
return execute(
ConfigType.CHECK_CONNECTION_SOURCE,
@@ -84,9 +97,14 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl
destination.getDestinationId(),
destination.getWorkspaceId(),
destination.getConfiguration());
final JsonNode spec = specFetcher.execute(dockerImage).getConnectionSpecification();
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(destinationConfiguration)
.withDockerImage(dockerImage);
.withDockerImage(dockerImage)
.withWorkspaceId(destination.getWorkspaceId())
.withSpec(spec);
return execute(
ConfigType.CHECK_CONNECTION_DESTINATION,
@@ -101,9 +119,14 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl
source.getSourceDefinitionId(),
source.getWorkspaceId(),
source.getConfiguration());
final JsonNode spec = specFetcher.execute(dockerImage).getConnectionSpecification();
final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig()
.withConnectionConfiguration(sourceConfiguration)
.withDockerImage(dockerImage);
.withDockerImage(dockerImage)
.withWorkspaceId(source.getWorkspaceId())
.withSpec(spec);
return execute(
ConfigType.DISCOVER_SCHEMA,

View File

@@ -22,12 +22,10 @@
* SOFTWARE.
*/
package io.airbyte.server.converters;
package io.airbyte.scheduler.client;
import com.google.common.base.Preconditions;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import java.io.IOException;
public class SpecFetcher {

View File

@@ -80,11 +80,13 @@ class DefaultSynchronousSchedulerClientTest {
private static final SourceConnection SOURCE_CONNECTION = new SourceConnection()
.withSourceId(UUID1)
.withSourceDefinitionId(UUID2)
.withConfiguration(CONFIGURATION);
.withConfiguration(CONFIGURATION)
.withWorkspaceId(WORKSPACE_ID);
private static final DestinationConnection DESTINATION_CONNECTION = new DestinationConnection()
.withDestinationId(UUID1)
.withDestinationDefinitionId(UUID2)
.withConfiguration(CONFIGURATION);
.withConfiguration(CONFIGURATION)
.withWorkspaceId(WORKSPACE_ID);
private TemporalClient temporalClient;
private JobTracker jobTracker;
@@ -100,6 +102,10 @@ class DefaultSynchronousSchedulerClientTest {
when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
final var specFetcher = mock(SpecFetcher.class);
when(specFetcher.execute(DOCKER_IMAGE)).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.emptyObject()));
schedulerClient.setSpecFetcher(specFetcher);
}
private static JobMetadata createMetadata(boolean succeeded) {
@@ -181,7 +187,9 @@ class DefaultSynchronousSchedulerClientTest {
void testCreateSourceCheckConnectionJob() throws IOException {
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(SOURCE_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
.withDockerImage(DOCKER_IMAGE)
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
@@ -195,7 +203,9 @@ class DefaultSynchronousSchedulerClientTest {
void testCreateDestinationCheckConnectionJob() throws IOException {
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig()
.withConnectionConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
.withDockerImage(DOCKER_IMAGE)
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
@@ -209,7 +219,9 @@ class DefaultSynchronousSchedulerClientTest {
void testCreateDiscoverSchemaJob() throws IOException {
final JobDiscoverCatalogConfig jobDiscoverCatalogConfig = new JobDiscoverCatalogConfig()
.withConnectionConfiguration(SOURCE_CONNECTION.getConfiguration())
.withDockerImage(DOCKER_IMAGE);
.withDockerImage(DOCKER_IMAGE)
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
final AirbyteCatalog mockOutput = mock(AirbyteCatalog.class);
when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(jobDiscoverCatalogConfig)))

View File

@@ -0,0 +1,74 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.scheduler.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class SpecFetcherTest {
private static final String IMAGE_NAME = "foo:bar";
private SynchronousSchedulerClient schedulerJobClient;
private SynchronousResponse<ConnectorSpecification> response;
private ConnectorSpecification connectorSpecification;
@SuppressWarnings("unchecked")
@BeforeEach
void setup() {
schedulerJobClient = mock(SynchronousSchedulerClient.class);
response = mock(SynchronousResponse.class);
connectorSpecification = new ConnectorSpecification().withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar")));
}
@Test
void testFetch() throws IOException {
when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response);
when(response.isSuccess()).thenReturn(true);
when(response.getOutput()).thenReturn(connectorSpecification);
final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient);
assertEquals(connectorSpecification, specFetcher.execute(IMAGE_NAME));
}
@Test
void testFetchEmpty() throws IOException {
when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response);
when(response.isSuccess()).thenReturn(false);
final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient);
assertThrows(IllegalStateException.class, () -> specFetcher.execute(IMAGE_NAME));
}
}

View File

@@ -49,10 +49,10 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SourceHandler;

View File

@@ -28,8 +28,8 @@ import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.migrate.MigrateConfig;
import io.airbyte.migrate.MigrationRunner;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
import java.io.IOException;

View File

@@ -49,13 +49,13 @@ import io.airbyte.scheduler.client.DefaultSchedulerJobClient;
import io.airbyte.scheduler.client.DefaultSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SpecCachingSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InvalidInputExceptionMapper;
import io.airbyte.server.errors.InvalidJsonExceptionMapper;
import io.airbyte.server.errors.InvalidJsonInputExceptionMapper;
@@ -188,6 +188,7 @@ public class ServerApp implements ServerRunnable {
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
configPersistence.migrateFileConfigs(configs);
final SecretPersistence secretPersistence = new LocalTestingSecretPersistence(configDatabase); // todo: feature flag on env var
final SecretPersistence ephemeralSecretPersistence = new LocalTestingSecretPersistence(configDatabase); // todo: feature flag on env var
final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation(), Optional.of(secretPersistence));
LOGGER.info("Creating Scheduler persistence...");
@@ -220,7 +221,8 @@ public class ServerApp implements ServerRunnable {
final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence);
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost());
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot());
final TemporalClient temporalClient =
TemporalClient.production(configs.getTemporalHost(), ephemeralSecretPersistence, configs.getWorkspaceRoot());
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, false);
final SchedulerJobClient schedulerJobClient = new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence));
final DefaultSynchronousSchedulerClient syncSchedulerClient =
@@ -229,6 +231,7 @@ public class ServerApp implements ServerRunnable {
new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket());
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);
syncSchedulerClient.setSpecFetcher(specFetcher);
Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {

View File

@@ -103,10 +103,10 @@ import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.BadObjectSchemaKnownException;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.ArchiveHandler;

View File

@@ -35,6 +35,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.UUID;

View File

@@ -33,11 +33,11 @@ import io.airbyte.commons.io.FileTtlManager;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.YamlSeedConfigPersistence;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.ConfigDumpExporter;
import io.airbyte.server.ConfigDumpImporter;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;

View File

@@ -42,8 +42,8 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;

View File

@@ -60,6 +60,7 @@ import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.models.Job;
@@ -69,7 +70,6 @@ import io.airbyte.server.converters.CatalogConverter;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.converters.OauthModelConverter;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalUtils;
@@ -87,6 +87,8 @@ public class SchedulerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
private static final UUID NO_WORKSPACE = UUID.fromString("00000000-0000-0000-0000-000000000000");
private final ConfigRepository configRepository;
private final SchedulerJobClient schedulerJobClient;
private final SynchronousSchedulerClient synchronousSchedulerClient;
@@ -153,7 +155,8 @@ public class SchedulerHandler {
// technically declared as required.
final SourceConnection source = new SourceConnection()
.withSourceDefinitionId(sourceConfig.getSourceDefinitionId())
.withConfiguration(sourceConfig.getConnectionConfiguration());
.withConfiguration(sourceConfig.getConnectionConfiguration())
.withWorkspaceId(NO_WORKSPACE);
return reportConnectionStatus(synchronousSchedulerClient.createSourceCheckConnectionJob(source, imageName));
}
@@ -189,7 +192,8 @@ public class SchedulerHandler {
// technically declared as required.
final DestinationConnection destination = new DestinationConnection()
.withDestinationDefinitionId(destinationConfig.getDestinationDefinitionId())
.withConfiguration(destinationConfig.getConnectionConfiguration());
.withConfiguration(destinationConfig.getConnectionConfiguration())
.withWorkspaceId(NO_WORKSPACE);
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
}

View File

@@ -41,8 +41,8 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;

View File

@@ -26,8 +26,8 @@ package io.airbyte.server.validators;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.BadObjectSchemaKnownException;
public class DockerImageValidator {

View File

@@ -45,10 +45,10 @@ import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;

View File

@@ -42,6 +42,7 @@ import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.UUID;

View File

@@ -25,13 +25,13 @@
package io.airbyte.server.converters;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import java.io.IOException;
@@ -64,13 +64,4 @@ class SpecFetcherTest {
assertEquals(connectorSpecification, specFetcher.execute(IMAGE_NAME));
}
@Test
void testFetchEmpty() throws IOException {
when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response);
when(response.isSuccess()).thenReturn(false);
final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient);
assertThrows(IllegalStateException.class, () -> specFetcher.execute(IMAGE_NAME));
}
}

View File

@@ -54,10 +54,10 @@ import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
import java.io.IOException;

View File

@@ -52,8 +52,8 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.helpers.ConnectorSpecificationHelpers;
import io.airbyte.server.helpers.DestinationHelpers;

View File

@@ -74,6 +74,7 @@ import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
@@ -82,7 +83,6 @@ import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.helpers.DestinationHelpers;
import io.airbyte.server.helpers.SourceHelpers;

View File

@@ -51,8 +51,8 @@ import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.helpers.ConnectorSpecificationHelpers;
import io.airbyte.server.helpers.SourceHelpers;

View File

@@ -51,10 +51,10 @@ import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.Database;
import io.airbyte.migrate.Migrations;
import io.airbyte.scheduler.client.SpecFetcher;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.RunMigration;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
import java.io.IOException;

View File

@@ -34,6 +34,9 @@ import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
@@ -46,17 +49,19 @@ import java.util.function.Supplier;
public class TemporalClient {
private final SecretPersistence secretPersistence;
private final Path workspaceRoot;
private final WorkflowClient client;
public static TemporalClient production(String temporalHost, Path workspaceRoot) {
return new TemporalClient(TemporalUtils.createTemporalClient(temporalHost), workspaceRoot);
public static TemporalClient production(String temporalHost, SecretPersistence secretPersistence, Path workspaceRoot) {
return new TemporalClient(TemporalUtils.createTemporalClient(temporalHost), secretPersistence, workspaceRoot);
}
// todo (cgardens) - there are two sources of truth on workspace root. we need to get this down to
// one. either temporal decides and can report it or it is injected into temporal runs.
public TemporalClient(WorkflowClient client, Path workspaceRoot) {
public TemporalClient(WorkflowClient client, SecretPersistence secretPersistence, Path workspaceRoot) {
this.client = client;
this.secretPersistence = secretPersistence;
this.workspaceRoot = workspaceRoot;
}
@@ -78,7 +83,18 @@ public class TemporalClient {
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withDockerImage(config.getDockerImage());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput().withConnectionConfiguration(config.getConnectionConfiguration());
final SplitSecretConfig splitConfig = SecretsHelpers.splitConfig(
config.getWorkspaceId(),
config.getConnectionConfiguration(),
new ConnectorSpecification().withConnectionSpecification(config.getSpec()));
splitConfig.getCoordinateToPayload().forEach(secretPersistence::write);
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withConnectionConfiguration(splitConfig.getPartialConfig())
.withWorkspaceId(config.getWorkspaceId())
.withSpec(config.getSpec());
return execute(jobRunConfig,
() -> getWorkflowStub(CheckConnectionWorkflow.class, TemporalJobType.CHECK_CONNECTION).run(jobRunConfig, launcherConfig, input));
@@ -90,7 +106,18 @@ public class TemporalClient {
.withJobId(jobId.toString())
.withAttemptId((long) attempt)
.withDockerImage(config.getDockerImage());
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput().withConnectionConfiguration(config.getConnectionConfiguration());
final SplitSecretConfig splitConfig = SecretsHelpers.splitConfig(
config.getWorkspaceId(),
config.getConnectionConfiguration(),
new ConnectorSpecification().withConnectionSpecification(config.getSpec()));
splitConfig.getCoordinateToPayload().forEach(secretPersistence::write);
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
.withConnectionConfiguration(splitConfig.getPartialConfig())
.withWorkspaceId(config.getWorkspaceId())
.withSpec(config.getSpec());
return execute(jobRunConfig,
() -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input));

View File

@@ -41,6 +41,7 @@ import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
@@ -59,6 +60,7 @@ import org.junit.jupiter.api.Test;
class TemporalClientTest {
private static final UUID JOB_UUID = UUID.randomUUID();
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final long JOB_ID = 11L;
private static final int ATTEMPT_ID = 21;
private static final JobRunConfig JOB_RUN_CONFIG = new JobRunConfig()
@@ -84,7 +86,7 @@ class TemporalClientTest {
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_client_test");
logPath = workspaceRoot.resolve(String.valueOf(JOB_ID)).resolve(String.valueOf(ATTEMPT_ID)).resolve(LogClientSingleton.LOG_FILENAME);
workflowClient = mock(WorkflowClient.class);
temporalClient = new TemporalClient(workflowClient, workspaceRoot);
temporalClient = new TemporalClient(workflowClient, new MemorySecretPersistence(), workspaceRoot);
}
@Nested
@@ -144,9 +146,13 @@ class TemporalClientTest {
.thenReturn(checkConnectionWorkflow);
final JobCheckConnectionConfig checkConnectionConfig = new JobCheckConnectionConfig()
.withDockerImage(IMAGE_NAME1)
.withConnectionConfiguration(Jsons.emptyObject());
.withConnectionConfiguration(Jsons.emptyObject())
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration());
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration())
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, checkConnectionConfig);
checkConnectionWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
@@ -160,9 +166,14 @@ class TemporalClientTest {
.thenReturn(discoverCatalogWorkflow);
final JobDiscoverCatalogConfig checkConnectionConfig = new JobDiscoverCatalogConfig()
.withDockerImage(IMAGE_NAME1)
.withConnectionConfiguration(Jsons.emptyObject());
.withConnectionConfiguration(Jsons.emptyObject())
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration());
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration())
.withWorkspaceId(WORKSPACE_ID)
.withSpec(Jsons.emptyObject());
temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, checkConnectionConfig);
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);