JDBC Sources: Switch integration tests to use system stubs (#20026)
This commit is contained in:
@@ -31,6 +31,8 @@ dependencies {
|
||||
testImplementation libs.postgresql
|
||||
testImplementation libs.connectors.testcontainers.postgresql
|
||||
|
||||
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
|
||||
|
||||
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
|
||||
integrationTestJavaImplementation libs.connectors.testcontainers.postgresql
|
||||
|
||||
|
||||
@@ -34,18 +34,26 @@ import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
import org.testcontainers.utility.MountableFile;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
/**
|
||||
* Runs the acceptance tests in the source-jdbc test module. We want this module to run these tests
|
||||
* itself as a sanity check. The trade off here is that this class is duplicated from the one used
|
||||
* in source-postgres.
|
||||
*/
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
class DefaultJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
private static PostgreSQLContainer<?> PSQL_DB;
|
||||
|
||||
private JsonNode config;
|
||||
@@ -55,7 +63,6 @@ class DefaultJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
static void init() {
|
||||
PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine");
|
||||
PSQL_DB.start();
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s BIT(3) NOT NULL);";
|
||||
INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES(B'101');";
|
||||
}
|
||||
@@ -72,6 +79,8 @@ class DefaultJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
.put(JdbcUtils.PASSWORD_KEY, PSQL_DB.getPassword())
|
||||
.build());
|
||||
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
|
||||
final String initScriptName = "init_" + dbName.concat(".sql");
|
||||
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");
|
||||
PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB);
|
||||
|
||||
@@ -1252,17 +1252,4 @@ public abstract class JdbcSourceAcceptanceTest {
|
||||
}
|
||||
}
|
||||
|
||||
public static void setEnv(final String key, final String value) {
|
||||
try {
|
||||
final Map<String, String> env = System.getenv();
|
||||
final Class<?> cl = env.getClass();
|
||||
final java.lang.reflect.Field field = cl.getDeclaredField("m");
|
||||
field.setAccessible(true);
|
||||
final Map<String, String> writableEnv = (Map<String, String>) field.get(env);
|
||||
writableEnv.put(key, value);
|
||||
} catch (final Exception e) {
|
||||
throw new IllegalStateException("Failed to set environment variable", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ dependencies {
|
||||
|
||||
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
|
||||
testImplementation project(':airbyte-test-utils')
|
||||
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
|
||||
|
||||
testImplementation libs.connectors.testcontainers.mysql
|
||||
|
||||
|
||||
@@ -49,11 +49,19 @@ import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
class MySqlStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
protected static final String TEST_USER = "test";
|
||||
protected static final String TEST_PASSWORD = "test";
|
||||
protected static MySQLContainer<?> container;
|
||||
@@ -71,13 +79,13 @@ class MySqlStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTes
|
||||
.withEnv("MYSQL_ROOT_HOST", "%")
|
||||
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD);
|
||||
container.start();
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", container.getPassword());
|
||||
connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n");
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws Exception {
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
config = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put(JdbcUtils.HOST_KEY, container.getHost())
|
||||
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
|
||||
|
||||
@@ -26,6 +26,7 @@ dependencies {
|
||||
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
|
||||
testImplementation 'org.apache.commons:commons-lang3:3.11'
|
||||
testImplementation 'org.hamcrest:hamcrest-all:1.3'
|
||||
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
|
||||
testImplementation libs.connectors.testcontainers.mysql
|
||||
|
||||
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
|
||||
|
||||
import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils.TestConstants.INITIAL_CDC_WAITING_SECONDS;
|
||||
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
|
||||
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
@@ -39,10 +38,18 @@ import java.util.stream.Collectors;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
private static final String STREAM_NAME = "id_and_name";
|
||||
private static final String STREAM_NAME2 = "starships";
|
||||
private MySQLContainer<?> container;
|
||||
@@ -105,7 +112,7 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
.put("method", "CDC")
|
||||
.put("initial_waiting_seconds", INITIAL_CDC_WAITING_SECONDS)
|
||||
.build());
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
config = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put(JdbcUtils.HOST_KEY, container.getHost())
|
||||
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
|
||||
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
|
||||
|
||||
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@@ -29,10 +28,18 @@ import io.airbyte.protocol.models.SyncMode;
|
||||
import java.util.HashMap;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
private static final String STREAM_NAME = "id_and_name";
|
||||
private static final String STREAM_NAME2 = "public.starships";
|
||||
|
||||
@@ -46,7 +53,7 @@ public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put("method", "STANDARD")
|
||||
.build());
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
config = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put(JdbcUtils.HOST_KEY, container.getHost())
|
||||
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
|
||||
|
||||
@@ -6,7 +6,6 @@ package io.airbyte.integrations.source.mysql;
|
||||
|
||||
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
|
||||
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
|
||||
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
|
||||
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
|
||||
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
|
||||
import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
|
||||
@@ -46,10 +45,18 @@ import org.jooq.SQLDialect;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
public class CdcMysqlSourceTest extends CdcSourceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
private static final String DB_NAME = MODELS_SCHEMA;
|
||||
private MySQLContainer<?> container;
|
||||
private Database database;
|
||||
@@ -58,7 +65,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest {
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws SQLException {
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
init();
|
||||
revokeAllPermissions();
|
||||
grantCorrectPermissions();
|
||||
|
||||
@@ -46,10 +46,18 @@ import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
protected static final String USERNAME_WITHOUT_PERMISSION = "new_user";
|
||||
protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password";
|
||||
protected static final String TEST_USER = "test";
|
||||
@@ -61,7 +69,6 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@BeforeAll
|
||||
static void init() throws Exception {
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
container = new MySQLContainer<>("mysql:8.0")
|
||||
.withUsername(TEST_USER)
|
||||
.withPassword(TEST_PASSWORD.call())
|
||||
@@ -74,6 +81,7 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws Exception {
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
config = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put(JdbcUtils.HOST_KEY, container.getHost())
|
||||
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
|
||||
|
||||
@@ -30,6 +30,7 @@ dependencies {
|
||||
testImplementation project(':airbyte-test-utils')
|
||||
testImplementation libs.connectors.testcontainers.jdbc
|
||||
testImplementation libs.connectors.testcontainers.postgresql
|
||||
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
|
||||
|
||||
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
|
||||
performanceTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
|
||||
|
||||
@@ -7,7 +7,6 @@ package io.airbyte.integrations.source.postgres;
|
||||
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
|
||||
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_LSN;
|
||||
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
|
||||
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
@@ -61,12 +60,20 @@ import org.jooq.SQLDialect;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
import org.testcontainers.utility.MountableFile;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
abstract class CdcPostgresSourceTest extends CdcSourceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
protected static final String SLOT_NAME_BASE = "debezium_slot";
|
||||
protected static final String PUBLICATION = "publication";
|
||||
protected static final int INITIAL_WAITING_SECONDS = 5;
|
||||
@@ -94,7 +101,7 @@ abstract class CdcPostgresSourceTest extends CdcSourceTest {
|
||||
.withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), "/etc/postgresql/postgresql.conf")
|
||||
.withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf");
|
||||
container.start();
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
source = new PostgresSource();
|
||||
dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
|
||||
|
||||
|
||||
@@ -45,11 +45,19 @@ import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
import org.testcontainers.utility.MountableFile;
|
||||
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
|
||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
|
||||
|
||||
@ExtendWith(SystemStubsExtension.class)
|
||||
class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@SystemStub
|
||||
private EnvironmentVariables environmentVariables;
|
||||
|
||||
private static final String DATABASE = "new_db";
|
||||
protected static final String USERNAME_WITHOUT_PERMISSION = "new_user";
|
||||
protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password";
|
||||
@@ -62,12 +70,12 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
static void init() {
|
||||
PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine");
|
||||
PSQL_DB.start();
|
||||
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
}
|
||||
|
||||
@Override
|
||||
@BeforeEach
|
||||
public void setup() throws Exception {
|
||||
environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
|
||||
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
|
||||
COLUMN_CLAUSE_WITH_PK =
|
||||
"id INTEGER, name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL, wakeup_at TIMETZ NOT NULL, last_visited_at TIMESTAMPTZ NOT NULL, last_comment_at TIMESTAMP NOT NULL";
|
||||
|
||||
Reference in New Issue
Block a user