source-postgres: adopt cleaned-up cdk (#34751)
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
import org.jsonschema2pojo.SourceType
|
||||
|
||||
plugins {
|
||||
id 'application'
|
||||
id 'airbyte-java-connector'
|
||||
id "org.jsonschema2pojo" version "1.2.1"
|
||||
}
|
||||
@@ -13,53 +12,24 @@ java {
|
||||
}
|
||||
|
||||
airbyteJavaConnector {
|
||||
cdkVersionRequired = '0.16.6'
|
||||
features = ['db-sources']
|
||||
cdkVersionRequired = '0.19.0'
|
||||
features = ['db-sources', 'datastore-postgres']
|
||||
useLocalCdk = false
|
||||
}
|
||||
|
||||
|
||||
application {
|
||||
mainClass = 'io.airbyte.integrations.source.postgres.PostgresSource'
|
||||
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
|
||||
}
|
||||
|
||||
// Add a configuration for our migrations tasks defined below to encapsulate their dependencies
|
||||
configurations {
|
||||
migrations.extendsFrom implementation
|
||||
}
|
||||
|
||||
configurations.all {
|
||||
resolutionStrategy {
|
||||
force 'org.jooq:jooq:3.13.4'
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
testImplementation libs.jooq
|
||||
testImplementation libs.hikaricp
|
||||
implementation 'commons-codec:commons-codec:1.16.0'
|
||||
implementation 'io.debezium:debezium-embedded:2.4.0.Final'
|
||||
implementation 'io.debezium:debezium-connector-postgres:2.4.0.Final'
|
||||
|
||||
migrations libs.testcontainers.postgresql
|
||||
migrations sourceSets.main.output
|
||||
|
||||
// Lombok
|
||||
implementation libs.lombok
|
||||
annotationProcessor libs.lombok
|
||||
|
||||
implementation 'org.apache.commons:commons-lang3:3.11'
|
||||
implementation libs.postgresql
|
||||
implementation libs.bundles.datadog
|
||||
|
||||
implementation libs.debezium.api
|
||||
implementation libs.debezium.embedded
|
||||
implementation libs.debezium.postgres
|
||||
testFixturesApi 'org.testcontainers:postgresql:1.19.0'
|
||||
|
||||
testImplementation 'org.hamcrest:hamcrest-all:1.3'
|
||||
testFixturesImplementation libs.testcontainers.jdbc
|
||||
testFixturesImplementation libs.testcontainers.postgresql
|
||||
testImplementation libs.testcontainers.jdbc
|
||||
testImplementation libs.testcontainers.postgresql
|
||||
testImplementation libs.junit.jupiter.system.stubs
|
||||
}
|
||||
|
||||
jsonSchema2Pojo {
|
||||
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
|
||||
dockerImageTag: 3.3.7
|
||||
dockerImageTag: 3.3.8
|
||||
dockerRepository: airbyte/source-postgres
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
|
||||
githubIssueLabel: source-postgres
|
||||
|
||||
@@ -60,7 +60,6 @@ import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
|
||||
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto;
|
||||
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
|
||||
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
|
||||
import io.airbyte.cdk.integrations.util.HostPortResolver;
|
||||
import io.airbyte.commons.exceptions.ConfigErrorException;
|
||||
import io.airbyte.commons.functional.CheckedConsumer;
|
||||
import io.airbyte.commons.functional.CheckedFunction;
|
||||
@@ -98,6 +97,8 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
|
||||
import io.airbyte.protocol.models.v0.ConnectorSpecification;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
@@ -174,7 +175,7 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
|
||||
// https://github.com/airbytehq/airbyte/issues/24796
|
||||
additionalParameters.add("prepareThreshold=0");
|
||||
|
||||
final String encodedDatabaseName = HostPortResolver.encodeValue(config.get(JdbcUtils.DATABASE_KEY).asText());
|
||||
final String encodedDatabaseName = URLEncoder.encode(config.get(JdbcUtils.DATABASE_KEY).asText(), StandardCharsets.UTF_8);
|
||||
|
||||
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?",
|
||||
config.get(JdbcUtils.HOST_KEY).asText(),
|
||||
|
||||
@@ -29,9 +29,7 @@ public class FillPostgresTestDbScriptTest extends AbstractSourceFillDbWithTestDa
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown(final TestDestinationEnv testEnv) {
|
||||
dslContext.close();
|
||||
}
|
||||
protected void tearDown(final TestDestinationEnv testEnv) {}
|
||||
|
||||
@Override
|
||||
protected String getImageName() {
|
||||
|
||||
@@ -14,7 +14,6 @@ import io.airbyte.cdk.db.jdbc.JdbcUtils;
|
||||
import io.airbyte.cdk.integrations.base.Source;
|
||||
import io.airbyte.cdk.integrations.base.adaptive.AdaptiveSourceRunner;
|
||||
import io.airbyte.cdk.integrations.base.ssh.SshBastionContainer;
|
||||
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
|
||||
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
|
||||
import io.airbyte.commons.features.EnvVariableFeatureFlags;
|
||||
import io.airbyte.commons.features.FeatureFlagsWrapper;
|
||||
@@ -123,8 +122,8 @@ public class CloudDeploymentPostgresSourceTest {
|
||||
final String sslMode,
|
||||
final boolean innerAddress) {
|
||||
final var containerAddress = innerAddress
|
||||
? SshHelpers.getInnerContainerAddress(db.getContainer())
|
||||
: SshHelpers.getOuterContainerAddress(db.getContainer());
|
||||
? SshBastionContainer.getInnerContainerAddress(db.getContainer())
|
||||
: SshBastionContainer.getOuterContainerAddress(db.getContainer());
|
||||
return db.configBuilder()
|
||||
.with(JdbcUtils.HOST_KEY, Objects.requireNonNull(containerAddress.left))
|
||||
.with(JdbcUtils.PORT_KEY, containerAddress.right)
|
||||
|
||||
@@ -246,20 +246,19 @@ class PostgresSourceTest {
|
||||
return null;
|
||||
});
|
||||
final JsonNode config = getConfig();
|
||||
try (final DSLContext dslContext = getDslContextWithSpecifiedUser(config, "test_user_3", "132")) {
|
||||
final Database database = new Database(dslContext);
|
||||
database.query(ctx -> {
|
||||
ctx.fetch("CREATE TABLE id_and_name_3(id INTEGER, name VARCHAR(200));");
|
||||
ctx.fetch("CREATE VIEW id_and_name_3_view(id, name) as\n"
|
||||
+ "SELECT id_and_name_3.id,\n"
|
||||
+ " id_and_name_3.name\n"
|
||||
+ "FROM id_and_name_3;\n"
|
||||
+ "ALTER TABLE id_and_name_3_view\n"
|
||||
+ " owner TO test_user_3");
|
||||
ctx.fetch("INSERT INTO id_and_name_3 (id, name) VALUES (1,'Zed'), (2, 'Jack'), (3, 'Antuan');");
|
||||
return null;
|
||||
});
|
||||
}
|
||||
final DSLContext dslContext = getDslContextWithSpecifiedUser(config, "test_user_3", "132");
|
||||
final Database database = new Database(dslContext);
|
||||
database.query(ctx -> {
|
||||
ctx.fetch("CREATE TABLE id_and_name_3(id INTEGER, name VARCHAR(200));");
|
||||
ctx.fetch("CREATE VIEW id_and_name_3_view(id, name) as\n"
|
||||
+ "SELECT id_and_name_3.id,\n"
|
||||
+ " id_and_name_3.name\n"
|
||||
+ "FROM id_and_name_3;\n"
|
||||
+ "ALTER TABLE id_and_name_3_view\n"
|
||||
+ " owner TO test_user_3");
|
||||
ctx.fetch("INSERT INTO id_and_name_3 (id, name) VALUES (1,'Zed'), (2, 'Jack'), (3, 'Antuan');");
|
||||
return null;
|
||||
});
|
||||
final JsonNode anotherUserConfig = getConfig("test_user_3", "132");
|
||||
final Set<AirbyteMessage> actualMessages =
|
||||
MoreIterators.toSet(source().read(anotherUserConfig, CONFIGURED_CATALOG, null));
|
||||
@@ -309,13 +308,12 @@ class PostgresSourceTest {
|
||||
});
|
||||
final var config = getConfig();
|
||||
|
||||
try (final DSLContext dslContext = getDslContextWithSpecifiedUser(config, "test_user_4", "132")) {
|
||||
final Database database = new Database(dslContext);
|
||||
database.query(ctx -> {
|
||||
ctx.fetch("CREATE TABLE id_and_name_3(id INTEGER, name VARCHAR(200));");
|
||||
return null;
|
||||
});
|
||||
}
|
||||
final DSLContext dslContext = getDslContextWithSpecifiedUser(config, "test_user_4", "132");
|
||||
final Database database = new Database(dslContext);
|
||||
database.query(ctx -> {
|
||||
ctx.fetch("CREATE TABLE id_and_name_3(id INTEGER, name VARCHAR(200));");
|
||||
return null;
|
||||
});
|
||||
AirbyteCatalog actual = source().discover(getConfig("test_user_4", "132"));
|
||||
Set<String> tableNames = actual.getStreams().stream().map(stream -> stream.getName()).collect(Collectors.toSet());
|
||||
assertEquals(Sets.newHashSet("id_and_name", "id_and_name_7", "id_and_name_3"), tableNames);
|
||||
|
||||
@@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
|
||||
| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. |
|
||||
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
|
||||
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |
|
||||
|
||||
Reference in New Issue
Block a user