1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Add Singer CheckConnection Worker (#75)

This commit is contained in:
Sherif A. Nada
2020-08-19 10:58:08 -07:00
committed by GitHub
parent cf10405c82
commit 9a9439fa72
11 changed files with 247 additions and 31 deletions

View File

@@ -7,8 +7,11 @@ configurations {
}
dependencies {
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.9.8"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
implementation project(":dataline-commons")
implementation project(":dataline-config")
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.9.8"
testImplementation "org.postgresql:postgresql:42.1.4"
testImplementation "org.testcontainers:testcontainers:1.14.3"
testImplementation "org.testcontainers:postgresql:1.14.3"

View File

@@ -24,6 +24,8 @@
package io.dataline.workers;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class DiscoveryOutput {
// TODO line this up with conduit config type
private final String catalog;
@@ -35,4 +37,9 @@ public class DiscoveryOutput {
public String getCatalog() {
return catalog;
}
@Override
public String toString() {
return new ToStringBuilder(this).append(catalog).toString();
}
}

View File

@@ -24,6 +24,10 @@
package io.dataline.workers;
/**
* Indicates whether the worker's underlying process was succesful. E.g this should return
* SUCCESSFUL if a connection check succeeds, FAILED otherwise.
*/
public enum JobStatus {
FAILED,
SUCCESSFUL;

View File

@@ -25,6 +25,7 @@
package io.dataline.workers;
import java.util.Optional;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class OutputAndStatus<OutputType> {
private final Optional<OutputType> output;
@@ -40,6 +41,11 @@ public class OutputAndStatus<OutputType> {
this.output = Optional.empty();
}
@Override
public String toString() {
return new ToStringBuilder(this).append(status).append(output).build();
}
public Optional<OutputType> getOutput() {
return output;
}

View File

@@ -60,7 +60,7 @@ public abstract class BaseSingerWorker<OutputType> implements Worker<OutputType>
return runInternal();
}
public abstract OutputAndStatus<OutputType> runInternal();
abstract OutputAndStatus<OutputType> runInternal();
private void createWorkspace() {
try {

View File

@@ -0,0 +1,75 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.workers.singer;
import io.dataline.config.StandardConnectionStatus;
import io.dataline.workers.DiscoveryOutput;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingerCheckConnectionWorker extends BaseSingerWorker<StandardConnectionStatus> {
private final Logger LOGGER = LoggerFactory.getLogger(SingerCheckConnectionWorker.class);
private SingerDiscoveryWorker singerDiscoveryWorker;
public SingerCheckConnectionWorker(
String workerId,
SingerTap singerTap,
String configDotJson,
String workspaceRoot,
String singerLibsRoot) {
super(workerId, workspaceRoot, singerLibsRoot);
this.singerDiscoveryWorker =
new SingerDiscoveryWorker(
workerId, configDotJson, singerTap, workspaceRoot, singerLibsRoot);
}
@Override
OutputAndStatus<StandardConnectionStatus> runInternal() {
OutputAndStatus<DiscoveryOutput> outputAndStatus = singerDiscoveryWorker.runInternal();
StandardConnectionStatus connectionStatus = new StandardConnectionStatus();
JobStatus jobStatus;
if (outputAndStatus.getStatus() == JobStatus.SUCCESSFUL
&& outputAndStatus.getOutput().isPresent()) {
connectionStatus.setStatus(StandardConnectionStatus.Status.SUCCESS);
jobStatus = JobStatus.SUCCESSFUL;
} else {
LOGGER.info(
"Connection check for worker {} unsuccessful. Discovery output: {}",
workerId,
outputAndStatus);
jobStatus = JobStatus.FAILED;
connectionStatus.setStatus(StandardConnectionStatus.Status.FAILURE);
// TODO add better error log parsing to specify the exact reason for failure as the message fieldk
}
return new OutputAndStatus<>(jobStatus, connectionStatus);
}
@Override
public void cancel() {
singerDiscoveryWorker.cancel();
}
}

View File

@@ -58,7 +58,7 @@ public class SingerDiscoveryWorker extends BaseSingerWorker<DiscoveryOutput> {
}
@Override
public OutputAndStatus<DiscoveryOutput> runInternal() {
OutputAndStatus<DiscoveryOutput> runInternal() {
// TODO use format converter here
// write config.json to disk
String configPath = writeFileToWorkspace(CONFIG_JSON_FILENAME, configDotJson);

View File

@@ -32,11 +32,15 @@ import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class BaseWorkerTestCase {
// TODO inject via env
protected String SINGER_LIB_PATH = "/usr/local/lib/singer";
private Path workspaceDirectory;
@BeforeAll
public void init() throws IOException {
workspaceDirectory = Files.createTempDirectory("dataline");
System.out.println("Workspace directory: " + workspaceDirectory.toString());
}
protected Path getWorkspacePath() {

View File

@@ -22,15 +22,35 @@
* SOFTWARE.
*/
package io.dataline.workers.singer;
package io.dataline.workers;
public class SingerTestConnectionWorker {
private final SingerConnector tapOrTarget;
private final String configDotJson;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.PostgreSQLContainer;
public SingerTestConnectionWorker(SingerConnector tapOrTarget, String configDotJson) {
public class PostgreSQLContainerHelper {
this.tapOrTarget = tapOrTarget;
this.configDotJson = configDotJson;
public static String getSingerConfigJson(PostgreSQLContainer db) throws JsonProcessingException {
return getSingerConfigJson(
db.getUsername(),
db.getPassword(),
db.getHost(),
db.getDatabaseName(),
String.valueOf(db.getFirstMappedPort()));
}
public static String getSingerConfigJson(
String user, String password, String host, String dbname, String port)
throws JsonProcessingException {
Map<String, String> creds = new HashMap<>();
creds.put("user", user);
creds.put("password", password);
creds.put("host", host);
creds.put("dbname", dbname);
creds.put("port", port);
return new ObjectMapper().writeValueAsString(creds);
}
}

View File

@@ -0,0 +1,111 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.dataline.workers.singer;
import static io.dataline.workers.JobStatus.FAILED;
import static io.dataline.workers.JobStatus.SUCCESSFUL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.dataline.config.StandardConnectionStatus;
import io.dataline.workers.BaseWorkerTestCase;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.PostgreSQLContainerHelper;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
public class SingerCheckConnectionWorkerTest extends BaseWorkerTestCase {
private PostgreSQLContainer db;
@BeforeAll
public void initDb() throws SQLException {
db = new PostgreSQLContainer();
db.start();
Connection con =
DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
con.createStatement().execute("CREATE TABLE id_and_name (id integer, name VARCHAR(200));");
}
@Test
public void testNonexistentDb() throws JsonProcessingException {
String fakeDbCreds =
PostgreSQLContainerHelper.getSingerConfigJson(
"user", "pass", "localhost", "postgres", "111111");
SingerCheckConnectionWorker worker =
new SingerCheckConnectionWorker(
"1", SingerTap.POSTGRES, fakeDbCreds, getWorkspacePath().toString(), SINGER_LIB_PATH);
OutputAndStatus<StandardConnectionStatus> run = worker.run();
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.FAILURE, run.getOutput().get().getStatus());
// TODO Once log file locations are accessible externally, also verify the correct error message
// in the logs
}
@Test
public void testIncorrectAuthCredentials() throws JsonProcessingException {
String incorrectCreds =
PostgreSQLContainerHelper.getSingerConfigJson(
db.getUsername(),
"wrongpassword",
db.getHost(),
db.getDatabaseName(),
db.getFirstMappedPort() + "");
SingerCheckConnectionWorker worker =
new SingerCheckConnectionWorker(
"1",
SingerTap.POSTGRES,
incorrectCreds,
getWorkspacePath().toString(),
SINGER_LIB_PATH);
OutputAndStatus<StandardConnectionStatus> run = worker.run();
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.FAILURE, run.getOutput().get().getStatus());
// TODO Once log file locations are accessible externally, also verify the correct error message
// in the logs
}
@Test
public void testSuccessfulConnection() throws JsonProcessingException {
String creds = PostgreSQLContainerHelper.getSingerConfigJson(db);
SingerCheckConnectionWorker worker =
new SingerCheckConnectionWorker(
"1", SingerTap.POSTGRES, creds, getWorkspacePath().toString(), SINGER_LIB_PATH);
OutputAndStatus<StandardConnectionStatus> run = worker.run();
assertEquals(SUCCESSFUL, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.SUCCESS, run.getOutput().get().getStatus());
// TODO Once log file locations are accessible externally, also verify the correct error message
// in the logs
}
}

View File

@@ -31,18 +31,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.io.Resources;
import io.dataline.workers.BaseWorkerTestCase;
import io.dataline.workers.DiscoveryOutput;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.PostgreSQLContainerHelper;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,9 +51,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
public class TestSingerDiscoveryWorker extends BaseWorkerTestCase {
// TODO inject as env variable
private static final String SINGER_LIBS_ROOT = "/usr/local/lib/singer/";
public class SingerDiscoveryWorkerTest extends BaseWorkerTestCase {
PostgreSQLContainer db;
@@ -70,19 +67,20 @@ public class TestSingerDiscoveryWorker extends BaseWorkerTestCase {
@Test
public void testPostgresDiscovery() throws IOException {
String postgresCreds = getPostgresConfigJson(db);
String postgresCreds = PostgreSQLContainerHelper.getSingerConfigJson(db);
SingerDiscoveryWorker worker =
new SingerDiscoveryWorker(
"1",
postgresCreds,
SingerTap.POSTGRES,
getWorkspacePath().toAbsolutePath().toString(),
SINGER_LIBS_ROOT);
SINGER_LIB_PATH);
OutputAndStatus<DiscoveryOutput> run = worker.run();
assertEquals(SUCCESSFUL, run.getStatus());
String expectedCatalog = readResource("simple_postgres_catalog.json");
assertTrue(run.getOutput().isPresent());
assertJsonEquals(expectedCatalog, run.getOutput().get().getCatalog());
}
@@ -90,14 +88,14 @@ public class TestSingerDiscoveryWorker extends BaseWorkerTestCase {
@Test
public void testCancellation()
throws JsonProcessingException, InterruptedException, ExecutionException {
String postgresCreds = getPostgresConfigJson(db);
String postgresCreds = PostgreSQLContainerHelper.getSingerConfigJson(db);
SingerDiscoveryWorker worker =
new SingerDiscoveryWorker(
"1",
postgresCreds,
SingerTap.POSTGRES,
getWorkspacePath().toAbsolutePath().toString(),
SINGER_LIBS_ROOT);
SINGER_LIB_PATH);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future<?> workerWasCancelled =
threadPool.submit(
@@ -124,16 +122,4 @@ public class TestSingerDiscoveryWorker extends BaseWorkerTestCase {
ObjectMapper mapper = new ObjectMapper();
assertTrue(mapper.readTree(s1).equals(mapper.readTree(s2)));
}
private String getPostgresConfigJson(PostgreSQLContainer psqlContainer)
throws JsonProcessingException {
Map<String, String> props = Maps.newHashMap();
props.put("dbname", psqlContainer.getDatabaseName());
props.put("user", psqlContainer.getUsername());
props.put("password", psqlContainer.getPassword());
props.put("host", psqlContainer.getHost());
props.put("port", String.valueOf(psqlContainer.getFirstMappedPort()));
return new ObjectMapper().writeValueAsString(props);
}
}