1
0
mirror of synced 2025-12-19 18:14:56 -05:00

Prepare clean mount management (#126)

This commit is contained in:
Michel Tricot
2020-08-28 14:04:51 -07:00
committed by GitHub
parent 5d696f01db
commit f344ecbbb1
40 changed files with 533 additions and 246 deletions

View File

@@ -1,6 +1,7 @@
.dockerignore
.git
.idea
.gradle
**/build
**/node_modules
Dockerfile.*

3
.env
View File

@@ -4,4 +4,5 @@ DATABASE_USER=docker
DATABASE_PASSWORD=docker
DATABASE_DB=dataline
DATABASE_URL=jdbc:postgresql://db:5432/dataline
CONFIG_PERSISTENCE_ROOT=data/config
CONFIG_ROOT=data/config
WORKSPACE_ROOT=/tmp/workspace

View File

@@ -3,6 +3,10 @@
#####################
FROM ubuntu:20.04 AS build-base
WORKDIR /code
ENV DEBIAN_FRONTEND noninteractive
# Install tools
RUN apt-get update && apt-get -y install curl
@@ -10,7 +14,8 @@ RUN apt-get update && apt-get -y install curl
RUN curl -sL https://deb.nodesource.com/setup_14.x | bash -
RUN apt-get update && apt-get -y install \
nodejs \
openjdk-14-jdk
openjdk-14-jdk \
docker.io
#######################
# Prepare project env #
@@ -29,6 +34,8 @@ RUN ./gradlew build --no-daemon -g /home/gradle/.gradle
###################
FROM build-project AS build
WORKDIR /code
# Copy code, etc.
COPY . /code

View File

@@ -43,13 +43,14 @@ import org.apache.commons.io.FileUtils;
// we force all interaction with disk storage to be effectively single threaded.
public class DefaultConfigPersistence implements ConfigPersistence {
private static final Object lock = new Object();
private final ObjectMapper objectMapper;
private final JsonSchemaValidation jsonSchemaValidation;
private final String storageRoot;
private final Path storageRoot;
public DefaultConfigPersistence(String storageRoot) {
public DefaultConfigPersistence(Path storageRoot) {
this.storageRoot = storageRoot;
jsonSchemaValidation = new JsonSchemaValidation();
objectMapper = new ObjectMapper();
@@ -145,7 +146,7 @@ public class DefaultConfigPersistence implements ConfigPersistence {
}
private Path getConfigDirectory(PersistenceConfigType persistenceConfigType) {
return Path.of(storageRoot).resolve(persistenceConfigType.toString());
return storageRoot.resolve(persistenceConfigType.toString());
}
private Path getConfigPath(PersistenceConfigType persistenceConfigType, String configId) {

View File

@@ -47,7 +47,7 @@ class DefaultConfigPersistenceTest {
@BeforeEach
void setUp() throws IOException {
rootPath = Files.createTempDirectory(DefaultConfigPersistenceTest.class.getName());
configPersistence = new DefaultConfigPersistence(rootPath.toString());
configPersistence = new DefaultConfigPersistence(rootPath);
}
private StandardSource generateStandardSource() {

View File

@@ -22,14 +22,17 @@
* SOFTWARE.
*/
package io.dataline.workers.singer.postgres_tap;
package io.dataline.config;
import io.dataline.integrations.Integrations;
import io.dataline.workers.singer.SingerDiscoverSchemaWorker;
import java.nio.file.Path;
public class SingerPostgresTapDiscoverWorker extends SingerDiscoverSchemaWorker {
public interface Configs {
public SingerPostgresTapDiscoverWorker() {
super(Integrations.POSTGRES_TAP.getSyncImage());
}
Path getConfigRoot();
Path getWorkspaceRoot();
String getWorkspaceDockerMount();
String getDockerNetwork();
}

View File

@@ -0,0 +1,93 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.config;
import java.nio.file.Path;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EnvConfigs implements Configs {
private static final Logger LOGGER = LoggerFactory.getLogger(EnvConfigs.class);
public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT";
public static final String WORKSPACE_DOCKER_MOUNT = "WORKSPACE_DOCKER_MOUNT";
public static final String CONFIG_ROOT = "CONFIG_ROOT";
public static final String DOCKER_NETWORK = "DOCKER_NETWORK";
public static final String DEFAULT_NETWORK = "host";
private final Function<String, String> getEnv;
public EnvConfigs() {
this(System::getenv);
}
EnvConfigs(final Function<String, String> getEnv) {
this.getEnv = getEnv;
}
@Override
public Path getConfigRoot() {
return getPath(CONFIG_ROOT);
}
@Override
public Path getWorkspaceRoot() {
return getPath(WORKSPACE_ROOT);
}
@Override
public String getWorkspaceDockerMount() {
final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT);
if (mount != null) {
return mount;
}
LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT);
return getWorkspaceRoot().toString();
}
@Override
public String getDockerNetwork() {
final String network = getEnv.apply(DOCKER_NETWORK);
if (network != null) {
return network;
}
LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK);
return DEFAULT_NETWORK;
}
private Path getPath(final String name) {
final String value = getEnv.apply(name);
if (value == null) {
throw new IllegalArgumentException("Env variable not defined: " + name);
}
return Path.of(value);
}
}

View File

@@ -0,0 +1,93 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.config;
import static org.mockito.Mockito.when;
import java.nio.file.Paths;
import java.util.function.Function;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
class EnvConfigsTest {
private Function<String, String> function;
private EnvConfigs config;
@BeforeEach
void setUp() {
function = Mockito.mock(Function.class);
config = new EnvConfigs(function);
}
@Test
void ensureGetEnvBehavior() {
Assertions.assertNull(System.getenv("MY_RANDOM_VAR_1234"));
}
@Test
void testWorkspaceRoot() {
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceRoot());
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
Assertions.assertEquals(Paths.get("abc/def"), config.getWorkspaceRoot());
}
@Test
void testConfigRoot() {
when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getConfigRoot());
when(function.apply(EnvConfigs.CONFIG_ROOT)).thenReturn("a/b");
Assertions.assertEquals(Paths.get("a/b"), config.getConfigRoot());
}
@Test
void testGetDockerMount() {
when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn("abc/def");
Assertions.assertEquals("abc/def", config.getWorkspaceDockerMount());
when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn("root");
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Assertions.assertEquals("root", config.getWorkspaceDockerMount());
when(function.apply(EnvConfigs.WORKSPACE_DOCKER_MOUNT)).thenReturn(null);
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getWorkspaceDockerMount());
}
@Test
void testDockerNetwork() {
when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn(null);
Assertions.assertEquals("host", config.getDockerNetwork());
when(function.apply(EnvConfigs.DOCKER_NETWORK)).thenReturn("abc");
Assertions.assertEquals("abc", config.getDockerNetwork());
}
}

View File

@@ -11,6 +11,4 @@ COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt
WORKDIR /singer/data
ENTRYPOINT ["target-csv"]

View File

@@ -11,6 +11,4 @@ COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt
WORKDIR /singer/data
ENTRYPOINT ["tap-exchangeratesapi"]

View File

@@ -17,5 +17,4 @@ RUN apt-get update && \
python -m pip install --upgrade pip && \
pip install -r requirements.txt
WORKDIR /singer/data
ENTRYPOINT ["target-postgres"]

View File

@@ -17,6 +17,4 @@ RUN python -m pip install --upgrade pip && \
RUN apt-get autoremove -y gcc
WORKDIR /singer/data
ENTRYPOINT ["tap-postgres"]

View File

@@ -18,7 +18,7 @@ application {
run {
// default for running on local machine.
environment "CONFIG_PERSISTENCE_ROOT", new File(".").absolutePath + "data/config"
environment "CONFIG_ROOT", new File(".").absolutePath + "data/config"
environment "VERSION", "0.1.0"
environment "DATABASE_USER", "postgres"
environment "DATABASE_PASSWORD", ""

View File

@@ -132,7 +132,7 @@ public class JobScheduler implements Runnable {
}
private static Long getIntervalInSeconds(Schedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits().longValue();
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}
private Set<StandardSync> getAllActiveConnections() {

View File

@@ -25,6 +25,8 @@
package io.dataline.scheduler;
import io.dataline.db.DatabaseHelper;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -40,14 +42,20 @@ public class JobSubmitter implements Runnable {
private final ExecutorService threadPool;
private final BasicDataSource connectionPool;
private final SchedulerPersistence persistence;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public JobSubmitter(
ExecutorService threadPool,
BasicDataSource connectionPool,
SchedulerPersistence persistence) {
final ExecutorService threadPool,
final BasicDataSource connectionPool,
final SchedulerPersistence persistence,
final Path workspaceRoot,
final ProcessBuilderFactory pbf) {
this.threadPool = threadPool;
this.connectionPool = connectionPool;
this.persistence = persistence;
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
}
@Override
@@ -86,6 +94,7 @@ public class JobSubmitter implements Runnable {
}
private void submitJob(Job job) {
threadPool.submit(new WorkerRunner(job.getId(), connectionPool, persistence));
threadPool.submit(
new WorkerRunner(job.getId(), connectionPool, persistence, workspaceRoot, pbf));
}
}

View File

@@ -25,9 +25,14 @@
package io.dataline.scheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.dataline.config.Configs;
import io.dataline.config.EnvConfigs;
import io.dataline.config.persistence.ConfigPersistence;
import io.dataline.config.persistence.DefaultConfigPersistence;
import io.dataline.db.DatabaseHelper;
import io.dataline.workers.process.DockerProcessBuilderFactory;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -44,6 +49,7 @@ import org.slf4j.LoggerFactory;
* launching new jobs.
*/
public class SchedulerApp {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);
private static final int MAX_WORKERS = 4;
@@ -52,23 +58,32 @@ public class SchedulerApp {
new ThreadFactoryBuilder().setNameFormat("scheduler-%d").build();
private final BasicDataSource connectionPool;
private final String configPersistenceRoot;
private final Path configRoot;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public SchedulerApp(BasicDataSource connectionPool, String configPersistenceRoot) {
public SchedulerApp(
BasicDataSource connectionPool,
Path configRoot,
Path workspaceRoot,
ProcessBuilderFactory pbf) {
this.connectionPool = connectionPool;
this.configPersistenceRoot = configPersistenceRoot;
this.configRoot = configRoot;
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
}
public void start() {
final SchedulerPersistence schedulerPersistence =
new DefaultSchedulerPersistence(connectionPool);
final ConfigPersistence configPersistence = new DefaultConfigPersistence(configPersistenceRoot);
final ConfigPersistence configPersistence = new DefaultConfigPersistence(configRoot);
final ExecutorService workerThreadPool =
Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final JobSubmitter jobSubmitter =
new JobSubmitter(workerThreadPool, connectionPool, schedulerPersistence);
new JobSubmitter(
workerThreadPool, connectionPool, schedulerPersistence, workspaceRoot, pbf);
final JobScheduler jobScheduler =
new JobScheduler(connectionPool, schedulerPersistence, configPersistence);
@@ -86,13 +101,22 @@ public class SchedulerApp {
}
public static void main(String[] args) {
final String configPersistenceRoot = System.getenv("CONFIG_PERSISTENCE_ROOT");
LOGGER.info("configPersistenceRoot = " + configPersistenceRoot);
final Configs configs = new EnvConfigs();
final Path configRoot = configs.getConfigRoot();
LOGGER.info("configRoot = " + configRoot);
final Path workspaceRoot = configs.getWorkspaceRoot();
LOGGER.info("workspaceRoot = " + workspaceRoot);
LOGGER.info("Creating DB connection pool...");
BasicDataSource connectionPool = DatabaseHelper.getConnectionPoolFromEnv();
final BasicDataSource connectionPool = DatabaseHelper.getConnectionPoolFromEnv();
final ProcessBuilderFactory pbf =
new DockerProcessBuilderFactory(
workspaceRoot, configs.getWorkspaceDockerMount(), configs.getDockerNetwork());
LOGGER.info("Launching scheduler...");
new SchedulerApp(connectionPool, configPersistenceRoot).start();
new SchedulerApp(connectionPool, configRoot, workspaceRoot, pbf).start();
}
}

View File

@@ -29,13 +29,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.dataline.db.DatabaseHelper;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.Worker;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,16 +57,19 @@ public class WorkerRun<InputType, OutputType> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRun.class);
private final long jobId;
private final Path jobRoot;
private final InputType input;
private final Worker<InputType, OutputType> worker;
private final BasicDataSource connectionPool;
public WorkerRun(
long jobId,
Path jobRoot,
InputType input,
Worker<InputType, OutputType> worker,
BasicDataSource connectionPool) {
this.jobId = jobId;
this.jobRoot = jobRoot;
this.input = input;
this.worker = worker;
this.connectionPool = connectionPool;
@@ -78,13 +81,9 @@ public class WorkerRun<InputType, OutputType> implements Runnable {
try {
setJobStatus(connectionPool, jobId, JobStatus.RUNNING);
// todo (cgardens) - replace this with whatever the correct path is. probably dependency
// inject it based via env.
final Path workspacesRoot = Path.of("/tmp/dataline/workspaces/");
FileUtils.forceMkdir(workspacesRoot.toFile());
final Path workspaceRoot = workspacesRoot.resolve(String.valueOf(jobId));
FileUtils.forceMkdir(workspaceRoot.toFile());
OutputAndStatus<OutputType> outputAndStatus = worker.run(input, workspaceRoot);
Files.createDirectories(jobRoot);
OutputAndStatus<OutputType> outputAndStatus = worker.run(input, jobRoot);
switch (outputAndStatus.getStatus()) {
case FAILED:

View File

@@ -31,11 +31,13 @@ import io.dataline.config.StandardCheckConnectionInput;
import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardSyncInput;
import io.dataline.workers.DefaultSyncWorker;
import io.dataline.workers.process.ProcessBuilderFactory;
import io.dataline.workers.singer.SingerCheckConnectionWorker;
import io.dataline.workers.singer.SingerDiscoverSchemaWorker;
import io.dataline.workers.singer.SingerTapFactory;
import io.dataline.workers.singer.SingerTargetFactory;
import java.io.IOException;
import java.nio.file.Path;
import org.apache.commons.dbcp2.BasicDataSource;
/**
@@ -43,15 +45,24 @@ import org.apache.commons.dbcp2.BasicDataSource;
* appropriate worker for a given job.
*/
public class WorkerRunner implements Runnable {
private final long jobId;
private final BasicDataSource connectionPool;
private final SchedulerPersistence persistence;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public WorkerRunner(
long jobId, BasicDataSource connectionPool, SchedulerPersistence persistence) {
long jobId,
BasicDataSource connectionPool,
SchedulerPersistence persistence,
Path workspaceRoot,
ProcessBuilderFactory pbf) {
this.jobId = jobId;
this.connectionPool = connectionPool;
this.persistence = persistence;
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
}
@Override
@@ -63,6 +74,8 @@ public class WorkerRunner implements Runnable {
throw new RuntimeException(e);
}
final Path jobRoot = workspaceRoot.resolve(String.valueOf(jobId));
switch (job.getConfig().getConfigType()) {
case CHECK_CONNECTION_SOURCE:
case CHECK_CONNECTION_DESTINATION:
@@ -70,9 +83,10 @@ public class WorkerRunner implements Runnable {
getCheckConnectionInput(job.getConfig().getCheckConnection());
new WorkerRun<>(
jobId,
jobRoot,
checkConnectionInput,
new SingerCheckConnectionWorker(
job.getConfig().getCheckConnection().getDockerImage()),
job.getConfig().getCheckConnection().getDockerImage(), pbf),
connectionPool)
.run();
break;
@@ -81,9 +95,10 @@ public class WorkerRunner implements Runnable {
getDiscoverSchemaInput(job.getConfig().getDiscoverSchema());
new WorkerRun<>(
jobId,
jobRoot,
discoverSchemaInput,
new SingerDiscoverSchemaWorker(
job.getConfig().getDiscoverSchema().getDockerImage()),
job.getConfig().getDiscoverSchema().getDockerImage(), pbf),
connectionPool)
.run();
break;
@@ -91,14 +106,16 @@ public class WorkerRunner implements Runnable {
final StandardSyncInput syncInput = getSyncInput(job.getConfig().getSync());
new WorkerRun<>(
jobId,
jobRoot,
syncInput,
// todo (cgardens) - still locked into only using SingerTaps and Targets. Next step
// here is to create DefaultTap and DefaultTarget which will be able to
// interoperate with SingerTap and SingerTarget now that they are split and
// mediated in DefaultSyncWorker.
new DefaultSyncWorker(
new SingerTapFactory(job.getConfig().getSync().getSourceDockerImage()),
new SingerTargetFactory(job.getConfig().getSync().getDestinationDockerImage())),
new SingerTapFactory(job.getConfig().getSync().getSourceDockerImage(), pbf),
new SingerTargetFactory(
job.getConfig().getSync().getDestinationDockerImage(), pbf)),
connectionPool)
.run();
break;

View File

@@ -30,7 +30,7 @@ application {
run {
// default for running on local machine.
environment "CONFIG_PERSISTENCE_ROOT", new File(".").absolutePath + "data/config"
environment "CONFIG_ROOT", new File(".").absolutePath + "data/config"
environment "VERSION", "0.1.0"
environment "DATABASE_USER", "postgres"
environment "DATABASE_PASSWORD", ""

View File

@@ -25,18 +25,20 @@
package io.dataline.server;
import io.dataline.server.apis.ConfigurationApi;
import java.nio.file.Path;
import org.apache.commons.dbcp2.BasicDataSource;
import org.glassfish.hk2.api.Factory;
public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
private static String dbRoot;
private static Path dbRoot;
private static BasicDataSource connectionPool;
public static void setConfigPersistenceRoot(String dbRoot) {
public static void setConfigPersistenceRoot(final Path dbRoot) {
ConfigurationApiFactory.dbRoot = dbRoot;
}
public static void setDbConnectionPool(BasicDataSource connectionPool) {
public static void setDbConnectionPool(final BasicDataSource connectionPool) {
ConfigurationApiFactory.connectionPool = connectionPool;
}

View File

@@ -24,6 +24,8 @@
package io.dataline.server;
import io.dataline.config.Configs;
import io.dataline.config.EnvConfigs;
import io.dataline.db.DatabaseHelper;
import io.dataline.server.apis.ConfigurationApi;
import io.dataline.server.errors.InvalidInputExceptionMapper;
@@ -31,6 +33,7 @@ import io.dataline.server.errors.InvalidJsonExceptionMapper;
import io.dataline.server.errors.InvalidJsonInputExceptionMapper;
import io.dataline.server.errors.KnownExceptionMapper;
import io.dataline.server.errors.UncaughtExceptionMapper;
import java.nio.file.Path;
import java.util.logging.Level;
import org.apache.commons.dbcp2.BasicDataSource;
import org.eclipse.jetty.server.Server;
@@ -46,12 +49,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerApp {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class);
private final String configPersistenceRoot;
public ServerApp(String configPersistenceRoot) {
private final Path configRoot;
this.configPersistenceRoot = configPersistenceRoot;
public ServerApp(final Path configRoot) {
this.configRoot = configRoot;
}
public void start() throws Exception {
@@ -61,7 +65,7 @@ public class ServerApp {
ServletContextHandler handler = new ServletContextHandler();
ConfigurationApiFactory.setConfigPersistenceRoot(configPersistenceRoot);
ConfigurationApiFactory.setConfigPersistenceRoot(configRoot);
ConfigurationApiFactory.setDbConnectionPool(connectionPool);
ResourceConfig rc =
@@ -110,10 +114,12 @@ public class ServerApp {
}
public static void main(String[] args) throws Exception {
final String configPersistenceRoot = System.getenv("CONFIG_PERSISTENCE_ROOT");
LOGGER.info("configPersistenceRoot = " + configPersistenceRoot);
final Configs configs = new EnvConfigs();
final Path configRoot = configs.getConfigRoot();
LOGGER.info("configRoot = " + configRoot);
LOGGER.info("Starting server...");
new ServerApp(configPersistenceRoot).start();
new ServerApp(configRoot).start();
}
}

View File

@@ -73,11 +73,11 @@ import io.dataline.server.handlers.SourceSpecificationsHandler;
import io.dataline.server.handlers.SourcesHandler;
import io.dataline.server.handlers.WorkspacesHandler;
import io.dataline.server.validation.IntegrationSchemaValidation;
import java.nio.file.Path;
import javax.validation.Valid;
import javax.ws.rs.Path;
import org.apache.commons.dbcp2.BasicDataSource;
@Path("/v1")
@javax.ws.rs.Path("/v1")
public class ConfigurationApi implements io.dataline.api.V1Api {
private final WorkspacesHandler workspacesHandler;
private final SourcesHandler sourcesHandler;
@@ -90,7 +90,7 @@ public class ConfigurationApi implements io.dataline.api.V1Api {
private final SchedulerHandler schedulerHandler;
private final JobHistoryHandler jobHistoryHandler;
public ConfigurationApi(String dbRoot, BasicDataSource connectionPool) {
public ConfigurationApi(final Path dbRoot, BasicDataSource connectionPool) {
ConfigPersistence configPersistence = new DefaultConfigPersistence(dbRoot);
final IntegrationSchemaValidation integrationSchemaValidation =
new IntegrationSchemaValidation(configPersistence);

View File

@@ -58,7 +58,7 @@ public class DefaultSyncWorker implements SyncWorker {
}
@Override
public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path workspacePath) {
public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path jobRoot) {
long startTime = System.currentTimeMillis();
final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput);
@@ -67,17 +67,17 @@ public class DefaultSyncWorker implements SyncWorker {
final SingerMessageTracker singerMessageTracker =
new SingerMessageTracker(syncInput.getStandardSync().getConnectionId());
try (Stream<SingerMessage> tap = singerTapFactory.create(tapConfig, workspacePath);
try (Stream<SingerMessage> tap = singerTapFactory.create(tapConfig, jobRoot);
CloseableConsumer<SingerMessage> consumer =
singerTargetFactory.create(targetConfig, workspacePath)) {
singerTargetFactory.create(targetConfig, jobRoot)) {
tap.takeWhile(record -> !cancelled.get()).peek(singerMessageTracker).forEach(consumer);
} catch (Exception e) {
LOGGER.debug(
"Sync worker failed. Tap error log: {}.\n Target error log: {}",
WorkerUtils.readFileFromWorkspace(workspacePath, TAP_ERR_LOG),
WorkerUtils.readFileFromWorkspace(workspacePath, TARGET_ERR_LOG));
WorkerUtils.readFileFromWorkspace(jobRoot, TAP_ERR_LOG),
WorkerUtils.readFileFromWorkspace(jobRoot, TARGET_ERR_LOG));
return new OutputAndStatus<>(JobStatus.FAILED, null);
}

View File

@@ -28,6 +28,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dataline.config.StandardCheckConnectionInput;
import io.dataline.config.StandardCheckConnectionOutput;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
@@ -35,22 +36,25 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DockerCheckConnectionWorker implements CheckConnectionWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(DockerCheckConnectionWorker.class);
private static final String INPUT = "input.json";
private static final String OUTPUT = "output.json";
private final String imageName;
private final ProcessBuilderFactory pbf;
Process tapProcess;
public DockerCheckConnectionWorker(String imageName) {
public DockerCheckConnectionWorker(final String imageName, final ProcessBuilderFactory pbf) {
this.imageName = imageName;
this.pbf = pbf;
}
@Override
public OutputAndStatus<StandardCheckConnectionOutput> run(
StandardCheckConnectionInput standardCheckConnectionInput, Path workspacePath) {
StandardCheckConnectionInput standardCheckConnectionInput, Path jobRoot) {
final ObjectMapper objectMapper = new ObjectMapper();
// write input struct to docker image
@@ -60,25 +64,19 @@ public class DockerCheckConnectionWorker implements CheckConnectionWorker {
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
final Path configPath =
WorkerUtils.writeFileToWorkspace(workspacePath, INPUT, inputString); // wrong type
WorkerUtils.writeFileToWorkspace(jobRoot, INPUT, inputString); // wrong type
// run it. patiently.
try {
String[] tapCmd = {
"docker", "run", workspacePath.toString(), imageName, "--config", configPath.toString()
};
LOGGER.debug("Tap command: {}", String.join(" ", tapCmd));
tapProcess = new ProcessBuilder().command(tapCmd).start();
tapProcess = pbf.create(jobRoot, imageName, "--config", INPUT).start();
while (!tapProcess.waitFor(1, TimeUnit.MINUTES)) {
LOGGER.debug("Waiting for worker");
}
// read output struct. assume it is written to correct place.
final String outputString = WorkerUtils.readFileFromWorkspace(workspacePath, OUTPUT);
final String outputString = WorkerUtils.readFileFromWorkspace(jobRoot, OUTPUT);
final StandardCheckConnectionOutput standardCheckConnectionOutput =
objectMapper.readValue(outputString, StandardCheckConnectionOutput.class);

View File

@@ -34,8 +34,8 @@ public class EchoWorker implements Worker<String, String> {
public EchoWorker() {}
@Override
public OutputAndStatus<String> run(String string, Path workspaceRoot) {
LOGGER.info("Hello World. input: {}, workspace root: {}", string, workspaceRoot);
public OutputAndStatus<String> run(String string, Path jobRoot) {
LOGGER.info("Hello World. input: {}, workspace root: {}", string, jobRoot);
return new OutputAndStatus<>(JobStatus.SUCCESSFUL, "echoed");
}

View File

@@ -31,7 +31,7 @@ public interface Worker<InputType, OutputType> {
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return
* either COMPLETE, FAILED, or CANCELLED.
*/
OutputAndStatus<OutputType> run(InputType inputType, Path workspacePath)
OutputAndStatus<OutputType> run(InputType inputType, Path jobRoot)
throws InvalidCredentialsException, InvalidCatalogException;
void cancel();

View File

@@ -0,0 +1,74 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.workers.process;
import com.google.common.collect.Lists;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DockerProcessBuilderFactory implements ProcessBuilderFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(DockerProcessBuilderFactory.class);
private static final Path MOUNT_DESTINATION = Path.of("/data");
private final String mountSource;
private final Path workspaceRoot;
private final String networkName;
public DockerProcessBuilderFactory(Path workspaceRoot, String mountSource, String networkName) {
this.mountSource = mountSource;
this.workspaceRoot = workspaceRoot;
this.networkName = networkName;
}
@Override
public ProcessBuilder create(final Path jobRoot, final String imageName, final String... args) {
final List<String> cmd =
Lists.newArrayList(
"docker",
"run",
"-v",
String.format("%s:%s", mountSource, MOUNT_DESTINATION),
"-w",
rebasePath(jobRoot).toString(),
"--network",
networkName,
imageName);
cmd.addAll(Arrays.asList(args));
LOGGER.debug("Preparing command: {}", cmd);
return new ProcessBuilder(cmd);
}
private Path rebasePath(final Path jobRoot) {
final Path relativePath = workspaceRoot.relativize(jobRoot);
return MOUNT_DESTINATION.resolve(relativePath);
}
}

View File

@@ -22,14 +22,11 @@
* SOFTWARE.
*/
package io.dataline.workers.singer.postgres_tap;
package io.dataline.workers.process;
import io.dataline.integrations.Integrations;
import io.dataline.workers.singer.SingerCheckConnectionWorker;
import java.nio.file.Path;
public class SingerPostgresTapCheckConnectionWorker extends SingerCheckConnectionWorker {
public interface ProcessBuilderFactory {
public SingerPostgresTapCheckConnectionWorker() {
super(Integrations.POSTGRES_TAP.getCheckConnectionImage());
}
ProcessBuilder create(Path jobPath, String imageName, String... args);
}

View File

@@ -32,6 +32,7 @@ import io.dataline.workers.CheckConnectionWorker;
import io.dataline.workers.InvalidCredentialsException;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,19 +45,19 @@ public class SingerCheckConnectionWorker
private final SingerDiscoverSchemaWorker singerDiscoverSchemaWorker;
public SingerCheckConnectionWorker(String imageName) {
this.singerDiscoverSchemaWorker = new SingerDiscoverSchemaWorker(imageName);
public SingerCheckConnectionWorker(final String imageName, final ProcessBuilderFactory pbf) {
this.singerDiscoverSchemaWorker = new SingerDiscoverSchemaWorker(imageName, pbf);
}
@Override
public OutputAndStatus<StandardCheckConnectionOutput> run(
StandardCheckConnectionInput input, Path workspaceRoot) throws InvalidCredentialsException {
StandardCheckConnectionInput input, Path jobRoot) throws InvalidCredentialsException {
final StandardDiscoverSchemaInput discoverSchemaInput = new StandardDiscoverSchemaInput();
discoverSchemaInput.setConnectionConfiguration(input.getConnectionConfiguration());
OutputAndStatus<StandardDiscoverSchemaOutput> outputAndStatus =
singerDiscoverSchemaWorker.run(discoverSchemaInput, workspaceRoot);
singerDiscoverSchemaWorker.run(discoverSchemaInput, jobRoot);
StandardCheckConnectionOutput output = new StandardCheckConnectionOutput();
JobStatus jobStatus;
if (outputAndStatus.getStatus() == JobStatus.SUCCESSFUL

View File

@@ -37,7 +37,7 @@ import io.dataline.workers.DiscoverSchemaWorker;
import io.dataline.workers.InvalidCredentialsException;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.utils.DockerUtils;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
@@ -55,17 +55,20 @@ public class SingerDiscoverSchemaWorker
private static String CATALOG_JSON_FILENAME = "catalog.json";
private static String ERROR_LOG_FILENAME = "err.log";
private volatile Process workerProcess;
private final String imageName;
private final ProcessBuilderFactory pbf;
public SingerDiscoverSchemaWorker(String imageName) {
private volatile Process workerProcess;
public SingerDiscoverSchemaWorker(final String imageName, final ProcessBuilderFactory pbf) {
this.imageName = imageName;
this.pbf = pbf;
}
// package private since package-local classes need direct access to singer catalog, and the
// conversion from SingerSchema to Dataline schema is lossy
OutputAndStatus<SingerCatalog> runInternal(
StandardDiscoverSchemaInput discoverSchemaInput, Path workspaceRoot)
StandardDiscoverSchemaInput discoverSchemaInput, Path jobRoot)
throws InvalidCredentialsException {
// todo (cgardens) - just getting original impl to line up with new iface for now. this can be
// reduced.
@@ -78,25 +81,14 @@ public class SingerDiscoverSchemaWorker
throw new RuntimeException(e);
}
writeFile(workspaceRoot, CONFIG_JSON_FILENAME, configDotJson);
writeFile(jobRoot, CONFIG_JSON_FILENAME, configDotJson);
// exec
try {
String[] cmd =
DockerUtils.getDockerCommand(
workspaceRoot,
imageName,
"--config",
CONFIG_JSON_FILENAME,
imageName,
"--config",
CONFIG_JSON_FILENAME,
"--discover");
workerProcess =
new ProcessBuilder(cmd)
.redirectError(getFullPath(workspaceRoot, ERROR_LOG_FILENAME).toFile())
.redirectOutput(getFullPath(workspaceRoot, CATALOG_JSON_FILENAME).toFile())
pbf.create(jobRoot, imageName, "--config", CONFIG_JSON_FILENAME, "--discover")
.redirectError(getFullPath(jobRoot, ERROR_LOG_FILENAME).toFile())
.redirectOutput(getFullPath(jobRoot, CATALOG_JSON_FILENAME).toFile())
.start();
while (!workerProcess.waitFor(1, TimeUnit.MINUTES)) {
@@ -105,12 +97,12 @@ public class SingerDiscoverSchemaWorker
int exitCode = workerProcess.exitValue();
if (exitCode == 0) {
String catalog = readFile(workspaceRoot, CATALOG_JSON_FILENAME);
String catalog = readFile(jobRoot, CATALOG_JSON_FILENAME);
final SingerCatalog singerCatalog = jsonCatalogToTyped(catalog);
return new OutputAndStatus<>(SUCCESSFUL, singerCatalog);
} else {
// TODO throw invalid credentials exception where appropriate based on error log
String errLog = readFile(workspaceRoot, ERROR_LOG_FILENAME);
String errLog = readFile(jobRoot, ERROR_LOG_FILENAME);
LOGGER.debug(
"Discovery job subprocess finished with exit code {}. Error log: {}", exitCode, errLog);
return new OutputAndStatus<>(FAILED);
@@ -123,9 +115,9 @@ public class SingerDiscoverSchemaWorker
@Override
public OutputAndStatus<StandardDiscoverSchemaOutput> run(
StandardDiscoverSchemaInput discoverSchemaInput, Path workspaceRoot)
StandardDiscoverSchemaInput discoverSchemaInput, Path jobRoot)
throws InvalidCredentialsException {
OutputAndStatus<SingerCatalog> output = runInternal(discoverSchemaInput, workspaceRoot);
OutputAndStatus<SingerCatalog> output = runInternal(discoverSchemaInput, jobRoot);
JobStatus status = output.getStatus();
OutputAndStatus<StandardDiscoverSchemaOutput> finalOutput;

View File

@@ -36,17 +36,17 @@ import io.dataline.workers.InvalidCredentialsException;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.TapFactory;
import io.dataline.workers.WorkerUtils;
import io.dataline.workers.process.ProcessBuilderFactory;
import io.dataline.workers.protocol.singer.SingerJsonIterator;
import io.dataline.workers.utils.DockerUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingerTapFactory implements TapFactory<SingerMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(SingerTapFactory.class);
private static final String CONFIG_JSON_FILENAME = "tap_config.json";
@@ -54,20 +54,22 @@ public class SingerTapFactory implements TapFactory<SingerMessage> {
private static final String STATE_JSON_FILENAME = "input_state.json";
private final String dockerImageName;
private final String imageName;
private final ProcessBuilderFactory pbf;
private Process tapProcess = null;
private InputStream stdout = null;
public SingerTapFactory(String dockerImageName) {
this.dockerImageName = dockerImageName;
public SingerTapFactory(final String imageName, final ProcessBuilderFactory pbf) {
this.imageName = imageName;
this.pbf = pbf;
}
@SuppressWarnings("UnstableApiUsage")
@Override
public Stream<SingerMessage> create(StandardTapConfig input, Path workspaceRoot)
public Stream<SingerMessage> create(StandardTapConfig input, Path jobRoot)
throws InvalidCredentialsException {
OutputAndStatus<SingerCatalog> discoveryOutput = runDiscovery(input, workspaceRoot);
OutputAndStatus<SingerCatalog> discoveryOutput = runDiscovery(input, jobRoot);
final ObjectMapper objectMapper = new ObjectMapper();
final String configDotJson;
@@ -87,36 +89,24 @@ public class SingerTapFactory implements TapFactory<SingerMessage> {
throw new RuntimeException(e);
}
// write config.json to disk
Path configPath =
WorkerUtils.writeFileToWorkspace(workspaceRoot, CONFIG_JSON_FILENAME, configDotJson);
Path catalogPath =
WorkerUtils.writeFileToWorkspace(workspaceRoot, CATALOG_JSON_FILENAME, catalogDotJson);
Path statePath =
WorkerUtils.writeFileToWorkspace(workspaceRoot, STATE_JSON_FILENAME, stateDotJson);
WorkerUtils.writeFileToWorkspace(jobRoot, CONFIG_JSON_FILENAME, configDotJson);
WorkerUtils.writeFileToWorkspace(jobRoot, CATALOG_JSON_FILENAME, catalogDotJson);
WorkerUtils.writeFileToWorkspace(jobRoot, STATE_JSON_FILENAME, stateDotJson);
try {
String[] tapCmd =
DockerUtils.getDockerCommand(
workspaceRoot,
dockerImageName,
tapProcess =
pbf.create(
jobRoot,
imageName,
"--config",
configPath.toString(),
CONFIG_JSON_FILENAME,
// TODO support both --properties and --catalog depending on integration
"--properties",
catalogPath.toString(),
CATALOG_JSON_FILENAME,
"--state",
statePath.toString());
LOGGER.info("running command: {}", Arrays.toString(tapCmd));
tapProcess =
new ProcessBuilder()
.command(tapCmd)
.redirectError(workspaceRoot.resolve(DefaultSyncWorker.TAP_ERR_LOG).toFile())
STATE_JSON_FILENAME)
.redirectError(jobRoot.resolve(DefaultSyncWorker.TAP_ERR_LOG).toFile())
.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -145,7 +135,7 @@ public class SingerTapFactory implements TapFactory<SingerMessage> {
discoveryInput.setConnectionConfiguration(
input.getSourceConnectionImplementation().getConfiguration());
Path scopedWorkspace = workspaceRoot.resolve("discovery");
return new SingerDiscoverSchemaWorker(dockerImageName)
return new SingerDiscoverSchemaWorker(imageName, pbf)
.runInternal(discoveryInput, scopedWorkspace);
}
}

View File

@@ -34,7 +34,7 @@ import io.dataline.workers.DefaultSyncWorker;
import io.dataline.workers.TargetConsumer;
import io.dataline.workers.TargetFactory;
import io.dataline.workers.WorkerUtils;
import io.dataline.workers.utils.DockerUtils;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -43,19 +43,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingerTargetFactory implements TargetFactory<SingerMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(SingerTargetFactory.class);
private static final String CONFIG_JSON_FILENAME = "target_config.json";
private final String dockerImageName;
private final String imageName;
private final ProcessBuilderFactory pbf;
public SingerTargetFactory(String dockerImageName) {
this.dockerImageName = dockerImageName;
public SingerTargetFactory(final String imageName, final ProcessBuilderFactory pbf) {
this.imageName = imageName;
this.pbf = pbf;
}
@Override
public CloseableConsumer<SingerMessage> create(
StandardTargetConfig targetConfig, Path workspacePath) {
public CloseableConsumer<SingerMessage> create(StandardTargetConfig targetConfig, Path jobRoot) {
final ObjectMapper objectMapper = new ObjectMapper();
final String configDotJson;
@@ -69,17 +71,12 @@ public class SingerTargetFactory implements TargetFactory<SingerMessage> {
// write config.json to disk
Path configPath =
WorkerUtils.writeFileToWorkspace(workspacePath, CONFIG_JSON_FILENAME, configDotJson);
String[] dockerCmd =
DockerUtils.getDockerCommand(
workspacePath, dockerImageName, "--config", configPath.toString());
WorkerUtils.writeFileToWorkspace(jobRoot, CONFIG_JSON_FILENAME, configDotJson);
try {
final Process targetProcess =
new ProcessBuilder()
.command(dockerCmd)
.redirectError(workspacePath.resolve(DefaultSyncWorker.TARGET_ERR_LOG).toFile())
pbf.create(jobRoot, imageName, "--config", configPath.toString())
.redirectError(jobRoot.resolve(DefaultSyncWorker.TARGET_ERR_LOG).toFile())
.start();
try (BufferedWriter writer =

View File

@@ -1,43 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.workers.utils;
import java.nio.file.Path;
import org.apache.commons.lang3.ArrayUtils;
public class DockerUtils {
public static String[] getDockerCommand(Path workspaceRoot, String imageName, String... args) {
final String[] dockerCommander = {
"docker",
"run",
"-v",
String.format("%s:/dataline/data", workspaceRoot.toString()),
"--network=host",
imageName
};
return ArrayUtils.addAll(dockerCommander, args);
}
}

View File

@@ -28,6 +28,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import io.dataline.workers.process.DockerProcessBuilderFactory;
import io.dataline.workers.process.ProcessBuilderFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
@@ -36,28 +38,26 @@ import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class BaseWorkerTestCase {
// TODO inject via env
protected Path workspaceDirectory;
protected Path workspaceRoot;
protected ProcessBuilderFactory pbf;
@BeforeAll
public void init() throws IOException {
FileUtils.forceMkdir(new File("/tmp/tests"));
workspaceDirectory = Files.createTempDirectory(Path.of("/tmp/tests"), "dataline");
System.out.println("Workspace directory: " + workspaceDirectory.toString());
final Path testsPath = Path.of("/tmp/tests");
Files.createDirectories(testsPath);
this.workspaceRoot = Files.createTempDirectory(testsPath, "dataline");
this.pbf = new DockerProcessBuilderFactory(workspaceRoot, workspaceRoot.toString(), "host");
System.out.println("Workspace directory: " + workspaceRoot.toString());
}
protected Path createWorkspacePath(String jobId) {
final Path workspacePath = workspaceDirectory.resolve(jobId);
try {
FileUtils.forceMkdir(workspacePath.toFile());
} catch (IOException e) {
throw new RuntimeException(e);
}
return workspacePath;
protected Path createJobRoot(String jobId) throws IOException {
final Path jobRoot = workspaceRoot.resolve(jobId);
return Files.createDirectories(jobRoot);
}
protected String readResource(String name) {

View File

@@ -43,7 +43,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -73,9 +72,9 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
standardCheckConnectionInput.setConnectionConfiguration(o);
SingerCheckConnectionWorker worker =
new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage());
new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage(), pbf);
OutputAndStatus<StandardCheckConnectionOutput> run =
worker.run(standardCheckConnectionInput, createWorkspacePath(jobId));
worker.run(standardCheckConnectionInput, createJobRoot(jobId));
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
@@ -97,7 +96,7 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
db.getFirstMappedPort() + "");
SingerCheckConnectionWorker worker =
new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage());
new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage(), pbf);
final Object o = new ObjectMapper().readValue(incorrectCreds, Object.class);
final StandardCheckConnectionInput standardCheckConnectionInput =
@@ -105,7 +104,7 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
standardCheckConnectionInput.setConnectionConfiguration(o);
OutputAndStatus<StandardCheckConnectionOutput> run =
worker.run(standardCheckConnectionInput, createWorkspacePath(jobId));
worker.run(standardCheckConnectionInput, createJobRoot(jobId));
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
@@ -114,7 +113,6 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
// in the logs
}
@Disabled
@Test
public void testSuccessfulConnection()
throws IOException, InvalidCredentialsException, InvalidCatalogException {
@@ -128,9 +126,9 @@ public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
standardCheckConnectionInput.setConnectionConfiguration(o);
SingerCheckConnectionWorker worker =
new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage());
new SingerCheckConnectionWorker(Integrations.POSTGRES_TAP.getCheckConnectionImage(), pbf);
OutputAndStatus<StandardCheckConnectionOutput> run =
worker.run(standardCheckConnectionInput, createWorkspacePath(jobId));
worker.run(standardCheckConnectionInput, createJobRoot(jobId));
assertEquals(SUCCESSFUL, run.getStatus());
assertTrue(run.getOutput().isPresent());

View File

@@ -34,19 +34,16 @@ import io.dataline.config.StandardDiscoverSchemaInput;
import io.dataline.config.StandardDiscoverSchemaOutput;
import io.dataline.integrations.Integrations;
import io.dataline.workers.BaseWorkerTestCase;
import io.dataline.workers.InvalidCatalogException;
import io.dataline.workers.InvalidCredentialsException;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.PostgreSQLContainerTestHelper;
import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
@@ -56,28 +53,25 @@ public class SingerDiscoverSchemaWorkerTest extends BaseWorkerTestCase {
PostgreSQLContainer db;
@BeforeAll
public void initDb() throws SQLException, IOException, InterruptedException {
public void initDb() throws IOException, InterruptedException {
db = new PostgreSQLContainer();
db.start();
PostgreSQLContainerTestHelper.runSqlScript(
MountableFile.forClasspathResource("simple_postgres_init.sql"), db);
}
@Disabled
@Test
public void testPostgresDiscovery()
throws IOException, InvalidCredentialsException, InvalidCatalogException {
public void testPostgresDiscovery() throws IOException, InvalidCredentialsException {
final String jobId = "1";
String postgresCreds = PostgreSQLContainerTestHelper.getSingerTapConfig(db);
final Object o = new ObjectMapper().readValue(postgresCreds, Object.class);
final StandardDiscoverSchemaInput input = new StandardDiscoverSchemaInput();
input.setConnectionConfiguration(o);
System.out.println(input);
SingerDiscoverSchemaWorker worker =
new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage());
new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage(), pbf);
OutputAndStatus<StandardDiscoverSchemaOutput> run =
worker.run(input, createWorkspacePath(jobId));
OutputAndStatus<StandardDiscoverSchemaOutput> run = worker.run(input, createJobRoot(jobId));
assertEquals(SUCCESSFUL, run.getStatus());
@@ -100,17 +94,17 @@ public class SingerDiscoverSchemaWorkerTest extends BaseWorkerTestCase {
input.setConnectionConfiguration(o);
SingerDiscoverSchemaWorker worker =
new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage());
new SingerDiscoverSchemaWorker(Integrations.POSTGRES_TAP.getDiscoverSchemaImage(), pbf);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<?> workerWasCancelled =
threadPool.submit(
() -> {
OutputAndStatus<StandardDiscoverSchemaOutput> output = null;
try {
output = worker.run(input, createWorkspacePath(jobId));
OutputAndStatus<StandardDiscoverSchemaOutput> output =
worker.run(input, createJobRoot(jobId));
assertEquals(FAILED, output.getStatus());
} catch (InvalidCredentialsException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
});

View File

@@ -3,7 +3,7 @@
"tables": [
{
"name": "id_and_name",
"selected": true,
"selected": false,
"columns": [
{
"name": "name",

View File

@@ -24,3 +24,10 @@ services:
dockerfile: Dockerfile.build
target: webapp
context: .
volumes:
workspace:
networks:
dataline:
driver: "bridge"

View File

@@ -18,7 +18,7 @@ services:
- DATABASE_URL=${DATABASE_URL}
- WAIT_BEFORE_HOSTS=5
- WAIT_HOSTS=db:5432
- CONFIG_PERSISTENCE_ROOT=${CONFIG_PERSISTENCE_ROOT}
- CONFIG_ROOT=${CONFIG_ROOT}
depends_on:
- db
server:
@@ -31,7 +31,7 @@ services:
- DATABASE_URL=${DATABASE_URL}
- WAIT_BEFORE_HOSTS=5
- WAIT_HOSTS=db:5432
- CONFIG_PERSISTENCE_ROOT=${CONFIG_PERSISTENCE_ROOT}
- CONFIG_ROOT=${CONFIG_ROOT}
ports:
- 8001:8001
depends_on:

33
tools/bin/gradled.sh Executable file
View File

@@ -0,0 +1,33 @@
#!/usr/bin/env bash
set -e
. tools/lib/lib.sh
IMG_NAME=dataline/build-project:dev
TMP_VOLUME_NAME=gradlew-tmp
main() {
assert_root
if [[ $# -gt 0 ]]; then
OPTS=
CMD="./gradlew $@"
else
OPTS=-it
CMD=/bin/bash
fi
local args=${@:-/bin/bash}
docker build -f Dockerfile.build . -t $IMG_NAME --target build-project
docker run $OPTS --rm \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
-v $(pwd):/code \
-p 5005:5005 \
-e GRADLE_OPTS="-Dorg.gradle.daemon=false" \
$IMG_NAME $CMD
}
main "$@"