1
0
mirror of synced 2025-12-25 02:09:19 -05:00

SSH for Postgres Destination (#5743)

Co-authored-by: George Claireaux <phlair@users.noreply.github.com>
This commit is contained in:
Charles
2021-09-07 17:06:25 -07:00
committed by GitHub
parent 589d535a61
commit 8ad43afb07
24 changed files with 721 additions and 30 deletions

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.3.10
LABEL io.airbyte.version=0.3.11
LABEL io.airbyte.name=airbyte/destination-postgres

View File

@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.util.ArrayList;
import java.util.List;
@@ -41,16 +42,18 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestination.class);
public static final String DRIVER_CLASS = "org.postgresql.Driver";
public static final List<String> HOST_KEY = List.of("host");
public static final List<String> PORT_KEY = List.of("port");
public PostgresDestination() {
super(DRIVER_CLASS, new PostgresSQLNameTransformer(), new PostgresSqlOperations());
}
@Override
public JsonNode toJdbcConfig(JsonNode config) {
public JsonNode toJdbcConfig(final JsonNode config) {
final String schema = Optional.ofNullable(config.get("schema")).map(JsonNode::asText).orElse("public");
List<String> additionalParameters = new ArrayList<>();
final List<String> additionalParameters = new ArrayList<>();
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?",
config.get("host").asText(),
@@ -77,8 +80,8 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
return Jsons.jsonNode(configBuilder.build());
}
public static void main(String[] args) throws Exception {
final Destination destination = new PostgresDestination();
public static void main(final String[] args) throws Exception {
final Destination destination = new SshWrappedDestination(new PostgresDestination(), HOST_KEY, PORT_KEY);
LOGGER.info("starting destination: {}", PostgresDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", PostgresDestination.class);

View File

@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.postgres;
import java.nio.file.Path;
public class SshKeyPostgresDestinationAcceptanceTest extends SshPostgresDestinationAcceptanceTest {
@Override
public Path getConfigFilePath() {
return Path.of("secrets/ssh-key-config.json");
}
}

View File

@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.postgres;
import java.nio.file.Path;
public class SshPasswordPostgresDestinationAcceptanceTest extends SshPostgresDestinationAcceptanceTest {
@Override
public Path getConfigFilePath() {
return Path.of("secrets/ssh-pwd-config.json");
}
}

View File

@@ -0,0 +1,188 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.postgres;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
// todo (cgardens) - likely some of this could be further de-duplicated with
// PostgresDestinationAcceptanceTest.
/**
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshPostgresDestinationAcceptanceTest extends DestinationAcceptanceTest {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private String schemaName;
public abstract Path getConfigFilePath();
@Override
protected String getImageName() {
return "airbyte/destination-postgres:dev";
}
@Override
protected JsonNode getConfig() {
final JsonNode config = getConfigFromSecretsFile();
// do everything in a randomly generated schema so that we can wipe it out at the end.
((ObjectNode) config).put("schema", schemaName);
return config;
}
private JsonNode getConfigFromSecretsFile() {
return Jsons.deserialize(IOs.readFile(getConfigFilePath()));
}
@Override
protected JsonNode getFailCheckConfig() {
final JsonNode clone = Jsons.clone(getConfig());
((ObjectNode) clone).put("password", "wrong password");
return clone;
}
@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String streamName,
final String namespace,
final JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.collect(Collectors.toList());
}
@Override
protected boolean supportsNormalization() {
return true;
}
@Override
protected boolean supportsDBT() {
return true;
}
@Override
protected boolean implementsNamespaces() {
return true;
}
@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace)
throws Exception {
final String tableName = namingResolver.getIdentifier(streamName);
// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
// use quoted names
// if (!tableName.startsWith("\"")) {
// // Currently, Normalization always quote tables identifiers
// //tableName = "\"" + tableName + "\"";
// }
return retrieveRecordsFromTable(tableName, namespace);
}
@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
private static Database getDatabaseFromConfig(final JsonNode config) {
return Databases.createPostgresDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), config.get("port").asText(),
config.get("database").asText()));
}
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception {
final JsonNode config = getConfig();
return SshTunnel.sshWrap(
config,
PostgresDestination.HOST_KEY,
PostgresDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
}
@Override
protected void setup(final TestDestinationEnv testEnv) throws Exception {
// do everything in a randomly generated schema so that we can wipe it out at the end.
schemaName = RandomStringUtils.randomAlphabetic(8).toLowerCase();
SshTunnel.sshWrap(
getConfig(),
PostgresDestination.HOST_KEY,
PostgresDestination.PORT_KEY,
mangledConfig -> {
getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName)));
});
}
@Override
protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
// blow away the test schema at the end.
SshTunnel.sshWrap(
getConfig(),
PostgresDestination.HOST_KEY,
PostgresDestination.PORT_KEY,
mangledConfig -> {
getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("DROP SCHEMA %s CASCADE;", schemaName)));
});
}
}