1
0
mirror of synced 2025-12-23 21:03:15 -05:00

source-mssql: adopt cleaned-up cdk (#34749)

This commit is contained in:
Marius Posta
2024-02-09 12:02:49 -08:00
committed by GitHub
parent 7ff484868f
commit d80a9fca8f
5 changed files with 60 additions and 36 deletions

View File

@@ -1,20 +1,13 @@
plugins {
id 'application'
id 'airbyte-java-connector'
}
airbyteJavaConnector {
cdkVersionRequired = '0.18.0'
cdkVersionRequired = '0.19.0'
features = ['db-sources']
useLocalCdk = false
}
configurations.all {
resolutionStrategy {
force libs.jooq
}
}
java {
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes"
@@ -27,17 +20,14 @@ application {
}
dependencies {
implementation libs.postgresql
implementation libs.debezium.sqlserver
implementation libs.debezium.embedded
implementation 'com.microsoft.sqlserver:mssql-jdbc:10.2.1.jre8'
implementation 'io.debezium:debezium-embedded:2.4.0.Final'
implementation 'io.debezium:debezium-connector-sqlserver:2.4.0.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.awaitility:awaitility:4.2.0'
testFixturesImplementation 'org.testcontainers:mssqlserver:1.19.0'
testImplementation libs.testcontainers.mssqlserver
testFixturesImplementation libs.testcontainers.mssqlserver
testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.testcontainers:mssqlserver:1.19.0'
}

View File

@@ -17,11 +17,11 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import microsoft.sql.DateTimeOffset;
import org.apache.commons.codec.binary.Base64;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,7 +197,7 @@ public class MssqlDebeziumConverter implements CustomConverter<SchemaBuilder, Re
}
if (input instanceof byte[]) {
return Base64.encodeBase64String((byte[]) input);
return Base64.getEncoder().encodeToString((byte[]) input);
}
LOGGER.warn("Uncovered binary class type '{}'. Use default converter",

View File

@@ -27,8 +27,8 @@ import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Base64;
import microsoft.sql.DateTimeOffset;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,7 +128,7 @@ public class MssqlSourceOperations extends JdbcSourceOperations {
final int index)
throws SQLException {
final byte[] bytes = resultSet.getBytes(index);
final String value = Base64.encodeBase64String(bytes);
final String value = Base64.getEncoder().encodeToString(bytes);
node.put(columnName, value);
}

View File

@@ -6,11 +6,16 @@ package io.airbyte.integrations.source.mssql;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.db.factory.DSLContextFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.ssh.SshBastionContainer;
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
@@ -25,6 +30,8 @@ import io.airbyte.protocol.models.v0.SyncMode;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import org.jooq.SQLDialect;
public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
@@ -33,14 +40,15 @@ public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAccepta
public abstract SshTunnel.TunnelMethod getTunnelMethod();
protected MsSQLTestDatabase testdb;
protected SshBastionContainer bastion;
private final SshBastionContainer bastion = new SshBastionContainer();
private MsSQLTestDatabase testdb;
@Override
protected JsonNode getConfig() {
try {
return testdb.integrationTestConfigBuilder()
.with("tunnel_method", bastion.getTunnelMethod(getTunnelMethod(), false))
.withoutSsl()
.with("tunnel_method", bastion.getTunnelMethod(getTunnelMethod(), true))
.build();
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -49,23 +57,51 @@ public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAccepta
}
}
private void populateDatabaseTestData() throws Exception {
final var outerConfig = testdb.integrationTestConfigBuilder()
.withSchemas("public")
.withoutSsl()
.with("tunnel_method", bastion.getTunnelMethod(getTunnelMethod(), false))
.build();
SshTunnel.sshWrap(
outerConfig,
JdbcUtils.HOST_LIST_KEY,
JdbcUtils.PORT_LIST_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(ctx -> {
ctx.fetch("ALTER DATABASE %s SET AUTO_CLOSE OFF WITH NO_WAIT;", testdb.getDatabaseName());
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));");
ctx.fetch("INSERT INTO id_and_name (id, name, born) VALUES " +
"(1, 'picard', '2124-03-04T01:01:01Z'), " +
"(2, 'crusher', '2124-03-04T01:01:01Z'), " +
"(3, 'vash', '2124-03-04T01:01:01Z');");
return null;
}));
}
private static Database getDatabaseFromConfig(final JsonNode config) {
return new Database(
DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.get(JdbcUtils.PASSWORD_KEY).asText(),
DatabaseDriver.MSSQLSERVER.getDriverClassName(),
String.format(DatabaseDriver.MSSQLSERVER.getUrlFormatString(),
config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asInt(),
config.get(JdbcUtils.DATABASE_KEY).asText()),
SQLDialect.DEFAULT));
}
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2017, ContainerModifier.NETWORK);
testdb = testdb
.with("ALTER DATABASE %s SET AUTO_CLOSE OFF WITH NO_WAIT;", testdb.getDatabaseName())
.with("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));")
.with("INSERT INTO id_and_name (id, name, born) VALUES " +
"(1, 'picard', '2124-03-04T01:01:01Z'), " +
"(2, 'crusher', '2124-03-04T01:01:01Z'), " +
"(3, 'vash', '2124-03-04T01:01:01Z');");
bastion.initAndStartBastion(testdb.getContainer().getNetwork());
populateDatabaseTestData();
}
@Override
protected void tearDown(final TestDestinationEnv testEnv) {
bastion.close();
testdb.close();
bastion.stopAndClose();
}
@Override

View File

@@ -28,9 +28,7 @@ public class FillMsSqlTestDbScriptTest extends AbstractSourceFillDbWithTestData
}
@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
}
protected void tearDown(final TestDestinationEnv testEnv) {}
@Override
protected String getImageName() {