1
0
mirror of synced 2026-01-07 09:05:45 -05:00

JDBC Sources Integration Test migration follow-up: Switch additional tests to use system stubs (#20040)

* Migrating remaining tests

* More tests

* set env vars before querying them; use stream state everywhere?

* fix strirct encrypt expected spec

* runRead after workerConfigs exist

* format

Co-authored-by: Edward Gao <edward.gao@airbyte.io>
This commit is contained in:
Akash Kulkarni
2022-12-05 11:06:13 -08:00
committed by GitHub
parent 4f9b66071e
commit 3b0512fe87
9 changed files with 442 additions and 23 deletions

View File

@@ -9,6 +9,7 @@ import static io.airbyte.db.PostgresUtils.getCertificate;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.PostgresUtils;
@@ -29,11 +30,19 @@ import java.util.HashMap;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
@ExtendWith(SystemStubsExtension.class)
public abstract class AbstractPostgresSourceSSLCertificateAcceptanceTest extends SourceAcceptanceTest {
@SystemStub
private EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
@@ -45,6 +54,8 @@ public abstract class AbstractPostgresSourceSSLCertificateAcceptanceTest extends
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>(DockerImageName.parse("postgres:bullseye")
.asCompatibleSubstituteFor("postgres"));
container.start();

View File

@@ -6,6 +6,7 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
@@ -28,11 +29,19 @@ import io.airbyte.protocol.models.SyncMode;
import java.util.HashMap;
import java.util.List;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
@ExtendWith(SystemStubsExtension.class)
public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcceptanceTest {
@SystemStub
private EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final Network network = Network.newNetwork();
@@ -74,6 +83,7 @@ public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcce
// requiring data to already be in place.
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
startTestContainers();
config = bastion.getTunnelConfig(getTunnelMethod(), bastion.getBasicDbConfigBuider(db, List.of("public")));
populateDatabaseTestData();

View File

@@ -42,7 +42,7 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSource
}
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
protected void postSetup() throws Exception {
final Database database = setupDatabase();
initTests();
for (final TestDataHolder test : testDataHolders) {

View File

@@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
@@ -35,10 +36,18 @@ import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
@ExtendWith(SystemStubsExtension.class)
public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {
@SystemStub
private EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
@@ -53,6 +62,8 @@ public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();
String username = container.getUsername();

View File

@@ -9,6 +9,7 @@ import static io.airbyte.db.PostgresUtils.getCertificate;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
@@ -30,18 +31,23 @@ import io.airbyte.protocol.models.SyncMode;
import java.util.HashMap;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junitpioneer.jupiter.SetEnvironmentVariable;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
/**
* This class is copied from source-postgres-strict-encrypt. The original file can be deleted
* completely once the migration of multi-variant connector is done.
*/
@SetEnvironmentVariable(key = "DEPLOYMENT_MODE",
value = "CLOUD")
@ExtendWith(SystemStubsExtension.class)
public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceTest {
@SystemStub
private EnvironmentVariables environmentVariables;
private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
@@ -53,6 +59,8 @@ public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceT
@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
environmentVariables.set("DEPLOYMENT_MODE", "CLOUD");
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new PostgreSQLContainer<>(DockerImageName.parse("postgres:bullseye")
.asCompatibleSubstituteFor("postgres"));
container.start();
@@ -109,7 +117,7 @@ public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceT
@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class));
return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_strict_encrypt_spec.json"), ConnectorSpecification.class));
}
@Override

View File

@@ -0,0 +1,377 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/postgres",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgres Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"properties": {
"host": {
"title": "Host",
"description": "Hostname of the database.",
"type": "string",
"order": 0
},
"port": {
"title": "Port",
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 5432,
"examples": ["5432"],
"order": 1
},
"database": {
"title": "Database Name",
"description": "Name of the database.",
"type": "string",
"order": 2
},
"schemas": {
"title": "Schemas",
"description": "The list of schemas (case sensitive) to sync from. Defaults to public.",
"type": "array",
"items": {
"type": "string"
},
"minItems": 0,
"uniqueItems": true,
"default": ["public"],
"order": 3
},
"username": {
"title": "Username",
"description": "Username to access the database.",
"type": "string",
"order": 4
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true,
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (Eg. key1=value1&key2=value2&key3=value3). For more information read about <a href=\"https://jdbc.postgresql.org/documentation/head/connect.html\">JDBC URL parameters</a>.",
"title": "JDBC URL Parameters (Advanced)",
"type": "string",
"order": 6
},
"ssl_mode": {
"title": "SSL Modes",
"description": "SSL connection modes. \n <ul><li><b>disable</b> - Disables encryption of communication between Airbyte and source database</li>\n <li><b>allow</b> - Enables encryption only when required by the source database</li>\n <li><b>prefer</b> - allows unencrypted connection only if the source database does not support encryption</li>\n <li><b>require</b> - Always require encryption. If the source database server does not support encryption, connection will fail</li>\n <li><b>verify-ca</b> - Always require encryption and verifies that the source database server has a valid SSL certificate</li>\n <li><b>verify-full</b> - This is the most secure mode. Always require encryption and verifies the identity of the source database server</li></ul>\n Read more <a href=\"https://jdbc.postgresql.org/documentation/head/ssl-client.html\"> in the docs</a>.",
"type": "object",
"order": 7,
"oneOf": [
{
"title": "allow",
"additionalProperties": true,
"description": "Allow SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "allow",
"order": 0
}
}
},
{
"title": "prefer",
"additionalProperties": true,
"description": "Prefer SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "prefer",
"order": 0
}
}
},
{
"title": "require",
"additionalProperties": true,
"description": "Require SSL mode.",
"required": ["mode"],
"properties": {
"mode": {
"type": "string",
"const": "require",
"order": 0
}
}
},
{
"title": "verify-ca",
"additionalProperties": true,
"description": "Verify-ca SSL mode.",
"required": ["mode", "ca_certificate"],
"properties": {
"mode": {
"type": "string",
"const": "verify-ca",
"order": 0
},
"ca_certificate": {
"type": "string",
"title": "CA certificate",
"description": "CA certificate",
"airbyte_secret": true,
"multiline": true,
"order": 1
},
"client_certificate": {
"type": "string",
"title": "Client Certificate",
"description": "Client certificate",
"airbyte_secret": true,
"multiline": true,
"order": 2
},
"client_key": {
"type": "string",
"title": "Client Key",
"description": "Client key",
"airbyte_secret": true,
"multiline": true,
"order": 3
},
"client_key_password": {
"type": "string",
"title": "Client key password",
"description": "Password for keystorage. If you do not add it - the password will be generated automatically.",
"airbyte_secret": true,
"order": 4
}
}
},
{
"title": "verify-full",
"additionalProperties": true,
"description": "Verify-full SSL mode.",
"required": ["mode", "ca_certificate"],
"properties": {
"mode": {
"type": "string",
"const": "verify-full",
"order": 0
},
"ca_certificate": {
"type": "string",
"title": "CA Certificate",
"description": "CA certificate",
"airbyte_secret": true,
"multiline": true,
"order": 1
},
"client_certificate": {
"type": "string",
"title": "Client Certificate",
"description": "Client certificate",
"airbyte_secret": true,
"multiline": true,
"order": 2
},
"client_key": {
"type": "string",
"title": "Client Key",
"description": "Client key",
"airbyte_secret": true,
"multiline": true,
"order": 3
},
"client_key_password": {
"type": "string",
"title": "Client key password",
"description": "Password for keystorage. If you do not add it - the password will be generated automatically.",
"airbyte_secret": true,
"order": 4
}
}
}
]
},
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "Replication method for extracting data from the database.",
"order": 8,
"oneOf": [
{
"title": "Standard",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["method"],
"properties": {
"method": {
"type": "string",
"const": "Standard",
"order": 0
}
}
},
{
"title": "Logical Replication (CDC)",
"description": "Logical replication uses the Postgres write-ahead log (WAL) to detect inserts, updates, and deletes. This needs to be configured on the source database itself. Only available on Postgres 10 and above. Read the <a href=\"https://docs.airbyte.com/integrations/sources/postgres\">docs</a>.",
"required": ["method", "replication_slot", "publication"],
"properties": {
"method": {
"type": "string",
"const": "CDC",
"order": 1
},
"plugin": {
"type": "string",
"title": "Plugin",
"description": "A logical decoding plugin installed on the PostgreSQL server. The `pgoutput` plugin is used by default. If the replication table contains a lot of big jsonb values it is recommended to use `wal2json` plugin. Read more about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-2-select-a-replication-plugin\">selecting replication plugins</a>.",
"enum": ["pgoutput", "wal2json"],
"const": "pgoutput",
"order": 2
},
"replication_slot": {
"type": "string",
"title": "Replication Slot",
"description": "A plugin logical replication slot. Read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-3-create-replication-slot\">replication slots</a>.",
"order": 3
},
"publication": {
"type": "string",
"title": "Publication",
"description": "A Postgres publication used for consuming changes. Read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-4-create-publications-and-replication-identities-for-tables\">publications and replication identities</a>.",
"order": 4
},
"initial_waiting_seconds": {
"type": "integer",
"title": "Initial Waiting Time in Seconds (Advanced)",
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/postgres#step-5-optional-set-up-initial-waiting-time\">initial waiting time</a>.",
"default": 300,
"order": 5,
"min": 120,
"max": 1200
}
}
}
]
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
"description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.",
"oneOf": [
{
"title": "No Tunnel",
"required": ["tunnel_method"],
"properties": {
"tunnel_method": {
"description": "No ssh tunnel needed to connect to database",
"type": "string",
"const": "NO_TUNNEL",
"order": 0
}
}
},
{
"title": "SSH Key Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"ssh_key"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and ssh key",
"type": "string",
"const": "SSH_KEY_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host.",
"type": "string",
"order": 3
},
"ssh_key": {
"title": "SSH Private Key",
"description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )",
"type": "string",
"airbyte_secret": true,
"multiline": true,
"order": 4
}
}
},
{
"title": "Password Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"tunnel_user_password"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and password authentication",
"type": "string",
"const": "SSH_PASSWORD_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host",
"type": "string",
"order": 3
},
"tunnel_user_password": {
"title": "Password",
"description": "OS-level password for logging into the jump server host",
"type": "string",
"airbyte_secret": true,
"order": 4
}
}
}
]
}
}
},
"supportsNormalization": false,
"supportsDBT": false,
"supported_destination_sync_modes": []
}