1
0
mirror of synced 2025-12-19 18:14:56 -05:00

Changes project codestyle (#139)

This commit is contained in:
Michel Tricot
2020-08-31 13:49:40 -07:00
committed by GitHub
parent 656c34ef8f
commit 098a121c24
110 changed files with 2441 additions and 743 deletions

1162
.editorconfig Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
plugins {
id "base"
id "com.diffplug.spotless" version "5.1.1" apply false
id "com.diffplug.spotless" version "5.3.0" apply false
}
Properties env = new Properties()
@@ -47,16 +47,24 @@ subprojects {
spotless {
java {
importOrder()
eclipse('4.16.0').configFile(rootProject.file('tools/gradle/codestyle/java-google-style.xml'))
licenseHeaderFile createLicenseWith(rootProject.file('LICENSE'))
googleJavaFormat('1.8')
removeUnusedImports()
trimTrailingWhitespace()
}
groovyGradle {
target '**/*.gradle'
}
format 'json', {
target "**/*.json"
sql {
target '**/*.sql'
dbeaver().configFile(rootProject.file('tools/gradle/codestyle/sql-dbeaver.properties'))
}
format 'styling', {
target '**/*.json', '**/*.yaml'
prettier()
}

View File

@@ -27,28 +27,28 @@ info:
name: MIT
url: "https://opensource.org/licenses/MIT"
tags:
- name: workspace
description: Workspace related resources.
- name: source
description: Source related resources.
- name: source_specification
description: Source specification related resources.
- name: source_implementation
description: Source implementation related resources.
- name: destination
description: Destination related resources.
- name: destination_specification
description: Destination specification related resources.
- name: destination_implementation
description: Destination implementation related resources.
- name: connection
description: Connection between sources and destinations.
- name: workspace
description: Workspace related resources.
- name: source
description: Source related resources.
- name: source_specification
description: Source specification related resources.
- name: source_implementation
description: Source implementation related resources.
- name: destination
description: Destination related resources.
- name: destination_specification
description: Destination specification related resources.
- name: destination_implementation
description: Destination implementation related resources.
- name: connection
description: Connection between sources and destinations.
paths:
/v1/workspaces/get:
post:
tags:
- workspace
- workspace
summary: Find workspace by ID
operationId: getWorkspace
requestBody:
@@ -71,7 +71,7 @@ paths:
/v1/workspaces/get_by_slug:
post:
tags:
- workspace
- workspace
summary: Find workspace by slug
operationId: getWorkspaceBySlug
requestBody:
@@ -94,7 +94,7 @@ paths:
/v1/workspaces/update:
post:
tags:
- workspace
- workspace
summary: Update workspace state
operationId: updateWorkspace
requestBody:
@@ -117,7 +117,7 @@ paths:
/v1/sources/list:
post:
tags:
- source
- source
summary: List all of the sources that Dataline supports
operationId: listSources
responses:
@@ -130,7 +130,7 @@ paths:
/v1/sources/get:
post:
tags:
- source
- source
summary: Get source
operationId: getSource
requestBody:
@@ -153,7 +153,7 @@ paths:
/v1/source_specifications/get:
post:
tags:
- source_specification
- source_specification
summary: Get specification for a source.
operationId: getSourceSpecification
requestBody:
@@ -176,7 +176,7 @@ paths:
/v1/source_implementations/create:
post:
tags:
- source_implementation
- source_implementation
summary: Create a source implementation
operationId: createSourceImplementation
requestBody:
@@ -197,7 +197,7 @@ paths:
/v1/source_implementations/update:
post:
tags:
- source_implementation
- source_implementation
summary: Update a source
operationId: updateSourceImplementation
requestBody:
@@ -220,7 +220,7 @@ paths:
/v1/source_implementations/list:
post:
tags:
- source_implementation
- source_implementation
summary: List source implementations for workspace
operationId: listSourceImplementationsForWorkspace
requestBody:
@@ -243,7 +243,7 @@ paths:
/v1/source_implementations/get:
post:
tags:
- source_implementation
- source_implementation
summary: Get source implementation
operationId: getSourceImplementation
requestBody:
@@ -266,7 +266,7 @@ paths:
/v1/source_implementations/delete:
post:
tags:
- source_implementation
- source_implementation
summary: Delete a source implementation
operationId: deleteSourceImplementation
requestBody:
@@ -285,7 +285,7 @@ paths:
/v1/source_implementations/check_connection:
post:
tags:
- source_implementation
- source_implementation
summary: Check connection to the source implementation
operationId: checkConnectionToSourceImplementation
requestBody:
@@ -308,7 +308,7 @@ paths:
/v1/source_implementations/discover_schema:
post:
tags:
- source_implementation
- source_implementation
summary: Discover the schema of the source implementation
operationId: discoverSchemaForSourceImplementation
requestBody:
@@ -331,7 +331,7 @@ paths:
/v1/destinations/list:
post:
tags:
- destination
- destination
summary: List all of the destinations that Dataline supports
operationId: listDestinations
responses:
@@ -344,7 +344,7 @@ paths:
/v1/destinations/get:
post:
tags:
- destination
- destination
summary: Get destination
operationId: getDestination
requestBody:
@@ -367,7 +367,7 @@ paths:
/v1/destination_specifications/get:
post:
tags:
- destination_specification
- destination_specification
summary: Get specification for a destination
operationId: getDestinationSpecification
requestBody:
@@ -390,7 +390,7 @@ paths:
/v1/destination_implementations/create:
post:
tags:
- destination_implementation
- destination_implementation
summary: Create a destination implementation
operationId: createDestinationImplementation
requestBody:
@@ -411,7 +411,7 @@ paths:
/v1/destination_implementations/update:
post:
tags:
- destination_implementation
- destination_implementation
summary: Update a destination implementation
operationId: updateDestinationImplementation
requestBody:
@@ -432,7 +432,7 @@ paths:
/v1/destination_implementations/list:
post:
tags:
- destination_implementation
- destination_implementation
summary: List configured destinations for a workspace
operationId: listDestinationImplementationsForWorkspace
requestBody:
@@ -455,7 +455,7 @@ paths:
/v1/destination_implementations/get:
post:
tags:
- destination_implementation
- destination_implementation
summary: get configured destination
operationId: getDestinationImplementation
requestBody:
@@ -478,7 +478,7 @@ paths:
/v1/destination_implementations/check_connection:
post:
tags:
- destination_implementation
- destination_implementation
summary: Check connection to the destination implementation
operationId: checkConnectionToDestinationImplementation
requestBody:
@@ -501,7 +501,7 @@ paths:
/v1/connections/create:
post:
tags:
- connection
- connection
summary: Create a connection between a source implementation and a destination implementation
operationId: createConnection
requestBody:
@@ -522,7 +522,7 @@ paths:
/v1/connections/update:
post:
tags:
- connection
- connection
summary: Updated a connection status
operationId: updateConnection
requestBody:
@@ -543,7 +543,7 @@ paths:
/v1/connections/list:
post:
tags:
- connection
- connection
summary: Returns all connections for a workspace.
operationId: listConnectionsForWorkspace
requestBody:
@@ -566,7 +566,7 @@ paths:
/v1/connections/get:
post:
tags:
- connection
- connection
summary: Get a connection
operationId: getConnection
requestBody:
@@ -589,7 +589,7 @@ paths:
/v1/connections/sync:
post:
tags:
- connection
- connection
summary: Trigger a manual sync of the connection
operationId: syncConnection
requestBody:
@@ -612,7 +612,7 @@ paths:
/v1/jobs/list:
post:
tags:
- jobs
- jobs
summary: Returns recent jobs for a connection.
operationId: listJobsFor
requestBody:
@@ -635,7 +635,7 @@ paths:
/v1/jobs/get:
post:
tags:
- jobs
- jobs
summary: Get information about a job
operationId: getJobInfo
requestBody:
@@ -659,8 +659,8 @@ externalDocs:
description: Find out more about Dataline
url: "http://dataline.io"
servers:
- url: "https://virtserver.swaggerhub.com/cgardens6/dataline/1.0.0"
- url: "http://virtserver.swaggerhub.com/cgardens6/dataline/1.0.0"
- url: "https://virtserver.swaggerhub.com/cgardens6/dataline/1.0.0"
- url: "http://virtserver.swaggerhub.com/cgardens6/dataline/1.0.0"
components:
securitySchemes:
bearerAuth:
@@ -675,17 +675,17 @@ components:
WorkspaceIdRequestBody:
type: object
required:
- workspaceId
- workspaceId
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
WorkspaceRead:
type: object
required:
- workspaceId
- name
- slug
- initialSetupComplete
- workspaceId
- name
- slug
- initialSetupComplete
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
@@ -698,11 +698,11 @@ components:
WorkspaceUpdate:
type: object
required:
- workspaceId
- initialSetupComplete
- anonymousDataCollection
- news
- securityUpdates
- workspaceId
- initialSetupComplete
- anonymousDataCollection
- news
- securityUpdates
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
@@ -721,7 +721,7 @@ components:
SlugRequestBody:
type: object
required:
- slug
- slug
properties:
slug:
type: string
@@ -732,15 +732,15 @@ components:
SourceIdRequestBody:
type: object
required:
- sourceId
- sourceId
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
SourceRead:
type: object
required:
- sourceId
- name
- sourceId
- name
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
@@ -749,7 +749,7 @@ components:
SourceReadList:
type: object
required:
- sources
- sources
properties:
sources:
type: array
@@ -762,9 +762,9 @@ components:
SourceSpecificationRead:
type: object
required:
- sourceSpecificationId
- sourceId
- specification
- sourceSpecificationId
- sourceId
- specification
properties:
sourceSpecificationId:
$ref: "#/components/schemas/SourceSpecificationId"
@@ -780,7 +780,7 @@ components:
SourceImplementationIdRequestBody:
type: object
required:
- sourceId
- sourceId
properties:
sourceImplementationId:
$ref: "#/components/schemas/SourceImplementationId"
@@ -790,9 +790,9 @@ components:
SourceImplementationCreate:
type: object
required:
- workspaceId
- sourceSpecificationId
- connectionConfiguration
- workspaceId
- sourceSpecificationId
- connectionConfiguration
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
@@ -803,8 +803,8 @@ components:
SourceImplementationUpdate:
type: object
required:
- sourceImplementationId
- connectionConfiguration
- sourceImplementationId
- connectionConfiguration
properties:
sourceImplementationId:
$ref: "#/components/schemas/SourceImplementationId"
@@ -813,11 +813,11 @@ components:
SourceImplementationRead:
type: object
required:
- sourceId
- sourceImplementationId
- workspaceId
- sourceSpecificationId
- connectionConfiguration
- sourceId
- sourceImplementationId
- workspaceId
- sourceSpecificationId
- connectionConfiguration
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
@@ -832,7 +832,7 @@ components:
SourceImplementationReadList:
type: object
required:
- sources
- sources
properties:
sources:
type: array
@@ -841,7 +841,7 @@ components:
SourceImplementationDiscoverSchemaRead:
type: object
required:
- schema
- schema
properties:
schema:
$ref: "#/components/schemas/SourceSchema"
@@ -852,15 +852,15 @@ components:
DestinationIdRequestBody:
type: object
required:
- destinationId
- destinationId
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
DestinationRead:
type: object
required:
- destinationId
- name
- destinationId
- name
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
@@ -869,7 +869,7 @@ components:
DestinationReadList:
type: object
required:
- destinations
- destinations
properties:
destinations:
type: array
@@ -882,9 +882,9 @@ components:
DestinationSpecificationRead:
type: object
required:
- destinationSpecificationId
- destinationId
- specification
- destinationSpecificationId
- destinationId
- specification
properties:
destinationSpecificationId:
$ref: "#/components/schemas/DestinationSpecificationId"
@@ -900,16 +900,16 @@ components:
DestinationImplementationIdRequestBody:
type: object
required:
- destinationImplementationId
- destinationImplementationId
properties:
destinationImplementationId:
$ref: "#/components/schemas/SourceImplementationId"
DestinationImplementationCreate:
type: object
required:
- workspaceId
- destinationSpecificationId
- connectionConfiguration
- workspaceId
- destinationSpecificationId
- connectionConfiguration
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
@@ -921,8 +921,8 @@ components:
DestinationImplementationUpdate:
type: object
required:
- destinationImplementationId
- connectionConfiguration
- destinationImplementationId
- connectionConfiguration
properties:
destinationImplementationId:
$ref: "#/components/schemas/DestinationImplementationId"
@@ -932,11 +932,11 @@ components:
DestinationImplementationRead:
type: object
required:
- destinationId
- destinationImplementationId
- workspaceId
- destinationSpecificationId
- connectionConfiguration
- destinationId
- destinationImplementationId
- workspaceId
- destinationSpecificationId
- connectionConfiguration
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
@@ -952,7 +952,7 @@ components:
DestinationImplementationReadList:
type: object
required:
- destinations
- destinations
properties:
destinations:
type: array
@@ -965,17 +965,17 @@ components:
ConnectionIdRequestBody:
type: object
required:
- connectionId
- connectionId
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
ConnectionCreate:
type: object
required:
- sourceImplementationId
- destinationImplementationId
- syncMode
- status
- sourceImplementationId
- destinationImplementationId
- syncMode
- status
properties:
name:
type: string
@@ -987,8 +987,8 @@ components:
syncMode:
type: string
enum:
- full_refresh
- append
- full_refresh
- append
syncSchema:
$ref: "#/components/schemas/SourceSchema"
schedule:
@@ -998,10 +998,10 @@ components:
ConnectionUpdate:
type: object
required:
- sourceImplementationId
- destinationImplementationId
- syncSchema
- status
- sourceImplementationId
- destinationImplementationId
- syncSchema
- status
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
@@ -1014,13 +1014,13 @@ components:
ConnectionRead:
type: object
required:
- connectionId
- name
- sourceImplementationId
- destinationImplementationId
- syncMode
- syncSchema
- status
- connectionId
- name
- sourceImplementationId
- destinationImplementationId
- syncMode
- syncSchema
- status
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
@@ -1033,8 +1033,8 @@ components:
syncMode:
type: string
enum:
- full_refresh
- append
- full_refresh
- append
syncSchema:
$ref: "#/components/schemas/SourceSchema"
schedule:
@@ -1044,7 +1044,7 @@ components:
ConnectionReadList:
type: object
required:
- connections
- connections
properties:
connections:
type: array
@@ -1054,15 +1054,15 @@ components:
type: string
description: Active means that data is flowing through the connection. Inactive means it is not. Deprecated means the connection is off and cannot be re-activated. the schema field describes the elements of the schema that will be synced.
enum:
- active
- inactive
- deprecated
- active
- inactive
- deprecated
ConnectionSchedule:
description: if null, then no schedule is set.
type: object
required:
- units
- timeUnit
- units
- timeUnit
properties:
units:
type: integer
@@ -1070,27 +1070,27 @@ components:
timeUnit:
type: string
enum:
- minutes
- hours
- days
- weeks
- months
- minutes
- hours
- days
- weeks
- months
ConnectionSyncRead:
type: object
required:
- status
- status
properties:
status:
type: string
enum:
- success
- fail
- success
- fail
# SCHEMA
SourceSchema:
description: describes the available schema.
type: object
required:
- tables
- tables
properties:
tables:
type: array
@@ -1099,8 +1099,8 @@ components:
SourceSchemaTable:
type: object
required:
- name
- columns
- name
- columns
properties:
name:
type: string
@@ -1111,9 +1111,9 @@ components:
SourceSchemaColumn:
type: object
required:
- name
- dataType
- selected
- name
- dataType
- selected
properties:
name:
type: string
@@ -1124,9 +1124,9 @@ components:
DataType:
type: string
enum:
- string
- number
- boolean
- string
- number
- boolean
# SCHEDULER
JobId:
type: integer
@@ -1134,15 +1134,15 @@ components:
JobConfigType:
type: string
enum:
- checkConnectionSource
- checkConnectionDestination
- discoverSchema
- sync
- checkConnectionSource
- checkConnectionDestination
- discoverSchema
- sync
JobListRequestBody:
type: object
required:
- config_type
- config_id
- config_type
- config_id
properties:
config_type:
$ref: "#/components/schemas/JobConfigType"
@@ -1151,19 +1151,19 @@ components:
JobIdRequestBody:
type: object
required:
- id
- id
properties:
id:
$ref: "#/components/schemas/JobId"
JobRead:
type: object
required:
- id
- config_type
- config_id
- created_at
- updated_at
- status
- id
- config_type
- config_id
- created_at
- updated_at
- status
properties:
id:
$ref: "#/components/schemas/JobId"
@@ -1183,15 +1183,15 @@ components:
status:
type: string
enum:
- pending
- running
- failed
- completed
- cancelled
- pending
- running
- failed
- completed
- cancelled
JobReadList:
type: object
required:
- jobs
- jobs
properties:
jobs:
type: array
@@ -1200,8 +1200,8 @@ components:
JobInfoRead:
type: object
required:
- job
- logs
- job
- logs
properties:
job:
$ref: "#/components/schemas/JobRead"
@@ -1210,8 +1210,8 @@ components:
LogRead:
type: object
required:
- stdout
- stderr
- stdout
- stderr
properties:
stdout:
type: array
@@ -1225,13 +1225,13 @@ components:
CheckConnectionRead:
type: object
required:
- status
- status
properties:
status:
type: string
enum:
- success
- failure
- success
- failure
message:
type: string
responses:
@@ -1239,4 +1239,4 @@ components:
description: Invalid Input
# apply the security globally to all operations
security:
- bearerAuth: []
- bearerAuth: []

View File

@@ -35,14 +35,15 @@ public class Enums {
return Enum.valueOf(oe, ie.name());
}
public static <T1 extends Enum<T1>, T2 extends Enum<T2>> boolean isCompatible(
Class<T1> c1, Class<T2> c2) {
public static <T1 extends Enum<T1>, T2 extends Enum<T2>> boolean isCompatible(Class<T1> c1,
Class<T2> c2) {
Preconditions.checkArgument(c1.isEnum());
Preconditions.checkArgument(c2.isEnum());
return c1.getEnumConstants().length == c2.getEnumConstants().length
&& Sets.difference(
Arrays.stream(c1.getEnumConstants()).map(Enum::name).collect(Collectors.toSet()),
Arrays.stream(c2.getEnumConstants()).map(Enum::name).collect(Collectors.toSet()))
Arrays.stream(c1.getEnumConstants()).map(Enum::name).collect(Collectors.toSet()),
Arrays.stream(c2.getEnumConstants()).map(Enum::name).collect(Collectors.toSet()))
.isEmpty();
}
}

View File

@@ -27,6 +27,7 @@ package io.dataline.commons.env;
import java.util.Objects;
public enum Env {
TEST,
DOCKER;
@@ -40,4 +41,5 @@ public enum Env {
public static boolean isDocker() {
return CURRENT_ENV == Env.DOCKER;
}
}

View File

@@ -82,4 +82,5 @@ public class Jsons {
public static <T> JsonNode jsonNode(final T object) {
return OBJECT_MAPPER.valueToTree(object);
}
}

View File

@@ -29,8 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
class HelloWorldTest {
@Test
void name() {
assertEquals(1, 1);
}
}

View File

@@ -85,4 +85,5 @@ class EnumsTest {
void testNotCompatibleDifferentLength2() {
Assertions.assertFalse(isCompatible(E4.class, E1.class));
}
}

View File

@@ -102,9 +102,9 @@ class JsonsTest {
Assertions.assertEquals(
"{\"test\":\"abc\",\"test2\":\"def\"}",
Jsons.jsonNode(
ImmutableMap.of(
"test", "abc",
"test2", "def"))
ImmutableMap.of(
"test", "abc",
"test2", "def"))
.toString());
}
@@ -145,5 +145,7 @@ class JsonsTest {
public int hashCode() {
return Objects.hash(str, num, numLong);
}
}
}

View File

@@ -25,5 +25,7 @@
package io.dataline.config.init;
public class DatalineConfigInitConstants {
public static final String PLACEHOLDER = "";
}

View File

@@ -25,6 +25,7 @@
package io.dataline.config;
public enum ConfigSchema {
// workspace
STANDARD_WORKSPACE("StandardWorkspace.json"),
@@ -63,4 +64,5 @@ public enum ConfigSchema {
public static String getSchemaDirectory() {
return "json/";
}
}

View File

@@ -35,4 +35,5 @@ public interface Configs {
String getWorkspaceDockerMount();
String getDockerNetwork();
}

View File

@@ -90,4 +90,5 @@ public class EnvConfigs implements Configs {
}
return Path.of(value);
}
}

View File

@@ -90,4 +90,5 @@ class EnvConfigsTest {
when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn("abc");
Assertions.assertEquals("abc", config.getDockerNetwork());
}
}

View File

@@ -25,6 +25,7 @@
package io.dataline.config.persistence;
public class ConfigNotFoundException extends Exception {
public ConfigNotFoundException(String message) {
super(message);
}
@@ -32,4 +33,5 @@ public class ConfigNotFoundException extends Exception {
public ConfigNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -27,6 +27,7 @@ package io.dataline.config.persistence;
import java.util.Set;
public interface ConfigPersistence {
<T> T getConfig(PersistenceConfigType persistenceConfigType, String configId, Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException;
@@ -35,4 +36,5 @@ public interface ConfigPersistence {
<T> void writeConfig(PersistenceConfigType persistenceConfigType, String configId, T config)
throws JsonValidationException;
}

View File

@@ -46,6 +46,7 @@ import org.apache.commons.io.FileUtils;
// we force all interaction with disk storage to be effectively single threaded.
public class DefaultConfigPersistence implements ConfigPersistence {
private static final String CONFIG_PATH_IN_JAR = "/json";
private static final String CONFIG_DIR = "schemas";
private static final Path configFilesRoot = getConfigFiles();
@@ -98,16 +99,18 @@ public class DefaultConfigPersistence implements ConfigPersistence {
}
@Override
public <T> T getConfig(
PersistenceConfigType persistenceConfigType, String configId, Class<T> clazz)
public <T> T getConfig(PersistenceConfigType persistenceConfigType,
String configId,
Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException {
synchronized (lock) {
return getConfigInternal(persistenceConfigType, configId, clazz);
}
}
private <T> T getConfigInternal(
PersistenceConfigType persistenceConfigType, String configId, Class<T> clazz)
private <T> T getConfigInternal(PersistenceConfigType persistenceConfigType,
String configId,
Class<T> clazz)
throws ConfigNotFoundException, JsonValidationException {
// validate file with schema
try {
@@ -139,8 +142,9 @@ public class DefaultConfigPersistence implements ConfigPersistence {
}
@Override
public <T> void writeConfig(
PersistenceConfigType persistenceConfigType, String configId, T config)
public <T> void writeConfig(PersistenceConfigType persistenceConfigType,
String configId,
T config)
throws JsonValidationException {
synchronized (lock) {
// validate config with schema
@@ -211,8 +215,7 @@ public class DefaultConfigPersistence implements ConfigPersistence {
return String.format("%s.json", id);
}
private ConfigSchema standardConfigTypeToConfigSchema(
PersistenceConfigType persistenceConfigType) {
private ConfigSchema standardConfigTypeToConfigSchema(PersistenceConfigType persistenceConfigType) {
switch (persistenceConfigType) {
case STANDARD_WORKSPACE:
return ConfigSchema.STANDARD_WORKSPACE;
@@ -259,13 +262,12 @@ public class DefaultConfigPersistence implements ConfigPersistence {
throws ConfigNotFoundException {
return getFile(persistenceConfigType, configId)
.orElseThrow(
() ->
new ConfigNotFoundException(
String.format(
"config type: %s id: %s not found in path %s",
persistenceConfigType,
configId,
getConfigPath(persistenceConfigType, configId))));
() -> new ConfigNotFoundException(
String.format(
"config type: %s id: %s not found in path %s",
persistenceConfigType,
configId,
getConfigPath(persistenceConfigType, configId))));
}
private void ensureDirectory(Path path) {
@@ -275,4 +277,5 @@ public class DefaultConfigPersistence implements ConfigPersistence {
throw new RuntimeException(e);
}
}
}

View File

@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class JsonSchemaValidation {
private final SchemaValidatorsConfig schemaValidatorsConfig;
private final JsonSchemaFactory jsonSchemaFactory;
@@ -62,4 +63,5 @@ public class JsonSchemaValidation {
configJson.toPrettyString()));
}
}
}

View File

@@ -25,6 +25,7 @@
package io.dataline.config.persistence;
public class JsonValidationException extends Exception {
public JsonValidationException(String message) {
super(message);
}
@@ -32,4 +33,5 @@ public class JsonValidationException extends Exception {
public JsonValidationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -27,8 +27,10 @@ package io.dataline.config.persistence;
import java.util.UUID;
public class PersistenceConstants {
public static String DEFAULT_TEST_ROOT = "/tmp/data/config/";
// for MVP we only support one workspace per deployment and we hard code its id.
public static UUID DEFAULT_WORKSPACE_ID = UUID.fromString("5ae6b09b-fdec-41af-aaf7-7d94cfc33ef6");
}

View File

@@ -191,4 +191,5 @@ class DefaultConfigPersistenceTest {
}
fail("expected to throw invalid json exception.");
}
}

View File

@@ -29,5 +29,7 @@ import org.jooq.DSLContext;
@FunctionalInterface
public interface ContextExecutionFunction {
void execute(DSLContext context) throws SQLException;
}

View File

@@ -29,5 +29,7 @@ import org.jooq.DSLContext;
@FunctionalInterface
public interface ContextQueryFunction<T> {
T apply(DSLContext context) throws SQLException;
}

View File

@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DatabaseHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHelper.class);
public static BasicDataSource getConnectionPoolFromEnv() {
@@ -43,8 +44,9 @@ public class DatabaseHelper {
System.getenv("DATABASE_URL"));
}
public static BasicDataSource getConnectionPool(
String username, String password, String jdbcConnectionString) {
public static BasicDataSource getConnectionPool(String username,
String password,
String jdbcConnectionString) {
BasicDataSource connectionPool = new BasicDataSource();
connectionPool.setDriverClassName("org.postgresql.Driver");
@@ -67,8 +69,8 @@ public class DatabaseHelper {
}
}
public static void execute(
BasicDataSource connectionPool, ContextExecutionFunction executionFunction)
public static void execute(BasicDataSource connectionPool,
ContextExecutionFunction executionFunction)
throws SQLException {
query(
connectionPool,
@@ -77,4 +79,5 @@ public class DatabaseHelper {
return 1;
});
}
}

View File

@@ -33,9 +33,10 @@ import org.jooq.Record;
import org.jooq.Result;
/*
The server UUID identifies a specific database installation of Dataline for analytics purposes.
* The server UUID identifies a specific database installation of Dataline for analytics purposes.
*/
public class ServerUuid {
public static Optional<String> get(BasicDataSource connectionPool) throws SQLException {
return DatabaseHelper.query(
connectionPool,
@@ -51,4 +52,5 @@ public class ServerUuid {
}
});
}
}

View File

@@ -1,31 +1,55 @@
-- database
CREATE DATABASE dataline;
CREATE
DATABASE dataline;
\connect dataline;
-- extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE
EXTENSION IF NOT EXISTS "uuid-ossp";
-- types
CREATE TYPE JOB_STATUS AS ENUM ('pending', 'running', 'failed', 'completed', 'cancelled');
CREATE
TYPE JOB_STATUS AS ENUM(
'pending',
'running',
'failed',
'completed',
'cancelled'
);
-- tables
CREATE TABLE DATALINE_METADATA (key VARCHAR (255) PRIMARY KEY, value VARCHAR (255));
CREATE
TABLE
DATALINE_METADATA(
KEY VARCHAR(255) PRIMARY KEY,
value VARCHAR(255)
);
CREATE TABLE JOBS (
id BIGSERIAL PRIMARY KEY,
scope VARCHAR (255),
created_at TIMESTAMPTZ,
started_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
status JOB_STATUS,
config JSONB,
output JSONB,
stdout_path VARCHAR (255),
stderr_path VARCHAR (255)
);
CREATE
TABLE
JOBS(
id BIGSERIAL PRIMARY KEY,
SCOPE VARCHAR(255),
created_at TIMESTAMPTZ,
started_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ,
status JOB_STATUS,
config JSONB,
OUTPUT JSONB,
stdout_path VARCHAR(255),
stderr_path VARCHAR(255)
);
-- entries
INSERT INTO DATALINE_METADATA VALUES ('server-uuid', uuid_generate_v4());
INSERT
INTO
DATALINE_METADATA
VALUES(
'server-uuid',
uuid_generate_v4()
);
-- grants
GRANT ALL ON DATABASE dataline TO docker;
GRANT ALL ON
DATABASE dataline TO docker;

View File

@@ -37,6 +37,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
public class TestServerUuid {
private static PostgreSQLContainer container;
private static BasicDataSource connectionPool;
@@ -46,8 +47,7 @@ public class TestServerUuid {
new PostgreSQLContainer("postgres:13-alpine")
.withDatabaseName("dataline")
.withUsername("docker")
.withPassword("docker");
;
.withPassword("docker");;
container.start();
try {
@@ -82,4 +82,5 @@ public class TestServerUuid {
assertEquals(uuid1, uuid2);
}
}

View File

@@ -27,6 +27,7 @@ package io.dataline.integrations;
import java.util.UUID;
public enum Integrations {
POSTGRES_TAP(
UUID.fromString("2168516a-5c9a-4582-90dc-5e3a01e3f607"),
new IntegrationMapping("dataline/integration-singer-postgres-source")),
@@ -38,7 +39,7 @@ public enum Integrations {
private final IntegrationMapping integrationMapping;
// todo (cgardens) - turn this into a map if we have enough integrations that iterating through
// the enum becomes expensive.
// the enum becomes expensive.
public static Integrations findBySpecId(UUID specId) {
for (Integrations value : values()) {
if (value.getSpecId().equals(specId)) {
@@ -70,6 +71,7 @@ public enum Integrations {
}
public static class IntegrationMapping {
private final String checkConnection;
private final String discoverSchema;
private final String sync;
@@ -95,5 +97,7 @@ public enum Integrations {
public String getSync() {
return sync;
}
}
}

View File

@@ -36,16 +36,16 @@ import java.util.Set;
import java.util.UUID;
// todo (cgardens) - deduplicate this with the ConfigFetchers in dataline-server. requires creating
// a class that takes in an exception provider. also requires figuring out the dependency DAG to
// avoid circular dependency issues.
// a class that takes in an exception provider. also requires figuring out the dependency DAG to
// avoid circular dependency issues.
/**
* These helpers catch exceptions thrown in the config persistence and throws them as
* RuntimeExceptions
*/
public class ConfigFetchers {
public static SourceConnectionImplementation getSourceConnectionImplementation(
ConfigPersistence configPersistence, UUID sourceImplementationId) {
public static SourceConnectionImplementation getSourceConnectionImplementation(ConfigPersistence configPersistence,
UUID sourceImplementationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
@@ -58,8 +58,8 @@ public class ConfigFetchers {
}
}
public static DestinationConnectionImplementation getDestinationConnectionImplementation(
ConfigPersistence configPersistence, UUID destinationImplementationId) {
public static DestinationConnectionImplementation getDestinationConnectionImplementation(ConfigPersistence configPersistence,
UUID destinationImplementationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
@@ -73,8 +73,8 @@ public class ConfigFetchers {
}
}
public static StandardSync getStandardSync(
ConfigPersistence configPersistence, UUID connectionId) {
public static StandardSync getStandardSync(ConfigPersistence configPersistence,
UUID connectionId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC, connectionId.toString(), StandardSync.class);
@@ -93,8 +93,8 @@ public class ConfigFetchers {
}
}
public static StandardSyncSchedule getStandardSyncSchedule(
ConfigPersistence configPersistence, UUID connectionId) {
public static StandardSyncSchedule getStandardSyncSchedule(ConfigPersistence configPersistence,
UUID connectionId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
@@ -107,8 +107,9 @@ public class ConfigFetchers {
}
}
private static RuntimeException getConfigNotFoundException(
Throwable e, String configName, UUID id) {
private static RuntimeException getConfigNotFoundException(Throwable e,
String configName,
UUID id) {
return new RuntimeException(
String.format("Could not find sync configuration for %s: %s.", configName, id), e);
}
@@ -120,4 +121,5 @@ public class ConfigFetchers {
e.getMessage()),
e);
}
}

View File

@@ -82,8 +82,8 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
}
@Override
public long createDestinationCheckConnectionJob(
DestinationConnectionImplementation destinationImplementation) throws IOException {
public long createDestinationCheckConnectionJob(DestinationConnectionImplementation destinationImplementation)
throws IOException {
final String scope =
ScopeHelper.createScope(
JobConfig.ConfigType.CHECK_CONNECTION_DESTINATION,
@@ -127,10 +127,9 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
}
@Override
public long createSyncJob(
SourceConnectionImplementation sourceImplementation,
DestinationConnectionImplementation destinationImplementation,
StandardSync standardSync)
public long createSyncJob(SourceConnectionImplementation sourceImplementation,
DestinationConnectionImplementation destinationImplementation,
StandardSync standardSync)
throws IOException {
final UUID connectionId = standardSync.getConnectionId();
@@ -171,18 +170,17 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
try {
record =
DatabaseHelper.query(
connectionPool,
ctx ->
ctx.fetch(
"INSERT INTO jobs(scope, created_at, updated_at, status, config, output, stdout_path, stderr_path) VALUES(?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB), ?, ?, ?) RETURNING id",
scope,
now,
now,
JobStatus.PENDING.toString().toLowerCase(),
configJson,
null,
JobLogs.getLogDirectory(scope),
JobLogs.getLogDirectory(scope)))
connectionPool,
ctx -> ctx.fetch(
"INSERT INTO jobs(scope, created_at, updated_at, status, config, output, stdout_path, stderr_path) VALUES(?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB), ?, ?, ?) RETURNING id",
scope,
now,
now,
JobStatus.PENDING.toString().toLowerCase(),
configJson,
null,
JobLogs.getLogDirectory(scope),
JobLogs.getLogDirectory(scope)))
.stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("This should not happen"));
@@ -218,10 +216,9 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
String scope = ScopeHelper.createScope(configType, configId);
return DatabaseHelper.query(
connectionPool,
ctx ->
ctx.fetch("SELECT * FROM jobs WHERE scope = ?", scope).stream()
.map(DefaultSchedulerPersistence::getJobFromRecord)
.collect(Collectors.toList()));
ctx -> ctx.fetch("SELECT * FROM jobs WHERE scope = ?", scope).stream()
.map(DefaultSchedulerPersistence::getJobFromRecord)
.collect(Collectors.toList()));
} catch (SQLException e) {
throw new IOException(e);
}
@@ -252,4 +249,5 @@ public class DefaultSchedulerPersistence implements SchedulerPersistence {
private static long getEpoch(Record record, String fieldName) {
return record.getValue(fieldName, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC);
}
}

View File

@@ -42,17 +42,16 @@ public class Job {
private final String stdoutPath;
private final String stderrPath;
public Job(
long id,
String scope,
JobStatus status,
JobConfig config,
@Nullable JobOutput output,
String stdoutPath,
String stderrPath,
long createdAt,
@Nullable Long startedAt,
long updatedAt) {
public Job(long id,
String scope,
JobStatus status,
JobConfig config,
@Nullable JobOutput output,
String stdoutPath,
String stderrPath,
long createdAt,
@Nullable Long startedAt,
long updatedAt) {
this.id = id;
this.scope = scope;
this.status = status;
@@ -134,4 +133,5 @@ public class Job {
+ '\''
+ '}';
}
}

View File

@@ -27,10 +27,12 @@ package io.dataline.scheduler;
import java.nio.file.Paths;
public class JobLogs {
public static String ROOT_PATH = "logs/jobs";
// todo: add scoping by job id
public static String getLogDirectory(String scope) {
return Paths.get(ROOT_PATH, scope).toString();
}
}

View File

@@ -46,10 +46,9 @@ public class JobScheduler implements Runnable {
private final SchedulerPersistence schedulerPersistence;
private final ConfigPersistence configPersistence;
public JobScheduler(
BasicDataSource connectionPool,
SchedulerPersistence schedulerPersistence,
ConfigPersistence configPersistence) {
public JobScheduler(BasicDataSource connectionPool,
SchedulerPersistence schedulerPersistence,
ConfigPersistence configPersistence) {
this.connectionPool = connectionPool;
this.schedulerPersistence = schedulerPersistence;
this.configPersistence = configPersistence;
@@ -102,7 +101,7 @@ public class JobScheduler implements Runnable {
schedulerPersistence, configPersistence, connectionId);
}
break;
// todo (cgardens) - add max retry concept
// todo (cgardens) - add max retry concept
case FAILED:
JobUtils.createSyncJobFromConnectionId(
schedulerPersistence, configPersistence, connectionId);
@@ -138,4 +137,5 @@ public class JobScheduler implements Runnable {
private Set<StandardSync> getAllActiveConnections() {
return ConfigFetchers.getStandardSyncs(configPersistence);
}
}

View File

@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import java.util.Set;
public enum JobStatus {
PENDING,
RUNNING,
FAILED,
@@ -35,4 +36,5 @@ public enum JobStatus {
CANCELLED;
public static Set<JobStatus> TERMINAL_STATUSES = Sets.newHashSet(FAILED, COMPLETED, CANCELLED);
}

View File

@@ -45,12 +45,11 @@ public class JobSubmitter implements Runnable {
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public JobSubmitter(
final ExecutorService threadPool,
final BasicDataSource connectionPool,
final SchedulerPersistence persistence,
final Path workspaceRoot,
final ProcessBuilderFactory pbf) {
public JobSubmitter(final ExecutorService threadPool,
final BasicDataSource connectionPool,
final SchedulerPersistence persistence,
final Path workspaceRoot,
final ProcessBuilderFactory pbf) {
this.threadPool = threadPool;
this.connectionPool = connectionPool;
this.persistence = persistence;
@@ -97,4 +96,5 @@ public class JobSubmitter implements Runnable {
threadPool.submit(
new WorkerRunner(job.getId(), connectionPool, persistence, workspaceRoot, pbf));
}
}

View File

@@ -37,10 +37,10 @@ import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.Record;
public class JobUtils {
public static long createSyncJobFromConnectionId(
SchedulerPersistence schedulerPersistence,
ConfigPersistence configPersistence,
UUID connectionId) {
public static long createSyncJobFromConnectionId(SchedulerPersistence schedulerPersistence,
ConfigPersistence configPersistence,
UUID connectionId) {
final StandardSync standardSync;
standardSync = ConfigFetchers.getStandardSync(configPersistence, connectionId);
@@ -59,8 +59,9 @@ public class JobUtils {
}
}
public static Optional<Job> getLastSyncJobForConnectionId(
BasicDataSource connectionPool, UUID connectionId) throws IOException {
public static Optional<Job> getLastSyncJobForConnectionId(BasicDataSource connectionPool,
UUID connectionId)
throws IOException {
try {
return DatabaseHelper.query(
connectionPool,
@@ -86,4 +87,5 @@ public class JobUtils {
throw new IOException(throwables);
}
}
}

View File

@@ -62,11 +62,10 @@ public class SchedulerApp {
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public SchedulerApp(
BasicDataSource connectionPool,
Path configRoot,
Path workspaceRoot,
ProcessBuilderFactory pbf) {
public SchedulerApp(BasicDataSource connectionPool,
Path configRoot,
Path workspaceRoot,
ProcessBuilderFactory pbf) {
this.connectionPool = connectionPool;
this.configRoot = configRoot;
this.workspaceRoot = workspaceRoot;
@@ -119,4 +118,5 @@ public class SchedulerApp {
LOGGER.info("Launching scheduler...");
new SchedulerApp(connectionPool, configRoot, workspaceRoot, pbf).start();
}
}

View File

@@ -32,22 +32,23 @@ import java.io.IOException;
import java.util.List;
public interface SchedulerPersistence {
long createSourceCheckConnectionJob(SourceConnectionImplementation sourceImplementation)
throws IOException;
long createDestinationCheckConnectionJob(
DestinationConnectionImplementation destinationImplementation) throws IOException;
long createDestinationCheckConnectionJob(DestinationConnectionImplementation destinationImplementation)
throws IOException;
long createDiscoverSchemaJob(SourceConnectionImplementation sourceImplementation)
throws IOException;
long createSyncJob(
SourceConnectionImplementation sourceImplementation,
DestinationConnectionImplementation destinationImplementation,
StandardSync standardSync)
long createSyncJob(SourceConnectionImplementation sourceImplementation,
DestinationConnectionImplementation destinationImplementation,
StandardSync standardSync)
throws IOException;
Job getJob(long jobId) throws IOException;
List<Job> listJobs(JobConfig.ConfigType configType, String configId) throws IOException;
}

View File

@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchedulerShutdownHandler extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerShutdownHandler.class);
private ExecutorService[] threadPools;
@@ -51,4 +52,5 @@ public class SchedulerShutdownHandler extends Thread {
}
}
}
}

View File

@@ -27,6 +27,7 @@ package io.dataline.scheduler;
import io.dataline.config.JobConfig;
public class ScopeHelper {
public static String createScope(JobConfig.ConfigType configType, String configId) {
return configType.value() + ":" + configId;
}
@@ -34,4 +35,5 @@ public class ScopeHelper {
public static String getConfigId(String scope) {
return scope.split(":")[1];
}
}

View File

@@ -43,8 +43,9 @@ import org.slf4j.LoggerFactory;
* outputs are passed to the selected worker. It also makes sures that the outputs of the worker are
* persisted to the db.
*
* <p>todo (cgardens) - this line between this abstraction and WorkerRunner is a little blurry. we
* can clarify it later. the main benefit is of this class is that it gives us some type safety when
* <p>
* todo (cgardens) - this line between this abstraction and WorkerRunner is a little blurry. we can
* clarify it later. the main benefit is of this class is that it gives us some type safety when
* working with workers. you can probably make an argument that this class should not have access to
* the db.
*
@@ -61,12 +62,11 @@ public class WorkerRun<InputType, OutputType> implements Runnable {
private final Worker<InputType, OutputType> worker;
private final BasicDataSource connectionPool;
public WorkerRun(
long jobId,
Path jobRoot,
InputType input,
Worker<InputType, OutputType> worker,
BasicDataSource connectionPool) {
public WorkerRun(long jobId,
Path jobRoot,
InputType input,
Worker<InputType, OutputType> worker,
BasicDataSource connectionPool) {
this.jobId = jobId;
this.jobRoot = jobRoot;
this.input = input;
@@ -117,12 +117,11 @@ public class WorkerRun<InputType, OutputType> implements Runnable {
try {
DatabaseHelper.query(
connectionPool,
ctx ->
ctx.execute(
"UPDATE jobs SET status = CAST(? as JOB_STATUS), updated_at = ? WHERE id = ?",
status.toString().toLowerCase(),
now,
jobId));
ctx -> ctx.execute(
"UPDATE jobs SET status = CAST(? as JOB_STATUS), updated_at = ? WHERE id = ?",
status.toString().toLowerCase(),
now,
jobId));
} catch (SQLException e) {
LOGGER.error("SQL Error", e);
throw new RuntimeException(e);
@@ -135,15 +134,15 @@ public class WorkerRun<InputType, OutputType> implements Runnable {
try {
DatabaseHelper.query(
connectionPool,
ctx ->
ctx.execute(
"UPDATE jobs SET output = CAST(? as JSONB), updated_at = ? WHERE id = ?",
outputJson,
now,
jobId));
ctx -> ctx.execute(
"UPDATE jobs SET output = CAST(? as JSONB), updated_at = ? WHERE id = ?",
outputJson,
now,
jobId));
} catch (SQLException e) {
LOGGER.error("SQL Error", e);
throw new RuntimeException(e);
}
}
}

View File

@@ -52,12 +52,11 @@ public class WorkerRunner implements Runnable {
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public WorkerRunner(
long jobId,
BasicDataSource connectionPool,
SchedulerPersistence persistence,
Path workspaceRoot,
ProcessBuilderFactory pbf) {
public WorkerRunner(long jobId,
BasicDataSource connectionPool,
SchedulerPersistence persistence,
Path workspaceRoot,
ProcessBuilderFactory pbf) {
this.jobId = jobId;
this.connectionPool = connectionPool;
this.persistence = persistence;
@@ -82,58 +81,56 @@ public class WorkerRunner implements Runnable {
final StandardCheckConnectionInput checkConnectionInput =
getCheckConnectionInput(job.getConfig().getCheckConnection());
new WorkerRun<>(
jobId,
jobRoot,
checkConnectionInput,
new SingerCheckConnectionWorker(
job.getConfig().getCheckConnection().getDockerImage(), pbf),
connectionPool)
.run();
jobId,
jobRoot,
checkConnectionInput,
new SingerCheckConnectionWorker(
job.getConfig().getCheckConnection().getDockerImage(), pbf),
connectionPool)
.run();
break;
case DISCOVER_SCHEMA:
final StandardDiscoverSchemaInput discoverSchemaInput =
getDiscoverSchemaInput(job.getConfig().getDiscoverSchema());
new WorkerRun<>(
jobId,
jobRoot,
discoverSchemaInput,
new SingerDiscoverSchemaWorker(
job.getConfig().getDiscoverSchema().getDockerImage(), pbf),
connectionPool)
.run();
jobId,
jobRoot,
discoverSchemaInput,
new SingerDiscoverSchemaWorker(
job.getConfig().getDiscoverSchema().getDockerImage(), pbf),
connectionPool)
.run();
break;
case SYNC:
final StandardSyncInput syncInput = getSyncInput(job.getConfig().getSync());
new WorkerRun<>(
jobId,
jobRoot,
syncInput,
// todo (cgardens) - still locked into only using SingerTaps and Targets. Next step
// here is to create DefaultTap and DefaultTarget which will be able to
// interoperate with SingerTap and SingerTarget now that they are split and
// mediated in DefaultSyncWorker.
new DefaultSyncWorker(
new SingerTapFactory(job.getConfig().getSync().getSourceDockerImage(), pbf),
new SingerTargetFactory(
job.getConfig().getSync().getDestinationDockerImage(), pbf)),
connectionPool)
.run();
jobId,
jobRoot,
syncInput,
// todo (cgardens) - still locked into only using SingerTaps and Targets. Next step
// here is to create DefaultTap and DefaultTarget which will be able to
// interoperate with SingerTap and SingerTarget now that they are split and
// mediated in DefaultSyncWorker.
new DefaultSyncWorker(
new SingerTapFactory(job.getConfig().getSync().getSourceDockerImage(), pbf),
new SingerTargetFactory(
job.getConfig().getSync().getDestinationDockerImage(), pbf)),
connectionPool)
.run();
break;
default:
throw new RuntimeException("Unexpected config type: " + job.getConfig().getConfigType());
}
}
private static StandardCheckConnectionInput getCheckConnectionInput(
JobCheckConnectionConfig config) {
private static StandardCheckConnectionInput getCheckConnectionInput(JobCheckConnectionConfig config) {
final StandardCheckConnectionInput checkConnectionInput = new StandardCheckConnectionInput();
checkConnectionInput.setConnectionConfigurationJson(config.getConnectionConfigurationJson());
return checkConnectionInput;
}
private static StandardDiscoverSchemaInput getDiscoverSchemaInput(
JobDiscoverSchemaConfig config) {
private static StandardDiscoverSchemaInput getDiscoverSchemaInput(JobDiscoverSchemaConfig config) {
final StandardDiscoverSchemaInput discoverSchemaInput = new StandardDiscoverSchemaInput();
discoverSchemaInput.setConnectionConfigurationJson(config.getConnectionConfigurationJson());
@@ -149,4 +146,5 @@ public class WorkerRunner implements Runnable {
return syncInput;
}
}

View File

@@ -52,4 +52,5 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
public void dispose(ConfigurationApi service) {
/* noop */
}
}

View File

@@ -36,10 +36,11 @@ public class CorsFilter implements ContainerResponseFilter {
private static final String ALLOW_METHODS = "Access-Control-Allow-Methods";
@Override
public void filter(
ContainerRequestContext requestContext, ContainerResponseContext responseContext) {
public void filter(ContainerRequestContext requestContext,
ContainerResponseContext responseContext) {
responseContext.getHeaders().add(ALLOW_ORIGIN, "*");
responseContext.getHeaders().add(ALLOW_HEADERS, "Origin, Content-Type, Accept");
responseContext.getHeaders().add(ALLOW_METHODS, "GET, POST, PUT, DELETE, OPTIONS, HEAD");
}
}

View File

@@ -71,19 +71,21 @@ public class ServerApp {
ResourceConfig rc =
new ResourceConfig()
// todo (cgardens) - the CORs settings are wide open. will need to revisit when we add
// auth.
// auth.
// cors
.register(new CorsFilter())
// api
.register(ConfigurationApi.class)
.register(
new AbstractBinder() {
@Override
public void configure() {
bindFactory(ConfigurationApiFactory.class)
.to(ConfigurationApi.class)
.in(RequestScoped.class);
}
})
// exception handling
.register(InvalidInputExceptionMapper.class)
@@ -122,4 +124,5 @@ public class ServerApp {
LOGGER.info("Starting server...");
new ServerApp(configRoot).start();
}
}

View File

@@ -79,6 +79,7 @@ import org.apache.commons.dbcp2.BasicDataSource;
@javax.ws.rs.Path("/v1")
public class ConfigurationApi implements io.dataline.api.V1Api {
private final WorkspacesHandler workspacesHandler;
private final SourcesHandler sourcesHandler;
private final SourceSpecificationsHandler sourceSpecificationsHandler;
@@ -142,53 +143,45 @@ public class ConfigurationApi implements io.dataline.api.V1Api {
// SOURCE SPECIFICATION
@Override
public SourceSpecificationRead getSourceSpecification(
@Valid SourceIdRequestBody sourceIdRequestBody) {
public SourceSpecificationRead getSourceSpecification(@Valid SourceIdRequestBody sourceIdRequestBody) {
return sourceSpecificationsHandler.getSourceSpecification(sourceIdRequestBody);
}
// SOURCE IMPLEMENTATION
@Override
public SourceImplementationRead createSourceImplementation(
@Valid SourceImplementationCreate sourceImplementationCreate) {
public SourceImplementationRead createSourceImplementation(@Valid SourceImplementationCreate sourceImplementationCreate) {
return sourceImplementationsHandler.createSourceImplementation(sourceImplementationCreate);
}
@Override
public SourceImplementationRead updateSourceImplementation(
@Valid SourceImplementationUpdate sourceImplementationUpdate) {
public SourceImplementationRead updateSourceImplementation(@Valid SourceImplementationUpdate sourceImplementationUpdate) {
return sourceImplementationsHandler.updateSourceImplementation(sourceImplementationUpdate);
}
@Override
public SourceImplementationReadList listSourceImplementationsForWorkspace(
@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
public SourceImplementationReadList listSourceImplementationsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
return sourceImplementationsHandler.listSourceImplementationsForWorkspace(
workspaceIdRequestBody);
}
@Override
public SourceImplementationRead getSourceImplementation(
@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public SourceImplementationRead getSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
return sourceImplementationsHandler.getSourceImplementation(sourceImplementationIdRequestBody);
}
@Override
public void deleteSourceImplementation(
@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public void deleteSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
sourceImplementationsHandler.deleteSourceImplementation(sourceImplementationIdRequestBody);
}
@Override
public CheckConnectionRead checkConnectionToSourceImplementation(
@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public CheckConnectionRead checkConnectionToSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
return schedulerHandler.checkSourceImplementationConnection(sourceImplementationIdRequestBody);
}
@Override
public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(
@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
return schedulerHandler.discoverSchemaForSourceImplementation(
sourceImplementationIdRequestBody);
}
@@ -208,43 +201,37 @@ public class ConfigurationApi implements io.dataline.api.V1Api {
// DESTINATION SPECIFICATION
@Override
public DestinationSpecificationRead getDestinationSpecification(
@Valid DestinationIdRequestBody destinationIdRequestBody) {
public DestinationSpecificationRead getDestinationSpecification(@Valid DestinationIdRequestBody destinationIdRequestBody) {
return destinationSpecificationsHandler.getDestinationSpecification(destinationIdRequestBody);
}
// DESTINATION IMPLEMENTATION
@Override
public DestinationImplementationRead createDestinationImplementation(
@Valid DestinationImplementationCreate destinationImplementationCreate) {
public DestinationImplementationRead createDestinationImplementation(@Valid DestinationImplementationCreate destinationImplementationCreate) {
return destinationImplementationsHandler.createDestinationImplementation(
destinationImplementationCreate);
}
@Override
public DestinationImplementationRead updateDestinationImplementation(
@Valid DestinationImplementationUpdate destinationImplementationUpdate) {
public DestinationImplementationRead updateDestinationImplementation(@Valid DestinationImplementationUpdate destinationImplementationUpdate) {
return destinationImplementationsHandler.updateDestinationImplementation(
destinationImplementationUpdate);
}
@Override
public DestinationImplementationReadList listDestinationImplementationsForWorkspace(
@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
public DestinationImplementationReadList listDestinationImplementationsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
return destinationImplementationsHandler.listDestinationImplementationsForWorkspace(
workspaceIdRequestBody);
}
@Override
public DestinationImplementationRead getDestinationImplementation(
@Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
public DestinationImplementationRead getDestinationImplementation(@Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
return destinationImplementationsHandler.getDestinationImplementation(
destinationImplementationIdRequestBody);
}
@Override
public CheckConnectionRead checkConnectionToDestinationImplementation(
@Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
public CheckConnectionRead checkConnectionToDestinationImplementation(@Valid DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
return schedulerHandler.checkDestinationImplementationConnection(
destinationImplementationIdRequestBody);
}
@@ -262,8 +249,7 @@ public class ConfigurationApi implements io.dataline.api.V1Api {
}
@Override
public ConnectionReadList listConnectionsForWorkspace(
@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
public ConnectionReadList listConnectionsForWorkspace(@Valid WorkspaceIdRequestBody workspaceIdRequestBody) {
return connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody);
}
@@ -288,4 +274,5 @@ public class ConfigurationApi implements io.dataline.api.V1Api {
public JobInfoRead getJobInfo(@Valid JobIdRequestBody jobIdRequestBody) {
return jobHistoryHandler.getJobInfo(jobIdRequestBody);
}
}

View File

@@ -36,6 +36,7 @@ import java.util.List;
import java.util.stream.Collectors;
public class SchemaConverter {
public static Schema toPersistenceSchema(SourceSchema api) {
final List<Table> persistenceTables =
api.getTables().stream()
@@ -108,4 +109,5 @@ public class SchemaConverter {
public static io.dataline.api.model.DataType toApiDataType(DataType persistenceDataType) {
return Enums.convertTo(persistenceDataType, io.dataline.api.model.DataType.class);
}
}

View File

@@ -64,4 +64,5 @@ public class InvalidInputExceptionMapper implements ExceptionMapper<ConstraintVi
}
return message.toString();
}
}

View File

@@ -43,4 +43,5 @@ public class InvalidJsonExceptionMapper implements ExceptionMapper<JsonParseExce
.type("application/json")
.build();
}
}

View File

@@ -43,4 +43,5 @@ public class InvalidJsonInputExceptionMapper implements ExceptionMapper<JsonMapp
.type("application/json")
.build();
}
}

View File

@@ -25,6 +25,7 @@
package io.dataline.server.errors;
public class KnownException extends RuntimeException {
private final int httpCode;
public KnownException(int httpCode, String message) {
@@ -40,4 +41,5 @@ public class KnownException extends RuntimeException {
public int getHttpCode() {
return httpCode;
}
}

View File

@@ -45,4 +45,5 @@ public class KnownExceptionMapper implements ExceptionMapper<KnownException> {
.type("application/json")
.build();
}
}

View File

@@ -45,4 +45,5 @@ public class UncaughtExceptionMapper implements ExceptionMapper<Throwable> {
.type("application/json")
.build();
}
}

View File

@@ -151,8 +151,7 @@ public class ConnectionsHandler {
}
// todo (cgardens) - this is a disaster without a relational db.
public ConnectionReadList listConnectionsForWorkspace(
WorkspaceIdRequestBody workspaceIdRequestBody) {
public ConnectionReadList listConnectionsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) {
final List<ConnectionRead> reads =
// read all connections.
@@ -160,11 +159,10 @@ public class ConnectionsHandler {
// filter out connections attached to source implementations NOT associated with the
// workspace
.filter(
standardSync ->
ConfigFetchers.getSourceConnectionImplementation(
configPersistence, standardSync.getSourceImplementationId())
.getWorkspaceId()
.equals(workspaceIdRequestBody.getWorkspaceId()))
standardSync -> ConfigFetchers.getSourceConnectionImplementation(
configPersistence, standardSync.getSourceImplementationId())
.getWorkspaceId()
.equals(workspaceIdRequestBody.getWorkspaceId()))
// pull the sync schedule
// convert to api format
@@ -202,8 +200,8 @@ public class ConnectionsHandler {
return ConfigFetchers.getStandardSyncSchedule(configPersistence, connectionId);
}
private ConnectionRead toConnectionRead(
StandardSync standardSync, StandardSyncSchedule standardSyncSchedule) {
private ConnectionRead toConnectionRead(StandardSync standardSync,
StandardSyncSchedule standardSyncSchedule) {
final ConnectionSchedule apiSchedule;
standardSyncSchedule.setConnectionId(standardSyncSchedule.getConnectionId());
@@ -247,4 +245,5 @@ public class ConnectionsHandler {
private ConnectionSchedule.TimeUnitEnum toApiTimeUnit(Schedule.TimeUnit apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, ConnectionSchedule.TimeUnitEnum.class);
}
}

View File

@@ -48,23 +48,20 @@ public class DestinationImplementationsHandler {
private final ConfigPersistence configPersistence;
private final IntegrationSchemaValidation validator;
public DestinationImplementationsHandler(
ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation,
Supplier<UUID> uuidGenerator) {
public DestinationImplementationsHandler(ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation,
Supplier<UUID> uuidGenerator) {
this.configPersistence = configPersistence;
this.validator = integrationSchemaValidation;
this.uuidGenerator = uuidGenerator;
}
public DestinationImplementationsHandler(
ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation) {
public DestinationImplementationsHandler(ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation) {
this(configPersistence, integrationSchemaValidation, UUID::randomUUID);
}
public DestinationImplementationRead createDestinationImplementation(
DestinationImplementationCreate destinationImplementationCreate) {
public DestinationImplementationRead createDestinationImplementation(DestinationImplementationCreate destinationImplementationCreate) {
// validate configuration
validateDestinationImplementation(
destinationImplementationCreate.getDestinationSpecificationId(),
@@ -82,8 +79,7 @@ public class DestinationImplementationsHandler {
return getDestinationImplementationInternal(destinationImplementationId);
}
public DestinationImplementationRead updateDestinationImplementation(
DestinationImplementationUpdate destinationImplementationUpdate) {
public DestinationImplementationRead updateDestinationImplementation(DestinationImplementationUpdate destinationImplementationUpdate) {
// get existing implementation
final DestinationImplementationRead persistedDestinationImplementation =
getDestinationImplementationInternal(
@@ -106,28 +102,25 @@ public class DestinationImplementationsHandler {
destinationImplementationUpdate.getDestinationImplementationId());
}
public DestinationImplementationRead getDestinationImplementation(
DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
public DestinationImplementationRead getDestinationImplementation(DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
return getDestinationImplementationInternal(
destinationImplementationIdRequestBody.getDestinationImplementationId());
}
public DestinationImplementationReadList listDestinationImplementationsForWorkspace(
WorkspaceIdRequestBody workspaceIdRequestBody) {
public DestinationImplementationReadList listDestinationImplementationsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) {
final List<DestinationImplementationRead> reads =
ConfigFetchers.getDestinationConnectionImplementations(configPersistence).stream()
.filter(
destinationConnectionImplementation ->
destinationConnectionImplementation
.getWorkspaceId()
.equals(workspaceIdRequestBody.getWorkspaceId()))
destinationConnectionImplementation -> destinationConnectionImplementation
.getWorkspaceId()
.equals(workspaceIdRequestBody.getWorkspaceId()))
.map(
destinationConnectionImplementation -> {
final UUID destinationId =
ConfigFetchers.getDestinationConnectionSpecification(
configPersistence,
destinationConnectionImplementation.getDestinationSpecificationId())
configPersistence,
destinationConnectionImplementation.getDestinationSpecificationId())
.getDestinationId();
return toDestinationImplementationRead(
destinationConnectionImplementation, destinationId);
@@ -140,8 +133,7 @@ public class DestinationImplementationsHandler {
return destinationImplementationReadList;
}
private DestinationImplementationRead getDestinationImplementationInternal(
UUID destinationImplementationId) {
private DestinationImplementationRead getDestinationImplementationInternal(UUID destinationImplementationId) {
// read configuration from db
final DestinationConnectionImplementation retrievedDestinationConnectionImplementation;
retrievedDestinationConnectionImplementation =
@@ -150,19 +142,17 @@ public class DestinationImplementationsHandler {
final UUID destinationId =
ConfigFetchers.getDestinationConnectionSpecification(
configPersistence,
retrievedDestinationConnectionImplementation.getDestinationSpecificationId())
configPersistence,
retrievedDestinationConnectionImplementation.getDestinationSpecificationId())
.getDestinationId();
return toDestinationImplementationRead(
retrievedDestinationConnectionImplementation, destinationId);
}
private void validateDestinationImplementation(
UUID destinationConnectionSpecificationId, String implementationJson) {
private void validateDestinationImplementation(UUID destinationConnectionSpecificationId, String implementationJson) {
try {
validator.validateDestinationConnectionConfiguration(
destinationConnectionSpecificationId, implementationJson);
validator.validateDestinationConnectionConfiguration(destinationConnectionSpecificationId, implementationJson);
} catch (JsonValidationException e) {
throw new KnownException(
422,
@@ -172,11 +162,10 @@ public class DestinationImplementationsHandler {
}
}
private void persistDestinationConnectionImplementation(
UUID destinationSpecificationId,
UUID workspaceId,
UUID destinationImplementationId,
String configurationJson) {
private void persistDestinationConnectionImplementation(UUID destinationSpecificationId,
UUID workspaceId,
UUID destinationImplementationId,
String configurationJson) {
final DestinationConnectionImplementation destinationConnectionImplementation =
new DestinationConnectionImplementation();
destinationConnectionImplementation.setDestinationSpecificationId(destinationSpecificationId);
@@ -191,8 +180,8 @@ public class DestinationImplementationsHandler {
destinationConnectionImplementation);
}
private DestinationImplementationRead toDestinationImplementationRead(
DestinationConnectionImplementation destinationConnectionImplementation, UUID destinationId) {
private DestinationImplementationRead toDestinationImplementationRead(DestinationConnectionImplementation destinationConnectionImplementation,
UUID destinationId) {
final DestinationImplementationRead destinationImplementationRead =
new DestinationImplementationRead();
destinationImplementationRead.setDestinationId(destinationId);
@@ -207,4 +196,5 @@ public class DestinationImplementationsHandler {
return destinationImplementationRead;
}
}

View File

@@ -39,33 +39,29 @@ public class DestinationSpecificationsHandler {
this.configPersistence = configPersistence;
}
public DestinationSpecificationRead getDestinationSpecification(
DestinationIdRequestBody destinationIdRequestBody) {
public DestinationSpecificationRead getDestinationSpecification(DestinationIdRequestBody destinationIdRequestBody) {
final DestinationConnectionSpecification destinationConnection;
// todo (cgardens) - this is a shortcoming of rolling our own disk storage. since we are not
// querying on a the primary key, we have to list all of the specification objects and then
// filter.
// querying on a the primary key, we have to list all of the specification objects and then
// filter.
destinationConnection =
ConfigFetchers.getDestinationConnectionSpecifications(configPersistence).stream()
.filter(
destinationSpecification ->
destinationSpecification
.getDestinationId()
.equals(destinationIdRequestBody.getDestinationId()))
destinationSpecification -> destinationSpecification
.getDestinationId()
.equals(destinationIdRequestBody.getDestinationId()))
.findFirst()
.orElseThrow(
() ->
new KnownException(
404,
String.format(
"Could not find a destination specification for destination: %s",
destinationIdRequestBody.getDestinationId())));
() -> new KnownException(
404,
String.format(
"Could not find a destination specification for destination: %s",
destinationIdRequestBody.getDestinationId())));
return toDestinationSpecificationRead(destinationConnection);
}
private static DestinationSpecificationRead toDestinationSpecificationRead(
DestinationConnectionSpecification destinationConnectionSpecification) {
private static DestinationSpecificationRead toDestinationSpecificationRead(DestinationConnectionSpecification destinationConnectionSpecification) {
final DestinationSpecificationRead destinationSpecificationRead =
new DestinationSpecificationRead();
destinationSpecificationRead.setDestinationId(
@@ -77,4 +73,5 @@ public class DestinationSpecificationsHandler {
return destinationSpecificationRead;
}
}

View File

@@ -35,6 +35,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
public class DestinationsHandler {
private final ConfigPersistence configPersistence;
public DestinationsHandler(ConfigPersistence configPersistence) {
@@ -67,4 +68,5 @@ public class DestinationsHandler {
return destinationRead;
}
}

View File

@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.input.ReversedLinesFileReader;
public class JobHistoryHandler {
private static final int LOG_TAIL_SIZE = 10;
private final SchedulerPersistence schedulerPersistence;
@@ -131,4 +132,5 @@ public class JobHistoryHandler {
return jobRead;
}
}

View File

@@ -49,20 +49,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchedulerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
private final ConfigPersistence configPersistence;
private final SchedulerPersistence schedulerPersistence;
public SchedulerHandler(
ConfigPersistence configPersistence, SchedulerPersistence schedulerPersistence) {
public SchedulerHandler(ConfigPersistence configPersistence,
SchedulerPersistence schedulerPersistence) {
this.configPersistence = configPersistence;
this.schedulerPersistence = schedulerPersistence;
}
public CheckConnectionRead checkSourceImplementationConnection(
SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public CheckConnectionRead checkSourceImplementationConnection(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
final SourceConnectionImplementation connectionImplementation =
ConfigFetchers.getSourceConnectionImplementation(
@@ -80,8 +80,7 @@ public class SchedulerHandler {
return reportConnectionStatus(job);
}
public CheckConnectionRead checkDestinationImplementationConnection(
DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
public CheckConnectionRead checkDestinationImplementationConnection(DestinationImplementationIdRequestBody destinationImplementationIdRequestBody) {
final DestinationConnectionImplementation connectionImplementation =
ConfigFetchers.getDestinationConnectionImplementation(
@@ -100,8 +99,7 @@ public class SchedulerHandler {
return reportConnectionStatus(job);
}
public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(
SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
final SourceConnectionImplementation connectionImplementation =
ConfigFetchers.getSourceConnectionImplementation(
configPersistence, sourceImplementationIdRequestBody.getSourceImplementationId());
@@ -131,7 +129,8 @@ public class SchedulerHandler {
}
public ConnectionSyncRead syncConnection(ConnectionIdRequestBody connectionIdRequestBody) {
@NotNull final UUID connectionId = connectionIdRequestBody.getConnectionId();
@NotNull
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync;
standardSync = ConfigFetchers.getStandardSync(configPersistence, connectionId);
@@ -204,4 +203,5 @@ public class SchedulerHandler {
return checkConnectionRead;
}
}

View File

@@ -48,23 +48,20 @@ public class SourceImplementationsHandler {
private final ConfigPersistence configPersistence;
private final IntegrationSchemaValidation validator;
public SourceImplementationsHandler(
ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation,
Supplier<UUID> uuidGenerator) {
public SourceImplementationsHandler(ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation,
Supplier<UUID> uuidGenerator) {
this.configPersistence = configPersistence;
this.validator = integrationSchemaValidation;
this.uuidGenerator = uuidGenerator;
}
public SourceImplementationsHandler(
ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation) {
public SourceImplementationsHandler(ConfigPersistence configPersistence,
IntegrationSchemaValidation integrationSchemaValidation) {
this(configPersistence, integrationSchemaValidation, UUID::randomUUID);
}
public SourceImplementationRead createSourceImplementation(
SourceImplementationCreate sourceImplementationCreate) {
public SourceImplementationRead createSourceImplementation(SourceImplementationCreate sourceImplementationCreate) {
// validate configuration
validateSourceImplementation(
sourceImplementationCreate.getSourceSpecificationId(),
@@ -83,8 +80,7 @@ public class SourceImplementationsHandler {
return getSourceImplementationReadInternal(sourceImplementationId);
}
public SourceImplementationRead updateSourceImplementation(
SourceImplementationUpdate sourceImplementationUpdate) {
public SourceImplementationRead updateSourceImplementation(SourceImplementationUpdate sourceImplementationUpdate) {
// get existing implementation
final SourceConnectionImplementation persistedSourceImplementation =
getSourceConnectionImplementationInternal(
@@ -108,29 +104,26 @@ public class SourceImplementationsHandler {
sourceImplementationUpdate.getSourceImplementationId());
}
public SourceImplementationRead getSourceImplementation(
SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public SourceImplementationRead getSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
return getSourceImplementationReadInternal(
sourceImplementationIdRequestBody.getSourceImplementationId());
}
public SourceImplementationReadList listSourceImplementationsForWorkspace(
WorkspaceIdRequestBody workspaceIdRequestBody) {
public SourceImplementationReadList listSourceImplementationsForWorkspace(WorkspaceIdRequestBody workspaceIdRequestBody) {
final List<SourceImplementationRead> reads =
ConfigFetchers.getSourceConnectionImplementations(configPersistence).stream()
.filter(
sourceConnectionImplementation ->
sourceConnectionImplementation
.getWorkspaceId()
.equals(workspaceIdRequestBody.getWorkspaceId()))
sourceConnectionImplementation -> sourceConnectionImplementation
.getWorkspaceId()
.equals(workspaceIdRequestBody.getWorkspaceId()))
.map(
sourceConnectionImplementation -> {
final UUID sourceId =
ConfigFetchers.getSourceConnectionSpecification(
configPersistence,
sourceConnectionImplementation.getSourceSpecificationId())
configPersistence,
sourceConnectionImplementation.getSourceSpecificationId())
.getSourceId();
return toSourceImplementationRead(sourceConnectionImplementation, sourceId);
})
@@ -142,8 +135,7 @@ public class SourceImplementationsHandler {
return sourceImplementationReadList;
}
public void deleteSourceImplementation(
SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
public void deleteSourceImplementation(SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
// get existing implementation
final SourceImplementationRead persistedSourceImplementation =
getSourceImplementationReadInternal(
@@ -158,29 +150,26 @@ public class SourceImplementationsHandler {
(String) persistedSourceImplementation.getConnectionConfiguration());
}
private SourceConnectionImplementation getSourceConnectionImplementationInternal(
UUID sourceImplementationId) {
private SourceConnectionImplementation getSourceConnectionImplementationInternal(UUID sourceImplementationId) {
return ConfigFetchers.getSourceConnectionImplementation(
configPersistence, sourceImplementationId);
}
private SourceImplementationRead getSourceImplementationReadInternal(
UUID sourceImplementationId) {
private SourceImplementationRead getSourceImplementationReadInternal(UUID sourceImplementationId) {
// read configuration from db
final SourceConnectionImplementation retrievedSourceConnectionImplementation =
getSourceConnectionImplementationInternal(sourceImplementationId);
final UUID sourceId =
ConfigFetchers.getSourceConnectionSpecification(
configPersistence,
retrievedSourceConnectionImplementation.getSourceSpecificationId())
configPersistence,
retrievedSourceConnectionImplementation.getSourceSpecificationId())
.getSourceId();
return toSourceImplementationRead(retrievedSourceConnectionImplementation, sourceId);
}
private void validateSourceImplementation(
UUID sourceConnectionSpecificationId, String implementationJson) {
private void validateSourceImplementation(UUID sourceConnectionSpecificationId, String implementationJson) {
try {
validator.validateSourceConnectionConfiguration(
sourceConnectionSpecificationId, implementationJson);
@@ -193,12 +182,11 @@ public class SourceImplementationsHandler {
}
}
private void persistSourceConnectionImplementation(
UUID sourceSpecificationId,
UUID workspaceId,
UUID sourceImplementationId,
boolean tombstone,
String configurationJson) {
private void persistSourceConnectionImplementation(UUID sourceSpecificationId,
UUID workspaceId,
UUID sourceImplementationId,
boolean tombstone,
String configurationJson) {
final SourceConnectionImplementation sourceConnectionImplementation =
new SourceConnectionImplementation();
sourceConnectionImplementation.setSourceSpecificationId(sourceSpecificationId);
@@ -214,8 +202,7 @@ public class SourceImplementationsHandler {
sourceConnectionImplementation);
}
private SourceImplementationRead toSourceImplementationRead(
SourceConnectionImplementation sourceConnectionImplementation, UUID sourceId) {
private SourceImplementationRead toSourceImplementationRead(SourceConnectionImplementation sourceConnectionImplementation, UUID sourceId) {
final SourceImplementationRead sourceImplementationRead = new SourceImplementationRead();
sourceImplementationRead.setSourceId(sourceId);
sourceImplementationRead.setSourceImplementationId(
@@ -228,4 +215,5 @@ public class SourceImplementationsHandler {
return sourceImplementationRead;
}
}

View File

@@ -42,27 +42,24 @@ public class SourceSpecificationsHandler {
public SourceSpecificationRead getSourceSpecification(SourceIdRequestBody sourceIdRequestBody) {
final SourceConnectionSpecification sourceConnection;
// todo (cgardens) - this is a shortcoming of rolling our own disk storage. since we are not
// querying on a the primary key, we have to list all of the specification objects and then
// filter.
// querying on a the primary key, we have to list all of the specification objects and then
// filter.
sourceConnection =
ConfigFetchers.getSourceConnectionSpecifications(configPersistence).stream()
.filter(
sourceSpecification ->
sourceSpecification.getSourceId().equals(sourceIdRequestBody.getSourceId()))
sourceSpecification -> sourceSpecification.getSourceId().equals(sourceIdRequestBody.getSourceId()))
.findFirst()
.orElseThrow(
() ->
new KnownException(
404,
String.format(
"Could not find a source specification for source: %s",
sourceIdRequestBody.getSourceId())));
() -> new KnownException(
404,
String.format(
"Could not find a source specification for source: %s",
sourceIdRequestBody.getSourceId())));
return toSourceSpecificationRead(sourceConnection);
}
private static SourceSpecificationRead toSourceSpecificationRead(
SourceConnectionSpecification sourceConnectionSpecification) {
private static SourceSpecificationRead toSourceSpecificationRead(SourceConnectionSpecification sourceConnectionSpecification) {
final SourceSpecificationRead sourceSpecificationRead = new SourceSpecificationRead();
sourceSpecificationRead.setSourceId(sourceConnectionSpecification.getSourceId());
sourceSpecificationRead.setSourceSpecificationId(
@@ -72,4 +69,5 @@ public class SourceSpecificationsHandler {
return sourceSpecificationRead;
}
}

View File

@@ -35,6 +35,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
public class SourcesHandler {
private final ConfigPersistence configPersistence;
public SourcesHandler(ConfigPersistence configPersistence) {
@@ -67,4 +68,5 @@ public class SourcesHandler {
return sourceRead;
}
}

View File

@@ -36,6 +36,7 @@ import io.dataline.server.helpers.ConfigFetchers;
import java.util.UUID;
public class WorkspacesHandler {
private final ConfigPersistence configPersistence;
public WorkspacesHandler(ConfigPersistence configPersistence) {
@@ -86,4 +87,5 @@ public class WorkspacesHandler {
return getWorkspaceFromId(workspaceUpdate.getWorkspaceId());
}
}

View File

@@ -48,8 +48,8 @@ import java.util.UUID;
*/
public class ConfigFetchers {
public static StandardWorkspace getStandardWorkspace(
ConfigPersistence configPersistence, UUID workspaceId) {
public static StandardWorkspace getStandardWorkspace(ConfigPersistence configPersistence,
UUID workspaceId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_WORKSPACE,
@@ -62,8 +62,8 @@ public class ConfigFetchers {
}
}
public static StandardSource getStandardSource(
ConfigPersistence configPersistence, UUID sourceId) {
public static StandardSource getStandardSource(ConfigPersistence configPersistence,
UUID sourceId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_SOURCE, sourceId.toString(), StandardSource.class);
@@ -75,11 +75,10 @@ public class ConfigFetchers {
}
// wrap json validation errors for usages in API handlers.
public static <T> void writeConfig(
ConfigPersistence configPersistence,
PersistenceConfigType persistenceConfigType,
String configId,
T config) {
public static <T> void writeConfig(ConfigPersistence configPersistence,
PersistenceConfigType persistenceConfigType,
String configId,
T config) {
try {
configPersistence.writeConfig(persistenceConfigType, configId, config);
} catch (JsonValidationException e) {
@@ -96,8 +95,8 @@ public class ConfigFetchers {
}
}
public static SourceConnectionSpecification getSourceConnectionSpecification(
ConfigPersistence configPersistence, UUID sourceSpecificationId) {
public static SourceConnectionSpecification getSourceConnectionSpecification(ConfigPersistence configPersistence,
UUID sourceSpecificationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
@@ -110,8 +109,7 @@ public class ConfigFetchers {
}
}
public static Set<SourceConnectionSpecification> getSourceConnectionSpecifications(
ConfigPersistence configPersistence) {
public static Set<SourceConnectionSpecification> getSourceConnectionSpecifications(ConfigPersistence configPersistence) {
try {
return configPersistence.getConfigs(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
@@ -121,8 +119,8 @@ public class ConfigFetchers {
}
}
public static SourceConnectionImplementation getSourceConnectionImplementation(
ConfigPersistence configPersistence, UUID sourceImplementationId) {
public static SourceConnectionImplementation getSourceConnectionImplementation(ConfigPersistence configPersistence,
UUID sourceImplementationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
@@ -135,8 +133,7 @@ public class ConfigFetchers {
}
}
public static Set<SourceConnectionImplementation> getSourceConnectionImplementations(
ConfigPersistence configPersistence) {
public static Set<SourceConnectionImplementation> getSourceConnectionImplementations(ConfigPersistence configPersistence) {
try {
return configPersistence.getConfigs(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
@@ -146,8 +143,8 @@ public class ConfigFetchers {
}
}
public static StandardDestination getStandardDestination(
ConfigPersistence configPersistence, UUID destinationId) {
public static StandardDestination getStandardDestination(ConfigPersistence configPersistence,
UUID destinationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_DESTINATION,
@@ -160,8 +157,7 @@ public class ConfigFetchers {
}
}
public static Set<StandardDestination> getStandardDestinations(
ConfigPersistence configPersistence) {
public static Set<StandardDestination> getStandardDestinations(ConfigPersistence configPersistence) {
try {
return configPersistence.getConfigs(
PersistenceConfigType.STANDARD_DESTINATION, StandardDestination.class);
@@ -170,8 +166,8 @@ public class ConfigFetchers {
}
}
public static DestinationConnectionSpecification getDestinationConnectionSpecification(
ConfigPersistence configPersistence, UUID destinationSpecificationId) {
public static DestinationConnectionSpecification getDestinationConnectionSpecification(ConfigPersistence configPersistence,
UUID destinationSpecificationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
@@ -185,8 +181,7 @@ public class ConfigFetchers {
}
}
public static Set<DestinationConnectionSpecification> getDestinationConnectionSpecifications(
ConfigPersistence configPersistence) {
public static Set<DestinationConnectionSpecification> getDestinationConnectionSpecifications(ConfigPersistence configPersistence) {
try {
return configPersistence.getConfigs(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
@@ -196,8 +191,8 @@ public class ConfigFetchers {
}
}
public static DestinationConnectionImplementation getDestinationConnectionImplementation(
ConfigPersistence configPersistence, UUID destinationImplementationId) {
public static DestinationConnectionImplementation getDestinationConnectionImplementation(ConfigPersistence configPersistence,
UUID destinationImplementationId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
@@ -211,8 +206,7 @@ public class ConfigFetchers {
}
}
public static Set<DestinationConnectionImplementation> getDestinationConnectionImplementations(
ConfigPersistence configPersistence) {
public static Set<DestinationConnectionImplementation> getDestinationConnectionImplementations(ConfigPersistence configPersistence) {
try {
return configPersistence.getConfigs(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
@@ -222,8 +216,8 @@ public class ConfigFetchers {
}
}
public static StandardSync getStandardSync(
ConfigPersistence configPersistence, UUID connectionId) {
public static StandardSync getStandardSync(ConfigPersistence configPersistence,
UUID connectionId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC, connectionId.toString(), StandardSync.class);
@@ -242,8 +236,8 @@ public class ConfigFetchers {
}
}
public static StandardSyncSchedule getStandardSyncSchedule(
ConfigPersistence configPersistence, UUID connectionId) {
public static StandardSyncSchedule getStandardSyncSchedule(ConfigPersistence configPersistence,
UUID connectionId) {
try {
return configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
@@ -256,8 +250,9 @@ public class ConfigFetchers {
}
}
private static KnownException getConfigNotFoundException(
Throwable e, String configName, UUID id) {
private static KnownException getConfigNotFoundException(Throwable e,
String configName,
UUID id) {
return new KnownException(
422, String.format("Could not find sync configuration for %s: %s.", configName, id), e);
}
@@ -270,4 +265,5 @@ public class ConfigFetchers {
e.getMessage()),
e);
}
}

View File

@@ -46,8 +46,7 @@ public class IntegrationSchemaValidation {
this.jsonSchemaValidation = new JsonSchemaValidation();
}
public void validateSourceConnectionConfiguration(
UUID sourceConnectionSpecificationId, String configurationJson)
public void validateSourceConnectionConfiguration(UUID sourceConnectionSpecificationId, String configurationJson)
throws JsonValidationException {
final SourceConnectionSpecification sourceConnectionSpecification =
ConfigFetchers.getSourceConnectionSpecification(
@@ -60,8 +59,7 @@ public class IntegrationSchemaValidation {
jsonSchemaValidation.validateThrow(schemaJson, configJson);
}
public void validateDestinationConnectionConfiguration(
UUID destinationConnectionSpecificationId, String configurationJson)
public void validateDestinationConnectionConfiguration(UUID destinationConnectionSpecificationId, String configurationJson)
throws JsonValidationException {
final DestinationConnectionSpecification destinationConnectionSpecification =
ConfigFetchers.getDestinationConnectionSpecification(
@@ -73,4 +71,5 @@ public class IntegrationSchemaValidation {
jsonSchemaValidation.validateThrow(schemaJson, configJson);
}
}

View File

@@ -29,8 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
class ServerAppTest {
@Test
void name() {
assertEquals(1, 1);
}
}

View File

@@ -92,16 +92,16 @@ class ConnectionsHandlerTest {
when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
StandardSync.class))
.thenReturn(standardSync);
PersistenceConfigType.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
StandardSync.class))
.thenReturn(standardSync);
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSyncSchedule.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule);
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSyncSchedule.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule);
final ConnectionCreate connectionCreate = new ConnectionCreate();
connectionCreate.setSourceImplementationId(standardSync.getSourceImplementationId());
@@ -109,7 +109,7 @@ class ConnectionsHandlerTest {
connectionCreate.setName("presto to hudi");
connectionCreate.setStatus(ConnectionStatus.ACTIVE);
// todo (cgardens) - the codegen auto-nests enums as subclasses. this won't work. we expect
// these enums to be reusable in create, update, read.
// these enums to be reusable in create, update, read.
connectionCreate.setSyncMode(ConnectionCreate.SyncModeEnum.APPEND);
connectionCreate.setSchedule(generateBasicSchedule());
connectionCreate.setSyncSchema(generateBasicApiSchema());
@@ -167,18 +167,18 @@ class ConnectionsHandlerTest {
updatedPersistenceSchedule.setManual(true);
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
StandardSync.class))
.thenReturn(standardSync)
.thenReturn(updatedStandardSync);
PersistenceConfigType.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
StandardSync.class))
.thenReturn(standardSync)
.thenReturn(updatedStandardSync);
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSyncSchedule.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule)
.thenReturn(updatedPersistenceSchedule);
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSyncSchedule.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule)
.thenReturn(updatedPersistenceSchedule);
final ConnectionRead actualConnectionRead =
connectionsHandler.updateConnection(connectionUpdate);
@@ -211,16 +211,16 @@ class ConnectionsHandlerTest {
@Test
void testGetConnection() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
StandardSync.class))
.thenReturn(standardSync);
PersistenceConfigType.STANDARD_SYNC,
standardSync.getConnectionId().toString(),
StandardSync.class))
.thenReturn(standardSync);
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSync.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule);
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSync.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule);
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody();
connectionIdRequestBody.setConnectionId(standardSync.getConnectionId());
@@ -239,17 +239,17 @@ class ConnectionsHandlerTest {
// mock get source connection impl (used to check that connection is associated with given
// workspace)
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceImplementation);
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceImplementation);
// mock get schedule for the now verified connection
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSync.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule);
PersistenceConfigType.STANDARD_SYNC_SCHEDULE,
standardSync.getConnectionId().toString(),
StandardSyncSchedule.class))
.thenReturn(standardSyncSchedule);
final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody();
workspaceIdRequestBody.setWorkspaceId(sourceImplementation.getWorkspaceId());
@@ -326,8 +326,9 @@ class ConnectionsHandlerTest {
return connectionSchedule;
}
private ConnectionRead generateExpectedConnectionRead(
UUID connectionId, UUID sourceImplementationId, UUID destinationImplementationId) {
private ConnectionRead generateExpectedConnectionRead(UUID connectionId,
UUID sourceImplementationId,
UUID destinationImplementationId) {
final ConnectionRead expectedConnectionRead = new ConnectionRead();
expectedConnectionRead.setConnectionId(connectionId);
expectedConnectionRead.setSourceImplementationId(sourceImplementationId);
@@ -360,4 +361,5 @@ class ConnectionsHandlerTest {
return standardSchedule;
}
}

View File

@@ -57,6 +57,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class DestinationImplementationsHandlerTest {
private ConfigPersistence configPersistence;
private DestinationConnectionSpecification destinationConnectionSpecification;
private DestinationConnectionImplementation destinationConnectionImplementation;
@@ -88,8 +89,8 @@ class DestinationImplementationsHandlerTest {
return Files.readString(path);
}
private DestinationConnectionImplementation generateDestinationImplementation(
UUID destinationSpecificationId) throws IOException {
private DestinationConnectionImplementation generateDestinationImplementation(UUID destinationSpecificationId)
throws IOException {
final UUID workspaceId = UUID.randomUUID();
final UUID destinationImplementationId = UUID.randomUUID();
@@ -112,16 +113,16 @@ class DestinationImplementationsHandlerTest {
.thenReturn(destinationConnectionImplementation.getDestinationImplementationId());
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
destinationConnectionImplementation.getDestinationImplementationId().toString(),
DestinationConnectionImplementation.class))
.thenReturn(destinationConnectionImplementation);
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
destinationConnectionImplementation.getDestinationImplementationId().toString(),
DestinationConnectionImplementation.class))
.thenReturn(destinationConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
final DestinationImplementationCreate destinationImplementationCreate =
new DestinationImplementationCreate();
@@ -182,17 +183,17 @@ class DestinationImplementationsHandlerTest {
expectedDestinationConnectionImplementation.setConfigurationJson(newConfiguration.toString());
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
destinationConnectionImplementation.getDestinationImplementationId().toString(),
DestinationConnectionImplementation.class))
.thenReturn(destinationConnectionImplementation)
.thenReturn(expectedDestinationConnectionImplementation);
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
destinationConnectionImplementation.getDestinationImplementationId().toString(),
DestinationConnectionImplementation.class))
.thenReturn(destinationConnectionImplementation)
.thenReturn(expectedDestinationConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
final DestinationImplementationUpdate destinationImplementationUpdate =
new DestinationImplementationUpdate();
@@ -227,16 +228,16 @@ class DestinationImplementationsHandlerTest {
@Test
void testGetDestinationImplementation() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
destinationConnectionImplementation.getDestinationImplementationId().toString(),
DestinationConnectionImplementation.class))
.thenReturn(destinationConnectionImplementation);
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
destinationConnectionImplementation.getDestinationImplementationId().toString(),
DestinationConnectionImplementation.class))
.thenReturn(destinationConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
DestinationImplementationRead expectedDestinationImplementationRead =
new DestinationImplementationRead();
@@ -267,15 +268,15 @@ class DestinationImplementationsHandlerTest {
void testListDestinationImplementationsForWorkspace()
throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfigs(
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
DestinationConnectionImplementation.class))
.thenReturn(Sets.newHashSet(destinationConnectionImplementation));
PersistenceConfigType.DESTINATION_CONNECTION_IMPLEMENTATION,
DestinationConnectionImplementation.class))
.thenReturn(Sets.newHashSet(destinationConnectionImplementation));
when(configPersistence.getConfig(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
destinationConnectionImplementation.getDestinationSpecificationId().toString(),
DestinationConnectionSpecification.class))
.thenReturn(destinationConnectionSpecification);
DestinationImplementationRead expectedDestinationImplementationRead =
new DestinationImplementationRead();
@@ -301,4 +302,5 @@ class DestinationImplementationsHandlerTest {
expectedDestinationImplementationRead,
actualDestinationImplementationRead.getDestinations().get(0));
}
}

View File

@@ -57,9 +57,9 @@ class DestinationSpecificationsHandlerTest {
@Test
void testGetDestinationSpecification() throws JsonValidationException {
when(configPersistence.getConfigs(
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
DestinationConnectionSpecification.class))
.thenReturn(Sets.newHashSet(destinationConnectionSpecification));
PersistenceConfigType.DESTINATION_CONNECTION_SPECIFICATION,
DestinationConnectionSpecification.class))
.thenReturn(Sets.newHashSet(destinationConnectionSpecification));
DestinationSpecificationRead expectedDestinationSpecificationRead =
new DestinationSpecificationRead();
@@ -79,4 +79,5 @@ class DestinationSpecificationsHandlerTest {
assertEquals(expectedDestinationSpecificationRead, actualDestinationSpecificationRead);
}
}

View File

@@ -44,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class DestinationsHandlerTest {
private ConfigPersistence configPersistence;
private StandardDestination destination;
private DestinationsHandler destinationHandler;
@@ -74,8 +75,8 @@ class DestinationsHandlerTest {
destination2);
when(configPersistence.getConfigs(
PersistenceConfigType.STANDARD_DESTINATION, StandardDestination.class))
.thenReturn(Sets.newHashSet(destination, destination2));
PersistenceConfigType.STANDARD_DESTINATION, StandardDestination.class))
.thenReturn(Sets.newHashSet(destination, destination2));
DestinationRead expectedDestinationRead1 = new DestinationRead();
expectedDestinationRead1.setDestinationId(destination.getDestinationId());
@@ -90,14 +91,12 @@ class DestinationsHandlerTest {
final Optional<DestinationRead> actualDestinationRead1 =
actualDestinationReadList.getDestinations().stream()
.filter(
destinationRead ->
destinationRead.getDestinationId().equals(destination.getDestinationId()))
destinationRead -> destinationRead.getDestinationId().equals(destination.getDestinationId()))
.findFirst();
final Optional<DestinationRead> actualDestinationRead2 =
actualDestinationReadList.getDestinations().stream()
.filter(
destinationRead ->
destinationRead.getDestinationId().equals(destination2.getDestinationId()))
destinationRead -> destinationRead.getDestinationId().equals(destination2.getDestinationId()))
.findFirst();
assertTrue(actualDestinationRead1.isPresent());
@@ -109,10 +108,10 @@ class DestinationsHandlerTest {
@Test
void testGetDestination() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_DESTINATION,
destination.getDestinationId().toString(),
StandardDestination.class))
.thenReturn(destination);
PersistenceConfigType.STANDARD_DESTINATION,
destination.getDestinationId().toString(),
StandardDestination.class))
.thenReturn(destination);
DestinationRead expectedDestinationRead = new DestinationRead();
expectedDestinationRead.setDestinationId(destination.getDestinationId());
@@ -126,4 +125,5 @@ class DestinationsHandlerTest {
assertEquals(expectedDestinationRead, actualDestinationRead);
}
}

View File

@@ -55,6 +55,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class SourceImplementationsHandlerTest {
private ConfigPersistence configPersistence;
private SourceConnectionSpecification sourceConnectionSpecification;
private SourceConnectionImplementation sourceConnectionImplementation;
@@ -85,16 +86,16 @@ class SourceImplementationsHandlerTest {
.thenReturn(sourceConnectionImplementation.getSourceImplementationId());
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation);
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
final SourceImplementationCreate sourceImplementationCreate = new SourceImplementationCreate();
sourceImplementationCreate.setWorkspaceId(sourceConnectionImplementation.getWorkspaceId());
@@ -149,17 +150,17 @@ class SourceImplementationsHandlerTest {
expectedSourceConnectionImplementation.setTombstone(false);
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation)
.thenReturn(expectedSourceConnectionImplementation);
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation)
.thenReturn(expectedSourceConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
final SourceImplementationUpdate sourceImplementationUpdate = new SourceImplementationUpdate();
sourceImplementationUpdate.setSourceImplementationId(
@@ -190,16 +191,16 @@ class SourceImplementationsHandlerTest {
@Test
void testGetSourceImplementation() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation);
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
SourceImplementationRead expectedSourceImplementationRead = new SourceImplementationRead();
expectedSourceImplementationRead.setSourceId(sourceConnectionSpecification.getSourceId());
@@ -227,15 +228,15 @@ class SourceImplementationsHandlerTest {
void testListSourceImplementationsForWorkspace()
throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfigs(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
SourceConnectionImplementation.class))
.thenReturn(Sets.newHashSet(sourceConnectionImplementation));
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
SourceConnectionImplementation.class))
.thenReturn(Sets.newHashSet(sourceConnectionImplementation));
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
SourceImplementationRead expectedSourceImplementationRead = new SourceImplementationRead();
expectedSourceImplementationRead.setSourceId(sourceConnectionSpecification.getSourceId());
@@ -277,17 +278,17 @@ class SourceImplementationsHandlerTest {
expectedSourceConnectionImplementation.setTombstone(true);
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation)
.thenReturn(expectedSourceConnectionImplementation);
PersistenceConfigType.SOURCE_CONNECTION_IMPLEMENTATION,
sourceConnectionImplementation.getSourceImplementationId().toString(),
SourceConnectionImplementation.class))
.thenReturn(sourceConnectionImplementation)
.thenReturn(expectedSourceConnectionImplementation);
when(configPersistence.getConfig(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
sourceConnectionSpecification.getSourceSpecificationId().toString(),
SourceConnectionSpecification.class))
.thenReturn(sourceConnectionSpecification);
final SourceImplementationIdRequestBody sourceImplementationIdRequestBody =
new SourceImplementationIdRequestBody();
@@ -311,4 +312,5 @@ class SourceImplementationsHandlerTest {
sourceConnectionImplementation.getSourceImplementationId().toString(),
expectedSourceConnectionImplementation);
}
}

View File

@@ -56,9 +56,9 @@ class SourceSpecificationsHandlerTest {
@Test
void testGetSourceSpecification() throws JsonValidationException {
when(configPersistence.getConfigs(
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
SourceConnectionSpecification.class))
.thenReturn(Sets.newHashSet(sourceConnectionSpecification));
PersistenceConfigType.SOURCE_CONNECTION_SPECIFICATION,
SourceConnectionSpecification.class))
.thenReturn(Sets.newHashSet(sourceConnectionSpecification));
SourceSpecificationRead expectedSourceSpecificationRead = new SourceSpecificationRead();
expectedSourceSpecificationRead.setSourceId(sourceConnectionSpecification.getSourceId());
@@ -75,4 +75,5 @@ class SourceSpecificationsHandlerTest {
assertEquals(expectedSourceSpecificationRead, actualSourceSpecificationRead);
}
}

View File

@@ -44,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class SourcesHandlerTest {
private ConfigPersistence configPersistence;
private StandardSource source;
private SourcesHandler sourceHandler;
@@ -102,10 +103,10 @@ class SourcesHandlerTest {
@Test
void testGetSource() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_SOURCE,
source.getSourceId().toString(),
StandardSource.class))
.thenReturn(source);
PersistenceConfigType.STANDARD_SOURCE,
source.getSourceId().toString(),
StandardSource.class))
.thenReturn(source);
SourceRead expectedSourceRead = new SourceRead();
expectedSourceRead.setSourceId(source.getSourceId());
@@ -118,4 +119,5 @@ class SourcesHandlerTest {
assertEquals(expectedSourceRead, actualSourceRead);
}
}

View File

@@ -44,6 +44,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class WorkspacesHandlerTest {
private ConfigPersistence configPersistence;
private StandardWorkspace workspace;
private WorkspacesHandler workspacesHandler;
@@ -71,10 +72,10 @@ class WorkspacesHandlerTest {
@Test
void testGetWorkspace() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_WORKSPACE,
workspace.getWorkspaceId().toString(),
StandardWorkspace.class))
.thenReturn(workspace);
PersistenceConfigType.STANDARD_WORKSPACE,
workspace.getWorkspaceId().toString(),
StandardWorkspace.class))
.thenReturn(workspace);
final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody();
workspaceIdRequestBody.setWorkspaceId(workspace.getWorkspaceId());
@@ -91,10 +92,10 @@ class WorkspacesHandlerTest {
@Test
void testGetWorkspaceBySlug() throws JsonValidationException, ConfigNotFoundException {
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_WORKSPACE,
workspace.getWorkspaceId().toString(),
StandardWorkspace.class))
.thenReturn(workspace);
PersistenceConfigType.STANDARD_WORKSPACE,
workspace.getWorkspaceId().toString(),
StandardWorkspace.class))
.thenReturn(workspace);
final SlugRequestBody slugRequestBody = new SlugRequestBody();
slugRequestBody.setSlug("default");
@@ -129,11 +130,11 @@ class WorkspacesHandlerTest {
expectedWorkspace.setInitialSetupComplete(true);
when(configPersistence.getConfig(
PersistenceConfigType.STANDARD_WORKSPACE,
workspace.getWorkspaceId().toString(),
StandardWorkspace.class))
.thenReturn(workspace)
.thenReturn(expectedWorkspace);
PersistenceConfigType.STANDARD_WORKSPACE,
workspace.getWorkspaceId().toString(),
StandardWorkspace.class))
.thenReturn(workspace)
.thenReturn(expectedWorkspace);
final WorkspaceRead actualWorkspaceRead = workspacesHandler.updateWorkspace(workspaceUpdate);
@@ -151,4 +152,5 @@ class WorkspacesHandlerTest {
assertEquals(expectedWorkspaceRead, actualWorkspaceRead);
}
}

View File

@@ -49,4 +49,5 @@ public class DestinationSpecificationHelpers {
return destinationConnectionSpecification;
}
}

View File

@@ -33,8 +33,8 @@ import java.util.UUID;
public class SourceImplementationHelpers {
public static SourceConnectionImplementation generateSourceImplementation(
UUID sourceSpecificationId) throws IOException {
public static SourceConnectionImplementation generateSourceImplementation(UUID sourceSpecificationId)
throws IOException {
final UUID workspaceId = UUID.randomUUID();
final UUID sourceImplementationId = UUID.randomUUID();
@@ -56,4 +56,5 @@ public class SourceImplementationHelpers {
Paths.get("../dataline-server/src/test/resources/json/TestImplementation.json");
return Files.readString(path);
}
}

View File

@@ -48,4 +48,5 @@ public class SourceSpecificationHelpers {
return sourceConnectionSpecification;
}
}

View File

@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultSyncWorker implements SyncWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSyncWorker.class);
public static final String TAP_ERR_LOG = "tap_err.log";
@@ -49,9 +50,8 @@ public class DefaultSyncWorker implements SyncWorker {
private final AtomicBoolean cancelled;
public DefaultSyncWorker(
TapFactory<SingerMessage> singerTapFactory,
TargetFactory<SingerMessage> singerTargetFactory) {
public DefaultSyncWorker(TapFactory<SingerMessage> singerTapFactory,
TargetFactory<SingerMessage> singerTargetFactory) {
this.singerTapFactory = singerTapFactory;
this.singerTargetFactory = singerTargetFactory;
this.cancelled = new AtomicBoolean(false);
@@ -98,4 +98,5 @@ public class DefaultSyncWorker implements SyncWorker {
public void cancel() {
cancelled.set(true);
}
}

View File

@@ -27,6 +27,7 @@ package io.dataline.workers;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class DiscoveryOutput {
// TODO line this up with conduit config type
private final String catalog;
@@ -42,4 +43,5 @@ public class DiscoveryOutput {
public String toString() {
return new ToStringBuilder(this).append(catalog).toString();
}
}

View File

@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EchoWorker implements Worker<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(EchoWorker.class);
public EchoWorker() {}
@@ -43,4 +44,5 @@ public class EchoWorker implements Worker<String, String> {
public void cancel() {
// no-op
}
}

View File

@@ -25,7 +25,9 @@
package io.dataline.workers;
public class InvalidCatalogException extends Exception {
public InvalidCatalogException(String message) {
super(message);
}
}

View File

@@ -25,7 +25,9 @@
package io.dataline.workers;
public class InvalidCredentialsException extends Exception {
public InvalidCredentialsException(String message) {
super(message);
}
}

View File

@@ -28,6 +28,7 @@ import java.util.Optional;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class OutputAndStatus<OutputType> {
private final OutputType output;
private final JobStatus status;
@@ -53,4 +54,5 @@ public class OutputAndStatus<OutputType> {
public JobStatus getStatus() {
return status;
}
}

View File

@@ -29,6 +29,8 @@ import java.nio.file.Path;
import java.util.stream.Stream;
public interface TapFactory<T> {
Stream<T> create(StandardTapConfig tapConfig, Path workspacePath)
throws InvalidCredentialsException, InvalidCatalogException, SyncException;
}

View File

@@ -60,4 +60,5 @@ public class TargetConsumer implements CloseableConsumer<SingerMessage> {
writer.flush();
WorkerUtils.closeProcess(process);
}
}

View File

@@ -29,6 +29,8 @@ import io.dataline.config.StandardTargetConfig;
import java.nio.file.Path;
public interface TargetFactory<T> {
CloseableConsumer<T> create(StandardTargetConfig targetConfig, Path workspacePath)
throws SyncException;
}

View File

@@ -27,12 +27,14 @@ package io.dataline.workers;
import java.nio.file.Path;
public interface Worker<InputType, OutputType> {
/**
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return
* either COMPLETE, FAILED, or CANCELLED.
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return either
* COMPLETE, FAILED, or CANCELLED.
*/
OutputAndStatus<OutputType> run(InputType inputType, Path jobRoot)
throws InvalidCredentialsException, InvalidCatalogException;
void cancel();
}

View File

@@ -105,4 +105,5 @@ public class WorkerUtils {
targetConfig.setStandardSync(sync.getStandardSync());
return targetConfig;
}
}

View File

@@ -71,4 +71,5 @@ public class DockerProcessBuilderFactory implements ProcessBuilderFactory {
final Path relativePath = workspaceRoot.relativize(jobRoot);
return MOUNT_DESTINATION.resolve(relativePath);
}
}

View File

@@ -29,4 +29,5 @@ import java.nio.file.Path;
public interface ProcessBuilderFactory {
ProcessBuilder create(Path jobPath, String imageName, String... args);
}

View File

@@ -39,8 +39,9 @@ public class MessageUtils {
private static final DateTimeFormatter SINGER_DATETIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneId.of("UTC"));
public static SingerMessage createRecordMessage(
String tableName, JsonNode record, Instant timeExtracted) {
public static SingerMessage createRecordMessage(String tableName,
JsonNode record,
Instant timeExtracted) {
final SingerMessage singerMessage = new SingerMessage();
singerMessage.setType(SingerMessage.Type.RECORD);
singerMessage.setRecord(Jsons.serialize(record));
@@ -60,4 +61,5 @@ public class MessageUtils {
public static SingerMessage createRecordMessage(String tableName, Map<String, String> record) {
return createRecordMessage(tableName, Jsons.jsonNode(record), null);
}
}

View File

@@ -38,9 +38,10 @@ import org.slf4j.LoggerFactory;
* InputStream into a SingerMessage. If the line cannot be parsed into a SingerMessage it is
* dropped. Each record MUST be new line separated.
*
* <p>If a line starts with a SingerMessage and then has other characters after it, that
* SingerMessage will still be parsed. If there are multiple SingerMessage records on the same line,
* only the first will be parsed.
* <p>
* If a line starts with a SingerMessage and then has other characters after it, that SingerMessage
* will still be parsed. If there are multiple SingerMessage records on the same line, only the
* first will be parsed.
*/
public class SingerJsonStreamFactory {
@@ -59,4 +60,5 @@ public class SingerJsonStreamFactory {
return null;
}
}
}

View File

@@ -65,4 +65,5 @@ public class SingerMessageTracker implements Consumer<SingerMessage> {
public Optional<State> getOutputState() {
return Optional.ofNullable(outputState.get());
}
}

View File

@@ -72,4 +72,5 @@ public abstract class BaseSingerWorker<InputType, OutputType>
protected static Path getFullPath(Path workspaceRoot, String fileName) {
return workspaceRoot.resolve(fileName);
}
}

View File

@@ -43,8 +43,8 @@ public class SingerCatalogConverters {
/**
* Takes in a singer catalog and a dataline schema. It then applies the dataline configuration to
* that catalog. e.g. If dataline says that a certain column should or should not be included in
* the sync, this method applies that to the catalog. Thus we produce a valid singer catalog that
* that catalog. e.g. If dataline says that a certain column should or should not be included in the
* sync, this method applies that to the catalog. Thus we produce a valid singer catalog that
* contains configurations stored in dataline.
*
* @param catalog - singer catalog
@@ -110,8 +110,8 @@ public class SingerCatalogConverters {
// TODO
newSingerStream.setMetadata(newMetadata);
// todo (cgardens) - this will not work for legacy catalogs. want to handle this
// in a subsequent PR, because handling this is going to require doing another
// one of these monster map tasks.
// in a subsequent PR, because handling this is going to require doing another
// one of these monster map tasks.
newSingerStream.setSchema(stream.getSchema());
return newSingerStream;
@@ -189,15 +189,14 @@ public class SingerCatalogConverters {
return false;
}
private static Map<String, List<SingerMetadata>> getTableNameToMetadataList(
List<SingerStream> streams) {
private static Map<String, List<SingerMetadata>> getTableNameToMetadataList(List<SingerStream> streams) {
// todo (cgardens) - figure out if it's stream or stream id or table name.
return streams.stream()
.collect(Collectors.toMap(SingerStream::getStream, SingerStream::getMetadata));
}
private static Map<String, SingerMetadataChild> getColumnMetadataForTable(
Map<String, List<SingerMetadata>> tableNameToMetadata, String tableName) {
private static Map<String, SingerMetadataChild> getColumnMetadataForTable(Map<String, List<SingerMetadata>> tableNameToMetadata,
String tableName) {
if (!tableNameToMetadata.containsKey(tableName)) {
throw new RuntimeException("could not find metadata for table: " + tableName);
}
@@ -229,8 +228,8 @@ public class SingerCatalogConverters {
}
/**
* Singer tends to have 2 types for columns one of which is null. The null is pretty irrelevant,
* so look at types and find the first non-null one and use that.
* Singer tends to have 2 types for columns one of which is null. The null is pretty irrelevant, so
* look at types and find the first non-null one and use that.
*
* @param singerTypes - list of types discovered by singer.
* @return reduce down to one type which best matches the column's data type
@@ -244,9 +243,9 @@ public class SingerCatalogConverters {
}
/**
* Singer doesn't seem to have an official list of the data types that they support, so we will
* have to do our best here as we discover them. If it becomes too awful, we can just map types we
* don't recognize to string.
* Singer doesn't seem to have an official list of the data types that they support, so we will have
* to do our best here as we discover them. If it becomes too awful, we can just map types we don't
* recognize to string.
*
* @param singerType - singer's column data type
* @return best match for our own data type
@@ -258,7 +257,7 @@ public class SingerCatalogConverters {
case INTEGER:
return DataType.NUMBER;
case NULL:
//noinspection DuplicateBranchesInSwitch
// noinspection DuplicateBranchesInSwitch
return DataType.STRING; // todo (cgardens) - hackasaurus rex
case BOOLEAN:
return DataType.BOOLEAN;
@@ -293,4 +292,5 @@ public class SingerCatalogConverters {
return singerMetadata;
}
}

View File

@@ -50,8 +50,9 @@ public class SingerCheckConnectionWorker
}
@Override
public OutputAndStatus<StandardCheckConnectionOutput> run(
StandardCheckConnectionInput input, Path jobRoot) throws InvalidCredentialsException {
public OutputAndStatus<StandardCheckConnectionOutput> run(StandardCheckConnectionInput input,
Path jobRoot)
throws InvalidCredentialsException {
final StandardDiscoverSchemaInput discoverSchemaInput = new StandardDiscoverSchemaInput();
discoverSchemaInput.setConnectionConfigurationJson(input.getConnectionConfigurationJson());
@@ -79,4 +80,5 @@ public class SingerCheckConnectionWorker
public void cancel() {
singerDiscoverSchemaWorker.cancel();
}
}

View File

@@ -66,11 +66,11 @@ public class SingerDiscoverSchemaWorker
// package private since package-local classes need direct access to singer catalog, and the
// conversion from SingerSchema to Dataline schema is lossy
OutputAndStatus<SingerCatalog> runInternal(
StandardDiscoverSchemaInput discoverSchemaInput, Path jobRoot)
OutputAndStatus<SingerCatalog> runInternal(StandardDiscoverSchemaInput discoverSchemaInput,
Path jobRoot)
throws InvalidCredentialsException {
// todo (cgardens) - just getting original impl to line up with new iface for now. this can be
// reduced.
// reduced.
final String configDotJson = discoverSchemaInput.getConnectionConfigurationJson();
writeFile(jobRoot, CONFIG_JSON_FILENAME, configDotJson);
@@ -105,8 +105,8 @@ public class SingerDiscoverSchemaWorker
}
@Override
public OutputAndStatus<StandardDiscoverSchemaOutput> run(
StandardDiscoverSchemaInput discoverSchemaInput, Path jobRoot)
public OutputAndStatus<StandardDiscoverSchemaOutput> run(StandardDiscoverSchemaInput discoverSchemaInput,
Path jobRoot)
throws InvalidCredentialsException {
OutputAndStatus<SingerCatalog> output = runInternal(discoverSchemaInput, jobRoot);
JobStatus status = output.getStatus();
@@ -131,4 +131,5 @@ public class SingerDiscoverSchemaWorker
public void cancel() {
cancelHelper(workerProcess);
}
}

View File

@@ -83,15 +83,15 @@ public class SingerTapFactory implements TapFactory<SingerMessage> {
try {
tapProcess =
pbf.create(
jobRoot,
imageName,
"--config",
CONFIG_JSON_FILENAME,
// TODO support both --properties and --catalog depending on integration
"--properties",
CATALOG_JSON_FILENAME,
"--state",
STATE_JSON_FILENAME)
jobRoot,
imageName,
"--config",
CONFIG_JSON_FILENAME,
// TODO support both --properties and --catalog depending on integration
"--properties",
CATALOG_JSON_FILENAME,
"--state",
STATE_JSON_FILENAME)
.redirectError(jobRoot.resolve(DefaultSyncWorker.TAP_ERR_LOG).toFile())
.start();
} catch (IOException e) {
@@ -126,4 +126,5 @@ public class SingerTapFactory implements TapFactory<SingerMessage> {
return new SingerDiscoverSchemaWorker(imageName, pbf)
.runInternal(discoveryInput, scopedWorkspace);
}
}

View File

@@ -81,4 +81,5 @@ public class SingerTargetFactory implements TargetFactory<SingerMessage> {
throw new RuntimeException(e);
}
}
}

View File

@@ -40,6 +40,7 @@ import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class BaseWorkerTestCase {
// TODO inject via env
protected Path workspaceRoot;
protected ProcessBuilderFactory pbf;
@@ -71,4 +72,5 @@ public abstract class BaseWorkerTestCase {
protected void assertJsonEquals(final String s1, final String s2) {
assertEquals(Jsons.deserialize(s1), Jsons.deserialize(s2));
}
}

Some files were not shown because too many files have changed in this diff Show More