1
0
mirror of synced 2025-12-25 02:09:19 -05:00

standardize configuration interface (#101)

* DiscoveryWorker => DiscoverSchemaWorker 

* Map from spec id to the docker image that should be used for the worker.
This commit is contained in:
Charles
2020-08-24 16:03:24 -07:00
committed by GitHub
parent 68b07fc5f2
commit c8ce024081
26 changed files with 369 additions and 146 deletions

View File

@@ -0,0 +1,17 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/JobCheckConnectionConfig.json",
"title": "JobCheckConnectionConfig",
"description": "job check connection config",
"type": "object",
"additionalProperties": false,
"required": ["connectionConfiguration", "dockerImage"],
"properties": {
"connectionConfiguration": {
"description": "type any. information here varies by integration. this data is validated against a specification"
},
"dockerImage": {
"type": "string"
}
}
}

View File

@@ -10,20 +10,16 @@
"configType": {
"type": "string",
"enum": [
"checkConnectionSource",
"checkConnectionDestination",
"checkConnection",
"discoverSchema",
"sync"
]
},
"checkConnectionSource": {
"$ref": "SourceConnectionImplementation.json"
},
"checkConnectionDestination": {
"$ref": "DestinationConnectionImplementation.json"
"checkConnection": {
"$ref": "JobCheckConnectionConfig.json"
},
"discoverSchema": {
"$ref": "SourceConnectionImplementation.json"
"$ref": "JobDiscoverSchemaConfig.json"
},
"sync": {
"$ref": "JobSyncConfig.json"

View File

@@ -0,0 +1,17 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/JobDiscoverSchemaConfig.json",
"title": "JobDiscoverSchemaConfig",
"description": "job discover schema config",
"type": "object",
"additionalProperties": false,
"required": ["connectionConfiguration", "dockerImage"],
"properties": {
"connectionConfiguration": {
"description": "type any. information here varies by integration. this data is validated against a specification"
},
"dockerImage": {
"type": "string"
}
}
}

View File

@@ -16,13 +16,13 @@
]
},
"checkConnection": {
"$ref": "StandardConnectionStatus.json"
"$ref": "StandardCheckConnectionOutput.json"
},
"discoverSchema": {
"$ref": "StandardDiscoveryOutput.json"
"$ref": "StandardDiscoverSchemaOutput.json"
},
"sync": {
"$ref": "JobSyncOutput.json"
"$ref": "StandardSyncOutput.json"
}
}
}

View File

@@ -8,7 +8,9 @@
"required": [
"sourceConnectionImplementation",
"destinationConnectionImplementation",
"standardSync"
"standardSync",
"sourceDockerImage",
"destinationDockerImage"
],
"properties": {
"sourceConnectionImplementation": {
@@ -19,6 +21,12 @@
},
"standardSync": {
"$ref": "StandardSync.json"
},
"sourceDockerImage": {
"type": "string"
},
"destinationDockerImage": {
"type": "string"
}
}
}

View File

@@ -1,16 +1,15 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/ConnectionConfiguration.json",
"title": "ConnectionConfiguration",
"description": "information required for connection to a destination.",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/PublicCheckConnection.json",
"title": "PublicCheckConnection",
"description": "information required for connection.",
"type": "object",
"required": [
"dockerImage",
"configuration"
"connectionConfiguration"
],
"additionalProperties": false,
"properties": {
"configuration": {
"connectionConfiguration": {
"description": "type any. information here varies by integration. this data is validated against a specification"
}
}

View File

@@ -1,8 +1,8 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/StandardConnectionStatus.json",
"title": "StandardConnectionStatus",
"description": "describes the result of a 'test connection' action.",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/StandardCheckConnectionOutput.json",
"title": "StandardCheckConnectionOutput",
"description": "describes the result of a 'check connection' action.",
"type": "object",
"required": ["status"],
"additionalProperties": false,

View File

@@ -0,0 +1,16 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/StandardDiscoverSchemaOutput.json",
"title": "StandardDiscoverSchemaOutput",
"description": "output of a discover schema job.",
"type": "object",
"required": [
"connectionConfiguration"
],
"additionalProperties": false,
"properties": {
"connectionConfiguration": {
"description": "type any. information here varies by integration. this data is validated against a specification"
}
}
}

View File

@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/JobSyncConfig.json",
"title": "JobSyncConfig",
"description": "job sync config",
"type": "object",
"additionalProperties": false,
"required": [
"sourceConnectionImplementation",
"destinationConnectionImplementation",
"standardSync"
],
"properties": {
"sourceConnectionImplementation": {
"$ref": "SourceConnectionImplementation.json"
},
"destinationConnectionImplementation": {
"$ref": "DestinationConnectionImplementation.json"
},
"standardSync": {
"$ref": "StandardSync.json"
}
}
}

View File

@@ -1,7 +1,7 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/JobSyncConfig.json",
"title": "JobSyncConfig",
"$id": "https://github.com/datalineio/dataline/blob/master/dataline-config/src/main/resources/json/StandardSyncOutput.json",
"title": "StandardSyncOutput",
"description": "job sync output",
"type": "object",
"additionalProperties": false,

View File

@@ -26,7 +26,9 @@ package io.dataline.scheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dataline.config.DestinationConnectionImplementation;
import io.dataline.config.JobCheckConnectionConfig;
import io.dataline.config.JobConfig;
import io.dataline.config.JobDiscoverSchemaConfig;
import io.dataline.config.JobOutput;
import io.dataline.config.JobSyncConfig;
import io.dataline.config.SourceConnectionImplementation;
@@ -57,9 +59,16 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
final String scope =
"checkConnection:source:" + sourceImplementation.getSourceImplementationId();
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig();
jobCheckConnectionConfig.setConnectionConfiguration(sourceImplementation.getConfiguration());
jobCheckConnectionConfig.setDockerImage(
IntegrationConstants.SPEC_ID_TO_IMPL
.get(sourceImplementation.getSourceSpecificationId())
.getCheckConnection());
final JobConfig jobConfig = new JobConfig();
jobConfig.setConfigType(JobConfig.ConfigType.CHECK_CONNECTION_SOURCE);
jobConfig.setCheckConnectionSource(sourceImplementation);
jobConfig.setConfigType(JobConfig.ConfigType.CHECK_CONNECTION);
jobConfig.setCheckConnection(jobCheckConnectionConfig);
return createPendingJob(scope, jobConfig);
}
@@ -70,9 +79,17 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
final String scope =
"checkConnection:destination:" + destinationImplementation.getDestinationImplementationId();
final JobCheckConnectionConfig jobCheckConnectionConfig = new JobCheckConnectionConfig();
jobCheckConnectionConfig.setConnectionConfiguration(
destinationImplementation.getConfiguration());
jobCheckConnectionConfig.setDockerImage(
IntegrationConstants.SPEC_ID_TO_IMPL
.get(destinationImplementation.getDestinationSpecificationId())
.getCheckConnection());
final JobConfig jobConfig = new JobConfig();
jobConfig.setConfigType(JobConfig.ConfigType.CHECK_CONNECTION_DESTINATION);
jobConfig.setCheckConnectionDestination(destinationImplementation);
jobConfig.setConfigType(JobConfig.ConfigType.CHECK_CONNECTION);
jobConfig.setCheckConnection(jobCheckConnectionConfig);
return createPendingJob(scope, jobConfig);
}
@@ -83,9 +100,16 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
final String scope = "discoverSchema:" + sourceImplementation.getSourceImplementationId();
final JobDiscoverSchemaConfig jobDiscoverSchemaConfig = new JobDiscoverSchemaConfig();
jobDiscoverSchemaConfig.setConnectionConfiguration(sourceImplementation.getConfiguration());
jobDiscoverSchemaConfig.setDockerImage(
IntegrationConstants.SPEC_ID_TO_IMPL
.get(sourceImplementation.getSourceSpecificationId())
.getDiscoverSchema());
final JobConfig jobConfig = new JobConfig();
jobConfig.setConfigType(JobConfig.ConfigType.DISCOVER_SCHEMA);
jobConfig.setDiscoverSchema(sourceImplementation);
jobConfig.setDiscoverSchema(jobDiscoverSchemaConfig);
return createPendingJob(scope, jobConfig);
}
@@ -101,7 +125,15 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
final JobSyncConfig jobSyncConfig = new JobSyncConfig();
jobSyncConfig.setSourceConnectionImplementation(sourceImplementation);
jobSyncConfig.setSourceDockerImage(
IntegrationConstants.SPEC_ID_TO_IMPL
.get(sourceImplementation.getSourceSpecificationId())
.getSync());
jobSyncConfig.setDestinationConnectionImplementation(destinationImplementation);
jobSyncConfig.setDestinationDockerImage(
IntegrationConstants.SPEC_ID_TO_IMPL
.get(destinationImplementation.getDestinationSpecificationId())
.getSync());
jobSyncConfig.setStandardSync(standardSync);
final JobConfig jobConfig = new JobConfig();

View File

@@ -0,0 +1,69 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.scheduler;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.UUID;
public class IntegrationConstants {

  /**
   * Maps an integration specification id to the docker images that should be used to run each job
   * type (check connection, discover schema, sync) for that integration.
   *
   * <p>Declared {@code final} so the reference cannot be reassigned; {@link Map#of} returns an
   * immutable map, so the contents cannot be mutated either.
   */
  public static final Map<UUID, IntegrationMapping> SPEC_ID_TO_IMPL =
      Map.of(
          UUID.fromString("2168516a-5c9a-4582-90dc-5e3a01e3f607"),
          new IntegrationMapping("dataline/integration-singer-postgres-source"),
          UUID.fromString("71cb2b91-1b2f-4c42-8f4d-f1ab2bd72080"),
          new IntegrationMapping(
              "dataline/integration-singer-postgres-destination",
              "dataline/integration-singer-postgres-destination",
              "dataline/integration-singer-postgres-destination"));

  /** Immutable value holder for the docker images of a single integration. */
  public static class IntegrationMapping {
    private final String checkConnection;
    private final String discoverSchema;
    private final String sync;

    /**
     * @param checkConnection docker image used for check-connection jobs
     * @param discoverSchema docker image used for discover-schema jobs
     * @param sync docker image used for sync jobs
     */
    public IntegrationMapping(String checkConnection, String discoverSchema, String sync) {
      this.checkConnection = checkConnection;
      this.discoverSchema = discoverSchema;
      this.sync = sync;
    }

    /** Convenience constructor for integrations that use one image for all job types. */
    public IntegrationMapping(String image) {
      this(image, image, image);
    }

    public String getCheckConnection() {
      return checkConnection;
    }

    public String getDiscoverSchema() {
      return discoverSchema;
    }

    public String getSync() {
      return sync;
    }
  }
}

View File

@@ -180,8 +180,7 @@ public class JobSubmitter implements Runnable {
persistence));
LOGGER.info("Submitting job to thread pool...");
break;
case CHECK_CONNECTION_SOURCE:
case CHECK_CONNECTION_DESTINATION:
case CHECK_CONNECTION:
case SYNC:
throw new RuntimeException("not implemented");
// todo: handle threadPool.submit(new WorkerWrapper<>(job.getId(), new EchoWorker(),

View File

@@ -26,15 +26,14 @@ package io.dataline.scheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.dataline.db.DatabaseHelper;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch

View File

@@ -26,9 +26,11 @@ package io.dataline.scheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dataline.api.model.JobRead;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.DestinationConnectionImplementation;
import io.dataline.config.SourceConnectionImplementation;
import io.dataline.config.JobCheckConnectionConfig;
import io.dataline.config.JobSyncConfig;
import io.dataline.config.StandardCheckConnectionInput;
import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardSyncInput;
import io.dataline.db.DatabaseHelper;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.Worker;
@@ -62,35 +64,6 @@ public class WorkerWrapper<InputType, OutputType> implements Runnable {
this.persistence = persistence;
}
@SuppressWarnings("unchecked")
private InputType getInput(io.dataline.scheduler.Job job) {
switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE:
final DestinationConnectionImplementation checkConnectionSource =
job.getConfig().getCheckConnectionDestination();
return (InputType) buildConnectionImplementation(checkConnectionSource.getConfiguration());
case CHECK_CONNECTION_DESTINATION:
final SourceConnectionImplementation checkConnectionDestination =
job.getConfig().getCheckConnectionSource();
return (InputType)
buildConnectionImplementation(checkConnectionDestination.getConfiguration());
case DISCOVER_SCHEMA:
return (InputType) job.getConfig().getDiscoverSchema();
case SYNC:
return (InputType) job.getConfig().getSync();
default:
throw new RuntimeException("Unrecognized config type: " + job.getConfig().getConfigType());
}
}
private static ConnectionImplementation buildConnectionImplementation(Object configuration) {
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
connectionImplementation.setConfiguration(configuration);
return connectionImplementation;
}
@Override
public void run() {
LOGGER.info("Executing worker wrapper...");
@@ -102,7 +75,7 @@ public class WorkerWrapper<InputType, OutputType> implements Runnable {
final InputType input = getInput(job);
// todo (cgardens) - replace this with whatever the correct path is. probably dependency
// inject it based via env.
// inject it based via env.
final Path workspacesRoot = Path.of("/tmp/dataline/workspaces/");
FileUtils.forceMkdir(workspacesRoot.toFile());
final Path workspaceRoot = workspacesRoot.resolve(String.valueOf(jobId));
@@ -132,6 +105,37 @@ public class WorkerWrapper<InputType, OutputType> implements Runnable {
}
}
@SuppressWarnings("unchecked")
// Translates the persisted job config into the standard worker input for the job's config type.
private InputType getInput(io.dataline.scheduler.Job job) {
  switch (job.getConfig().getConfigType()) {
    case CHECK_CONNECTION:
      final JobCheckConnectionConfig checkConnection = job.getConfig().getCheckConnection();
      final StandardCheckConnectionInput checkConnectionInput =
          new StandardCheckConnectionInput();
      checkConnectionInput.setConnectionConfiguration(
          checkConnection.getConnectionConfiguration());
      return (InputType) checkConnectionInput;
    case DISCOVER_SCHEMA:
      // Bug fix: previously read getCheckConnection() here, which returns the wrong (and for a
      // discover-schema job, unset) config. Read the discover-schema config instead.
      final JobDiscoverSchemaConfig discoverSchema = job.getConfig().getDiscoverSchema();
      final StandardDiscoverSchemaInput discoverSchemaInput = new StandardDiscoverSchemaInput();
      discoverSchemaInput.setConnectionConfiguration(discoverSchema.getConnectionConfiguration());
      return (InputType) discoverSchemaInput;
    case SYNC:
      final JobSyncConfig sync = job.getConfig().getSync();
      final StandardSyncInput syncInput = new StandardSyncInput();
      syncInput.setSourceConnectionImplementation(sync.getSourceConnectionImplementation());
      syncInput.setDestinationConnectionImplementation(
          sync.getDestinationConnectionImplementation());
      syncInput.setStandardSync(sync.getStandardSync());
      return (InputType) syncInput;
    default:
      throw new RuntimeException("Unrecognized config type: " + job.getConfig().getConfigType());
  }
}
private static void setJobStatus(
BasicDataSource connectionPool, long jobId, JobRead.StatusEnum status) {
LOGGER.info("Setting job status to " + status + " for job " + jobId);

View File

@@ -33,8 +33,8 @@ import io.dataline.api.model.SourceImplementationIdRequestBody;
import io.dataline.commons.enums.Enums;
import io.dataline.config.DestinationConnectionImplementation;
import io.dataline.config.SourceConnectionImplementation;
import io.dataline.config.StandardConnectionStatus;
import io.dataline.config.StandardDiscoveryOutput;
import io.dataline.config.StandardCheckConnectionOutput;
import io.dataline.config.StandardDiscoverSchemaOutput;
import io.dataline.config.StandardSync;
import io.dataline.config.persistence.ConfigPersistence;
import io.dataline.scheduler.Job;
@@ -114,16 +114,16 @@ public class SchedulerHandler {
LOGGER.info("jobId = " + jobId);
final Job job = waitUntilJobIsTerminalOrTimeout(jobId);
final StandardDiscoveryOutput discoveryOutput =
final StandardDiscoverSchemaOutput output =
job.getOutput()
.orElseThrow(() -> new RuntimeException("Terminal job does not have an output"))
.getDiscoverSchema();
LOGGER.info("discoveryOutput = " + discoveryOutput);
LOGGER.info("output = " + output);
final SourceImplementationDiscoverSchemaRead read =
new SourceImplementationDiscoverSchemaRead();
read.setSchema(SchemaConverter.toApiSchema(discoveryOutput.getSchema()));
read.setSchema(SchemaConverter.toApiSchema(output.getSchema()));
return read;
}
@@ -192,7 +192,7 @@ public class SchedulerHandler {
}
private CheckConnectionRead reportConnectionStatus(Job job) {
final StandardConnectionStatus connectionStatus =
final StandardCheckConnectionOutput output =
job.getOutput()
.orElseThrow(() -> new RuntimeException("Terminal job does not have an output"))
.getCheckConnection();
@@ -200,8 +200,8 @@ public class SchedulerHandler {
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead();
checkConnectionRead.setStatus(
Enums.convertTo(connectionStatus.getStatus(), CheckConnectionRead.StatusEnum.class));
checkConnectionRead.setMessage(connectionStatus.getMessage());
Enums.convertTo(output.getStatus(), CheckConnectionRead.StatusEnum.class));
checkConnectionRead.setMessage(output.getMessage());
return checkConnectionRead;
}

View File

@@ -24,8 +24,8 @@
package io.dataline.workers;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.StandardConnectionStatus;
import io.dataline.config.StandardCheckConnectionInput;
import io.dataline.config.StandardCheckConnectionOutput;
public interface CheckConnectionWorker
extends Worker<ConnectionImplementation, StandardConnectionStatus> {}
extends Worker<StandardCheckConnectionInput, StandardCheckConnectionOutput> {}

View File

@@ -24,8 +24,8 @@
package io.dataline.workers;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.StandardDiscoveryOutput;
import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardDiscoverSchemaOutput;
public interface DiscoverSchemaWorker
extends Worker<ConnectionImplementation, StandardDiscoveryOutput> {}
extends Worker<StandardDiscoverSchemaInput, StandardDiscoverSchemaOutput> {}

View File

@@ -0,0 +1,30 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.workers;
import io.dataline.config.StandardSyncInput;
import io.dataline.config.StandardSyncOutput;
/**
 * Worker that executes a sync job: consumes a {@link StandardSyncInput} and produces a
 * {@link StandardSyncOutput}.
 */
public interface SyncWorker extends Worker<StandardSyncInput, StandardSyncOutput> {}

View File

@@ -24,9 +24,10 @@
package io.dataline.workers.singer;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.StandardConnectionStatus;
import io.dataline.config.StandardDiscoveryOutput;
import io.dataline.config.StandardCheckConnectionInput;
import io.dataline.config.StandardCheckConnectionOutput;
import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardDiscoverSchemaOutput;
import io.dataline.workers.CheckConnectionWorker;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
@@ -35,41 +36,46 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingerCheckConnectionWorker
extends BaseSingerWorker<ConnectionImplementation, StandardConnectionStatus>
extends BaseSingerWorker<StandardCheckConnectionInput, StandardCheckConnectionOutput>
implements CheckConnectionWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(SingerCheckConnectionWorker.class);
private final SingerDiscoveryWorker singerDiscoveryWorker;
private final SingerDiscoverSchemaWorker singerDiscoverSchemaWorker;
public SingerCheckConnectionWorker(SingerConnector connector) {
super(connector);
this.singerDiscoveryWorker = new SingerDiscoveryWorker(connector);
this.singerDiscoverSchemaWorker = new SingerDiscoverSchemaWorker(connector);
}
@Override
OutputAndStatus<StandardConnectionStatus> runInternal(
ConnectionImplementation connectionImplementation, Path workspaceRoot) {
OutputAndStatus<StandardDiscoveryOutput> outputAndStatus =
singerDiscoveryWorker.runInternal(connectionImplementation, workspaceRoot);
StandardConnectionStatus connectionStatus = new StandardConnectionStatus();
OutputAndStatus<StandardCheckConnectionOutput> runInternal(
StandardCheckConnectionInput input, Path workspaceRoot) {
final StandardDiscoverSchemaInput discoverSchemaInput = new StandardDiscoverSchemaInput();
discoverSchemaInput.setConnectionConfiguration(input.getConnectionConfiguration());
OutputAndStatus<StandardDiscoverSchemaOutput> outputAndStatus =
singerDiscoverSchemaWorker.runInternal(discoverSchemaInput, workspaceRoot);
StandardCheckConnectionOutput output = new StandardCheckConnectionOutput();
JobStatus jobStatus;
if (outputAndStatus.getStatus() == JobStatus.SUCCESSFUL
&& outputAndStatus.getOutput().isPresent()) {
connectionStatus.setStatus(StandardConnectionStatus.Status.SUCCESS);
output.setStatus(StandardCheckConnectionOutput.Status.SUCCESS);
jobStatus = JobStatus.SUCCESSFUL;
} else {
LOGGER.info("Connection check unsuccessful. Discovery output: {}", outputAndStatus);
jobStatus = JobStatus.FAILED;
connectionStatus.setStatus(StandardConnectionStatus.Status.FAILURE);
output.setStatus(StandardCheckConnectionOutput.Status.FAILURE);
// TODO add better error log parsing to specify the exact reason for failure as the message
connectionStatus.setMessage("Failed to connect.");
output.setMessage("Failed to connect.");
}
return new OutputAndStatus<>(jobStatus, connectionStatus);
return new OutputAndStatus<>(jobStatus, output);
}
@Override
public void cancel() {
singerDiscoveryWorker.cancel();
singerDiscoverSchemaWorker.cancel();
}
}

View File

@@ -29,10 +29,10 @@ import static io.dataline.workers.JobStatus.SUCCESSFUL;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.Schema;
import io.dataline.config.SingerCatalog;
import io.dataline.config.StandardDiscoveryOutput;
import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardDiscoverSchemaOutput;
import io.dataline.workers.DiscoverSchemaWorker;
import io.dataline.workers.OutputAndStatus;
import java.io.IOException;
@@ -41,11 +41,11 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingerDiscoveryWorker
extends BaseSingerWorker<ConnectionImplementation, StandardDiscoveryOutput>
public class SingerDiscoverSchemaWorker
extends BaseSingerWorker<StandardDiscoverSchemaInput, StandardDiscoverSchemaOutput>
implements DiscoverSchemaWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(SingerDiscoveryWorker.class);
private static final Logger LOGGER = LoggerFactory.getLogger(SingerDiscoverSchemaWorker.class);
// TODO log errors to specified file locations
private static String CONFIG_JSON_FILENAME = "config.json";
@@ -54,19 +54,20 @@ public class SingerDiscoveryWorker
private volatile Process workerProcess;
public SingerDiscoveryWorker(SingerConnector connector) {
public SingerDiscoverSchemaWorker(SingerConnector connector) {
super(connector);
}
@Override
OutputAndStatus<StandardDiscoveryOutput> runInternal(
ConnectionImplementation connectionImplementation, Path workspaceRoot) {
OutputAndStatus<StandardDiscoverSchemaOutput> runInternal(
StandardDiscoverSchemaInput discoverSchemaInput, Path workspaceRoot) {
// todo (cgardens) - just getting original impl to line up with new iface for now. this can be
// reduced.
final ObjectMapper objectMapper = new ObjectMapper();
final String configDotJson;
try {
configDotJson = objectMapper.writeValueAsString(connectionImplementation.getConfiguration());
configDotJson =
objectMapper.writeValueAsString(discoverSchemaInput.getConnectionConfiguration());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@@ -102,7 +103,7 @@ public class SingerDiscoveryWorker
if (exitCode == 0) {
String catalog = readFile(workspaceRoot, CATALOG_JSON_FILENAME);
final SingerCatalog singerCatalog = jsonCatalogToTyped(catalog);
final StandardDiscoveryOutput discoveryOutput = toDiscoveryOutput(singerCatalog);
final StandardDiscoverSchemaOutput discoveryOutput = toDiscoveryOutput(singerCatalog);
return new OutputAndStatus<>(SUCCESSFUL, discoveryOutput);
} else {
String errLog = readFile(workspaceRoot, ERROR_LOG_FILENAME);
@@ -126,12 +127,12 @@ public class SingerDiscoveryWorker
}
}
private static StandardDiscoveryOutput toDiscoveryOutput(SingerCatalog catalog) {
private static StandardDiscoverSchemaOutput toDiscoveryOutput(SingerCatalog catalog) {
final Schema schema = SingerCatalogConverters.toDatalineSchema(catalog);
final StandardDiscoveryOutput discoveryOutput = new StandardDiscoveryOutput();
discoveryOutput.setSchema(schema);
final StandardDiscoverSchemaOutput output = new StandardDiscoverSchemaOutput();
output.setSchema(schema);
return discoveryOutput;
return output;
}
@Override

View File

@@ -24,10 +24,10 @@
package io.dataline.workers.singer.postgres_tap;
import io.dataline.workers.singer.SingerDiscoveryWorker;
import io.dataline.workers.singer.SingerDiscoverSchemaWorker;
import io.dataline.workers.singer.SingerTap;
public class SingerPostgresTapDiscoverWorker extends SingerDiscoveryWorker {
public class SingerPostgresTapDiscoverWorker extends SingerDiscoverSchemaWorker {
public SingerPostgresTapDiscoverWorker() {
super(SingerTap.POSTGRES);

View File

@@ -28,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.*;
import io.dataline.config.Schema;
import io.dataline.config.SingerCatalog;
import io.dataline.config.StandardDiscoveryOutput;
import io.dataline.config.StandardDiscoverSchemaOutput;
import io.dataline.workers.BaseWorkerTestCase;
import org.junit.jupiter.api.Test;
@@ -41,7 +41,8 @@ class SingerCatalogConvertersTest extends BaseWorkerTestCase {
final SingerCatalog expectedCatalog =
getJsonAsTyped("simple_postgres_singer_catalog.json", SingerCatalog.class);
final Schema datalineSchema =
getJsonAsTyped("simple_postgres_schema.json", StandardDiscoveryOutput.class).getSchema();
getJsonAsTyped("simple_postgres_schema.json", StandardDiscoverSchemaOutput.class)
.getSchema();
final SingerCatalog actualCatalog =
SingerCatalogConverters.applySchemaToDiscoveredCatalog(catalog, datalineSchema);
@@ -58,7 +59,8 @@ class SingerCatalogConvertersTest extends BaseWorkerTestCase {
final SingerCatalog catalog =
getJsonAsTyped("simple_postgres_singer_catalog.json", SingerCatalog.class);
final Schema expectedSchema =
getJsonAsTyped("simple_postgres_schema.json", StandardDiscoveryOutput.class).getSchema();
getJsonAsTyped("simple_postgres_schema.json", StandardDiscoverSchemaOutput.class)
.getSchema();
expectedSchema.getTables().get(0).setSelected(false);
expectedSchema.getTables().get(0).getColumns().get(0).setSelected(true);
expectedSchema.getTables().get(0).getColumns().get(1).setSelected(true);

View File

@@ -30,8 +30,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.StandardConnectionStatus;
import io.dataline.config.StandardCheckConnectionInput;
import io.dataline.config.StandardCheckConnectionOutput;
import io.dataline.workers.BaseWorkerTestCase;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.PostgreSQLContainerHelper;
@@ -64,17 +64,18 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
PostgreSQLContainerHelper.getSingerConfigJson(
"user", "pass", "localhost", "postgres", "111111");
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(fakeDbCreds, Object.class);
connectionImplementation.setConfiguration(o);
final StandardCheckConnectionInput standardCheckConnectionInput =
new StandardCheckConnectionInput();
standardCheckConnectionInput.setConnectionConfiguration(o);
SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SingerTap.POSTGRES);
OutputAndStatus<StandardConnectionStatus> run =
worker.run(connectionImplementation, createWorkspacePath(jobId));
OutputAndStatus<StandardCheckConnectionOutput> run =
worker.run(standardCheckConnectionInput, createWorkspacePath(jobId));
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.FAILURE, run.getOutput().get().getStatus());
assertEquals(StandardCheckConnectionOutput.Status.FAILURE, run.getOutput().get().getStatus());
// TODO Once log file locations are accessible externally, also verify the correct error message
// in the logs
}
@@ -94,16 +95,17 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SingerTap.POSTGRES);
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(incorrectCreds, Object.class);
connectionImplementation.setConfiguration(o);
final StandardCheckConnectionInput standardCheckConnectionInput =
new StandardCheckConnectionInput();
standardCheckConnectionInput.setConnectionConfiguration(o);
OutputAndStatus<StandardConnectionStatus> run =
worker.run(connectionImplementation, createWorkspacePath(jobId));
OutputAndStatus<StandardCheckConnectionOutput> run =
worker.run(standardCheckConnectionInput, createWorkspacePath(jobId));
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.FAILURE, run.getOutput().get().getStatus());
assertEquals(StandardCheckConnectionOutput.Status.FAILURE, run.getOutput().get().getStatus());
// TODO Once log file locations are accessible externally, also verify the correct error message
// in the logs
}
@@ -115,17 +117,18 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
final String jobId = "1";
String creds = PostgreSQLContainerHelper.getSingerConfigJson(db);
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(creds, Object.class);
connectionImplementation.setConfiguration(o);
final StandardCheckConnectionInput standardCheckConnectionInput =
new StandardCheckConnectionInput();
standardCheckConnectionInput.setConnectionConfiguration(o);
SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SingerTap.POSTGRES);
OutputAndStatus<StandardConnectionStatus> run =
worker.run(connectionImplementation, createWorkspacePath(jobId));
OutputAndStatus<StandardCheckConnectionOutput> run =
worker.run(standardCheckConnectionInput, createWorkspacePath(jobId));
assertEquals(SUCCESSFUL, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.SUCCESS, run.getOutput().get().getStatus());
assertEquals(StandardCheckConnectionOutput.Status.SUCCESS, run.getOutput().get().getStatus());
// TODO Once log file locations are accessible externally, also verify the correct error message
// in the logs
}

View File

@@ -30,8 +30,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dataline.config.ConnectionImplementation;
import io.dataline.config.StandardDiscoveryOutput;
import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardDiscoverSchemaOutput;
import io.dataline.workers.BaseWorkerTestCase;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.PostgreSQLContainerHelper;
@@ -48,7 +48,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
public class SingerDiscoveryWorkerTest extends BaseWorkerTestCase {
public class SingerDiscoverSchemaWorkerTest extends BaseWorkerTestCase {
PostgreSQLContainer db;
@@ -66,14 +66,14 @@ public class SingerDiscoveryWorkerTest extends BaseWorkerTestCase {
public void testPostgresDiscovery() throws IOException {
final String jobId = "1";
String postgresCreds = PostgreSQLContainerHelper.getSingerConfigJson(db);
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(postgresCreds, Object.class);
connectionImplementation.setConfiguration(o);
final StandardDiscoverSchemaInput input = new StandardDiscoverSchemaInput();
input.setConnectionConfiguration(o);
SingerDiscoveryWorker worker = new SingerDiscoveryWorker(SingerTap.POSTGRES);
SingerDiscoverSchemaWorker worker = new SingerDiscoverSchemaWorker(SingerTap.POSTGRES);
OutputAndStatus<StandardDiscoveryOutput> run =
worker.run(connectionImplementation, createWorkspacePath(jobId));
OutputAndStatus<StandardDiscoverSchemaOutput> run =
worker.run(input, createWorkspacePath(jobId));
assertEquals(SUCCESSFUL, run.getStatus());
@@ -90,18 +90,19 @@ public class SingerDiscoveryWorkerTest extends BaseWorkerTestCase {
final String jobId = "1";
String postgresCreds = PostgreSQLContainerHelper.getSingerConfigJson(db);
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(postgresCreds, Object.class);
connectionImplementation.setConfiguration(o);
SingerDiscoveryWorker worker = new SingerDiscoveryWorker(SingerTap.POSTGRES);
final StandardDiscoverSchemaInput input = new StandardDiscoverSchemaInput();
input.setConnectionConfiguration(o);
SingerDiscoverSchemaWorker worker = new SingerDiscoverSchemaWorker(SingerTap.POSTGRES);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<?> workerWasCancelled =
threadPool.submit(
() -> {
OutputAndStatus<StandardDiscoveryOutput> output =
worker.run(connectionImplementation, createWorkspacePath(jobId));
OutputAndStatus<StandardDiscoverSchemaOutput> output =
worker.run(input, createWorkspacePath(jobId));
assertEquals(FAILED, output.getStatus());
});