get source-mssql CI to green (#35721)
TheAbstractMssqlSourceDatatypeTest was wrong for reals. CdcMssqlSourceTest and CdcMssqlSslSourceTest no longer use exclusive containers. tests that change the SQLServer agent state are now in their own test class (which needs to use an exclusive container) Besides that, the core of the problem can be grouped in 2 issues: a) some tests are failing to enable CDC for tables. This is due to a timing issue. We moved that logic into its own function that will try for a total of 240seconds before giving up. b) some tests are failing when trying to read the minLsn. There is a 1sec wait implemented in the production code. Instead we introduce a busy loop that will wait for a total of 240seconds for records to appear in the CDC table before giving up. That function is implemented in test code. Unfortunately, for both cases, we sometimes needed to wait while in the middle of a function implemented in the CDK. We introduced a few hooks in the parent PR that are implemented in this PR for the source-mssql tests, and use the functions described above
This commit is contained in:
@@ -3,7 +3,7 @@ plugins {
|
||||
}
|
||||
|
||||
airbyteJavaConnector {
|
||||
cdkVersionRequired = '0.23.14'
|
||||
cdkVersionRequired = '0.23.15'
|
||||
features = ['db-sources']
|
||||
useLocalCdk = false
|
||||
}
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
testExecutionConcurrency=-1
|
||||
testExecutionConcurrency=-1
|
||||
JunitMethodExecutionTimeout=5 m
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
|
||||
dockerImageTag: 3.7.5
|
||||
dockerImageTag: 3.7.6
|
||||
dockerRepository: airbyte/source-mssql
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
|
||||
githubIssueLabel: source-mssql
|
||||
|
||||
@@ -100,6 +100,7 @@ public class MssqlCdcHelper {
|
||||
final String sslMethod = sslConfig.get("ssl_method").asText();
|
||||
if ("unencrypted".equals(sslMethod)) {
|
||||
props.setProperty("database.encrypt", "false");
|
||||
props.setProperty("driver.trustServerCertificate", "true");
|
||||
} else if ("encrypted_trust_server_certificate".equals(sslMethod)) {
|
||||
props.setProperty("driver.encrypt", "true");
|
||||
props.setProperty("driver.trustServerCertificate", "true");
|
||||
@@ -118,6 +119,8 @@ public class MssqlCdcHelper {
|
||||
props.setProperty("database.hostNameInCertificate", dbConfig.get("hostNameInCertificate").asText());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
props.setProperty("driver.trustServerCertificate", "true");
|
||||
}
|
||||
|
||||
return props;
|
||||
|
||||
@@ -46,7 +46,7 @@ public class MssqlCdcStateHandler implements CdcStateHandler {
|
||||
|
||||
final JsonNode asJson = Jsons.jsonNode(state);
|
||||
|
||||
LOGGER.info("debezium state: {}", asJson);
|
||||
LOGGER.info("debezium state offset: {}", Jsons.jsonNode(offset));
|
||||
|
||||
final CdcState cdcState = new CdcState().withState(asJson);
|
||||
stateManager.getCdcStateManager().setCdcState(cdcState);
|
||||
|
||||
@@ -15,7 +15,6 @@ import io.airbyte.commons.json.Jsons;
|
||||
import io.debezium.connector.sqlserver.Lsn;
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@@ -26,8 +25,6 @@ public class MssqlCdcTargetPosition implements CdcTargetPosition<Lsn> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class);
|
||||
|
||||
public static final Duration MAX_LSN_QUERY_DELAY = Duration.ZERO;
|
||||
public static final Duration MAX_LSN_QUERY_DELAY_TEST = Duration.ofSeconds(1);
|
||||
public final Lsn targetLsn;
|
||||
|
||||
public MssqlCdcTargetPosition(final Lsn targetLsn) {
|
||||
@@ -87,27 +84,24 @@ public class MssqlCdcTargetPosition implements CdcTargetPosition<Lsn> {
|
||||
// a chance to catch up. This is important in tests, where reads might occur in quick succession
|
||||
// which might leave the CT tables (which Debezium consumes) in a stale state.
|
||||
final JsonNode sourceConfig = database.getSourceConfig();
|
||||
final Duration delay = (sourceConfig != null && sourceConfig.has("is_test") && sourceConfig.get("is_test").asBoolean())
|
||||
? MAX_LSN_QUERY_DELAY_TEST
|
||||
: MAX_LSN_QUERY_DELAY;
|
||||
final String maxLsnQuery = """
|
||||
USE [%s];
|
||||
WAITFOR DELAY '%02d:%02d:%02d';
|
||||
SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;
|
||||
""".formatted(dbName, delay.toHours(), delay.toMinutesPart(), delay.toSecondsPart());
|
||||
""".formatted(dbName);
|
||||
// Query the high-water mark.
|
||||
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
|
||||
connection -> connection.createStatement().executeQuery(maxLsnQuery),
|
||||
JdbcUtils.getDefaultSourceOperations()::rowToJson);
|
||||
Preconditions.checkState(jsonNodes.size() == 1);
|
||||
|
||||
final Lsn maxLsn;
|
||||
if (jsonNodes.get(0).get("max_lsn") != null) {
|
||||
final Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue());
|
||||
LOGGER.info("identified target lsn: " + maxLsn);
|
||||
return new MssqlCdcTargetPosition(maxLsn);
|
||||
maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue());
|
||||
} else {
|
||||
throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " +
|
||||
"Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service)");
|
||||
maxLsn = Lsn.NULL;
|
||||
}
|
||||
LOGGER.info("identified target lsn: " + maxLsn);
|
||||
return new MssqlCdcTargetPosition(maxLsn);
|
||||
} catch (final SQLException | IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@@ -224,6 +224,8 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
|
||||
|
||||
if (mssqlConfig.has("ssl_method")) {
|
||||
readSsl(mssqlConfig, additionalParameters);
|
||||
} else {
|
||||
additionalParameters.add("trustServerCertificate=true");
|
||||
}
|
||||
|
||||
if (!additionalParameters.isEmpty()) {
|
||||
|
||||
@@ -125,7 +125,7 @@ public class MssqlDebeziumStateUtil implements DebeziumStateUtil {
|
||||
assert Objects.nonNull(schemaHistory.schema());
|
||||
|
||||
final JsonNode asJson = serialize(offset, schemaHistory);
|
||||
LOGGER.info("Initial Debezium state constructed: {}", asJson);
|
||||
LOGGER.info("Initial Debezium state constructed. offset={}", Jsons.jsonNode(offset));
|
||||
|
||||
if (asJson.get(MssqlCdcStateConstants.MSSQL_DB_HISTORY).asText().isBlank()) {
|
||||
throw new RuntimeException("Schema history snapshot returned empty history.");
|
||||
|
||||
@@ -92,7 +92,7 @@ public class MssqlInitialSyncStateIterator extends AbstractIterator<AirbyteMessa
|
||||
} else if (!hasEmittedFinalState) {
|
||||
hasEmittedFinalState = true;
|
||||
final AirbyteStateMessage finalStateMessage = stateManager.createFinalStateMessage(pair, streamStateForIncrementalRun);
|
||||
LOGGER.info("Finished initial sync of stream {}, Emitting final state, state is {}", pair, finalStateMessage);
|
||||
LOGGER.info("Finished initial sync of stream {}, Emitting final state.", pair);
|
||||
return new AirbyteMessage()
|
||||
.withType(Type.STATE)
|
||||
.withState(finalStateMessage);
|
||||
|
||||
@@ -123,14 +123,13 @@ public abstract class AbstractMssqlSourceDatatypeTest extends AbstractSourceData
|
||||
.createTablePatternSql(CREATE_TABLE_SQL)
|
||||
.build());
|
||||
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.sourceType("real")
|
||||
.airbyteType(JsonSchemaType.NUMBER)
|
||||
.addInsertValues("'123'", "'1234567890.1234567'", "null")
|
||||
.addExpectedValues("123.0", "1.23456794E9", null)
|
||||
.createTablePatternSql(CREATE_TABLE_SQL)
|
||||
.build());
|
||||
addDataTypeTestData(TestDataHolder.builder()
|
||||
.sourceType("real")
|
||||
.airbyteType(JsonSchemaType.NUMBER)
|
||||
.addInsertValues("'123'", "'1234567890.1234567'", "null")
|
||||
.addExpectedValues("123.0", "1.234568E9", null)
|
||||
.createTablePatternSql(CREATE_TABLE_SQL)
|
||||
.build());
|
||||
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
|
||||
@@ -32,9 +32,13 @@ import java.io.UncheckedIOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
|
||||
static private final Logger LOGGER = LoggerFactory.getLogger(AbstractSshMssqlSourceAcceptanceTest.class);
|
||||
|
||||
private static final String STREAM_NAME = "dbo.id_and_name";
|
||||
private static final String STREAM_NAME2 = "dbo.starships";
|
||||
|
||||
@@ -69,7 +73,6 @@ public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAccepta
|
||||
JdbcUtils.PORT_LIST_KEY,
|
||||
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
|
||||
.query(ctx -> {
|
||||
ctx.fetch("ALTER DATABASE %s SET AUTO_CLOSE OFF WITH NO_WAIT;", testdb.getDatabaseName());
|
||||
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), born DATETIMEOFFSET(7));");
|
||||
ctx.fetch("INSERT INTO id_and_name (id, name, born) VALUES " +
|
||||
"(1, 'picard', '2124-03-04T01:01:01Z'), " +
|
||||
@@ -88,14 +91,16 @@ public abstract class AbstractSshMssqlSourceAcceptanceTest extends SourceAccepta
|
||||
String.format(DatabaseDriver.MSSQLSERVER.getUrlFormatString(),
|
||||
config.get(JdbcUtils.HOST_KEY).asText(),
|
||||
config.get(JdbcUtils.PORT_KEY).asInt(),
|
||||
config.get(JdbcUtils.DATABASE_KEY).asText()),
|
||||
config.get(JdbcUtils.DATABASE_KEY).asText()) + ";encrypt=false;trustServerCertificate=true",
|
||||
SQLDialect.DEFAULT));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
|
||||
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2017, ContainerModifier.NETWORK);
|
||||
LOGGER.info("starting bastion");
|
||||
bastion.initAndStartBastion(testdb.getContainer().getNetwork());
|
||||
LOGGER.info("bastion started");
|
||||
populateDatabaseTestData();
|
||||
}
|
||||
|
||||
|
||||
@@ -99,12 +99,6 @@ public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
@Override
|
||||
protected void setupEnvironment(final TestDestinationEnv environment) {
|
||||
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.AGENT);
|
||||
final var enableCdcSqlFmt = """
|
||||
EXEC sys.sp_cdc_enable_table
|
||||
\t@source_schema = N'%s',
|
||||
\t@source_name = N'%s',
|
||||
\t@role_name = N'%s',
|
||||
\t@supports_net_changes = 0""";
|
||||
testdb
|
||||
.withWaitUntilAgentRunning()
|
||||
.withCdc()
|
||||
@@ -115,17 +109,16 @@ public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
.with("INSERT INTO %s.%s (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');", SCHEMA_NAME, STREAM_NAME)
|
||||
.with("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');", SCHEMA_NAME, STREAM_NAME2)
|
||||
// enable cdc on tables for designated role
|
||||
.with(enableCdcSqlFmt, SCHEMA_NAME, STREAM_NAME, CDC_ROLE_NAME)
|
||||
.with(enableCdcSqlFmt, SCHEMA_NAME, STREAM_NAME2, CDC_ROLE_NAME)
|
||||
.withShortenedCapturePollingInterval()
|
||||
.withWaitUntilMaxLsnAvailable()
|
||||
.withCdcForTable(SCHEMA_NAME, STREAM_NAME, CDC_ROLE_NAME)
|
||||
.withCdcForTable(SCHEMA_NAME, STREAM_NAME2, CDC_ROLE_NAME)
|
||||
// revoke user permissions
|
||||
.with("REVOKE ALL FROM %s CASCADE;", testdb.getUserName())
|
||||
.with("EXEC sp_msforeachtable \"REVOKE ALL ON '?' TO %s;\"", testdb.getUserName())
|
||||
// grant user permissions
|
||||
.with("EXEC sp_addrolemember N'%s', N'%s';", "db_datareader", testdb.getUserName())
|
||||
.with("GRANT SELECT ON SCHEMA :: [cdc] TO %s", testdb.getUserName())
|
||||
.with("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, testdb.getUserName());
|
||||
.with("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, testdb.getUserName())
|
||||
.withWaitUntilMaxLsnAvailable();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -6,7 +6,6 @@ package io.airbyte.integrations.source.mssql;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import io.airbyte.cdk.db.Database;
|
||||
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
|
||||
|
||||
@@ -27,46 +26,18 @@ public class CdcMssqlSourceDatatypeTest extends AbstractMssqlSourceDatatypeTest
|
||||
return testdb.getDatabase();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
|
||||
super.setupEnvironment(environment);
|
||||
enableCdcOnAllTables();
|
||||
protected void createTables() throws Exception {
|
||||
super.createTables();
|
||||
for (var test : testDataHolders) {
|
||||
testdb.withCdcForTable(test.getNameSpace(), test.getNameWithTestPrefix(), null);
|
||||
}
|
||||
}
|
||||
|
||||
private void enableCdcOnAllTables() {
|
||||
testdb.with("""
|
||||
DECLARE @TableName VARCHAR(100)
|
||||
DECLARE @TableSchema VARCHAR(100)
|
||||
DECLARE CDC_Cursor CURSOR FOR
|
||||
SELECT * FROM (
|
||||
SELECT Name,SCHEMA_NAME(schema_id) AS TableSchema
|
||||
FROM sys.objects
|
||||
WHERE type = 'u'
|
||||
AND is_ms_shipped <> 1
|
||||
) CDC
|
||||
OPEN CDC_Cursor
|
||||
FETCH NEXT FROM CDC_Cursor INTO @TableName,@TableSchema
|
||||
WHILE @@FETCH_STATUS = 0
|
||||
BEGIN
|
||||
DECLARE @SQL NVARCHAR(1000)
|
||||
DECLARE @CDC_Status TINYINT
|
||||
SET @CDC_Status=(SELECT COUNT(*)
|
||||
FROM cdc.change_tables
|
||||
WHERE Source_object_id = OBJECT_ID(@TableSchema+'.'+@TableName))
|
||||
--IF CDC is not enabled on Table, Enable CDC
|
||||
IF @CDC_Status <> 1
|
||||
BEGIN
|
||||
SET @SQL='EXEC sys.sp_cdc_enable_table
|
||||
@source_schema = '''+@TableSchema+''',
|
||||
@source_name = ''' + @TableName
|
||||
+ ''',
|
||||
@role_name = null;'
|
||||
EXEC sp_executesql @SQL
|
||||
END
|
||||
FETCH NEXT FROM CDC_Cursor INTO @TableName,@TableSchema
|
||||
END
|
||||
CLOSE CDC_Cursor
|
||||
DEALLOCATE CDC_Cursor""");
|
||||
protected void populateTables() throws Exception {
|
||||
super.populateTables();
|
||||
for (var test : testDataHolders) {
|
||||
testdb.waitForCdcRecords(test.getNameSpace(), test.getNameWithTestPrefix(), test.getValues().size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -35,9 +35,12 @@ import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
|
||||
import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
|
||||
import io.airbyte.cdk.integrations.JdbcConnector;
|
||||
import io.airbyte.cdk.integrations.debezium.CdcSourceTest;
|
||||
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.commons.util.AutoCloseableIterator;
|
||||
import io.airbyte.commons.util.AutoCloseableIterators;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
|
||||
import io.airbyte.integrations.source.mssql.cdc.MssqlDebeziumStateUtil;
|
||||
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
|
||||
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
|
||||
@@ -49,6 +52,7 @@ import io.airbyte.protocol.models.v0.AirbyteStream;
|
||||
import io.airbyte.protocol.models.v0.AirbyteStreamState;
|
||||
import io.airbyte.protocol.models.v0.SyncMode;
|
||||
import io.debezium.connector.sqlserver.Lsn;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -60,54 +64,33 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.sql.DataSource;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.api.TestInstance.Lifecycle;
|
||||
import org.testcontainers.containers.MSSQLServerContainer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@TestInstance(Lifecycle.PER_CLASS)
|
||||
public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestDatabase> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CdcSourceTest.class);
|
||||
|
||||
static private final String CDC_ROLE_NAME = "cdc_selector";
|
||||
|
||||
static private final String TEST_USER_NAME_PREFIX = "cdc_test_user";
|
||||
|
||||
// Deliberately do not share this test container, as we're going to mutate the global SQL Server
|
||||
// state.
|
||||
protected final MSSQLServerContainer<?> privateContainer;
|
||||
|
||||
private DataSource testDataSource;
|
||||
|
||||
CdcMssqlSourceTest() {
|
||||
this.privateContainer = createContainer();
|
||||
}
|
||||
|
||||
protected MSSQLServerContainer<?> createContainer() {
|
||||
return new MsSQLContainerFactory().exclusive(
|
||||
MsSQLTestDatabase.BaseImage.MSSQL_2022.reference,
|
||||
MsSQLTestDatabase.ContainerModifier.AGENT.methodName);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
void afterAll() {
|
||||
privateContainer.close();
|
||||
}
|
||||
|
||||
protected final String testUserName() {
|
||||
return testdb.withNamespace(TEST_USER_NAME_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MsSQLTestDatabase createTestDatabase() {
|
||||
final var testdb = new MsSQLTestDatabase(privateContainer);
|
||||
return testdb
|
||||
.withConnectionProperty("encrypt", "false")
|
||||
.withConnectionProperty("databaseName", testdb.getDatabaseName())
|
||||
.initialized()
|
||||
return MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.AGENT)
|
||||
.withWaitUntilAgentRunning()
|
||||
.withCdc();
|
||||
}
|
||||
@@ -134,19 +117,12 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
|
||||
@Override
|
||||
@BeforeEach
|
||||
protected void setup() {
|
||||
super.setup();
|
||||
|
||||
testdb = createTestDatabase();
|
||||
createTables();
|
||||
// Enables cdc on MODELS_SCHEMA.MODELS_STREAM_NAME, giving CDC_ROLE_NAME select access.
|
||||
final var enableCdcSqlFmt = """
|
||||
EXEC sys.sp_cdc_enable_table
|
||||
\t@source_schema = N'%s',
|
||||
\t@source_name = N'%s',
|
||||
\t@role_name = N'%s',
|
||||
\t@supports_net_changes = 0""";
|
||||
testdb
|
||||
.with(enableCdcSqlFmt, modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
|
||||
.with(enableCdcSqlFmt, randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME)
|
||||
.withShortenedCapturePollingInterval();
|
||||
.withCdcForTable(modelsSchema(), MODELS_STREAM_NAME, CDC_ROLE_NAME)
|
||||
.withCdcForTable(randomSchema(), RANDOM_TABLE_NAME, CDC_ROLE_NAME);
|
||||
|
||||
// Create a test user to be used by the source, with proper permissions.
|
||||
testdb
|
||||
@@ -162,16 +138,24 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
|
||||
.with("USE [%s]", testdb.getDatabaseName())
|
||||
.with("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, testUserName());
|
||||
|
||||
populateTables();
|
||||
waitForCdcRecords();
|
||||
testDataSource = createTestDataSource();
|
||||
}
|
||||
|
||||
public void waitForCdcRecords() {
|
||||
testdb.waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, MODEL_RECORDS.size());
|
||||
testdb.waitForCdcRecords(randomSchema(), RANDOM_TABLE_NAME, MODEL_RECORDS_RANDOM.size());
|
||||
|
||||
}
|
||||
|
||||
protected DataSource createTestDataSource() {
|
||||
return DataSourceFactory.create(
|
||||
testUserName(),
|
||||
testdb.getPassword(),
|
||||
testdb.getDatabaseDriver().getDriverClassName(),
|
||||
testdb.getJdbcUrl(),
|
||||
Map.of("encrypt", "false"),
|
||||
Map.of("encrypt", "false", "trustServerCertificate", "true"),
|
||||
JdbcConnector.CONNECT_TIMEOUT_DEFAULT);
|
||||
}
|
||||
|
||||
@@ -299,41 +283,6 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
|
||||
() -> source().assertCdcSchemaQueryable(config(), testDatabase()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAssertSqlServerAgentRunning() {
|
||||
testdb.withAgentStopped().withWaitUntilAgentStopped();
|
||||
// assert expected failure if sql server agent stopped
|
||||
assertThrows(RuntimeException.class, () -> source().assertSqlServerAgentRunning(testDatabase()));
|
||||
// assert success if sql server agent running
|
||||
testdb.withAgentStarted().withWaitUntilAgentRunning();
|
||||
assertDoesNotThrow(() -> source().assertSqlServerAgentRunning(testDatabase()));
|
||||
}
|
||||
|
||||
// Ensure the CDC check operations are included when CDC is enabled
|
||||
// todo: make this better by checking the returned checkOperations from source.getCheckOperations
|
||||
@Test
|
||||
void testCdcCheckOperations() throws Exception {
|
||||
// assertCdcEnabledInDb
|
||||
testdb.withoutCdc();
|
||||
AirbyteConnectionStatus status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
testdb.withCdc();
|
||||
// assertCdcSchemaQueryable
|
||||
testdb.with("REVOKE SELECT ON SCHEMA :: [cdc] TO %s", testUserName());
|
||||
status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
testdb.with("GRANT SELECT ON SCHEMA :: [cdc] TO %s", testUserName());
|
||||
|
||||
// assertSqlServerAgentRunning
|
||||
|
||||
testdb.withAgentStopped().withWaitUntilAgentStopped();
|
||||
status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
testdb.withAgentStarted().withWaitUntilAgentRunning();
|
||||
status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCdcCheckOperationsWithDot() throws Exception {
|
||||
final String dbNameWithDot = testdb.getDatabaseName().replace("_", ".");
|
||||
@@ -347,7 +296,7 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
|
||||
// todo: check LSN returned is actually the max LSN
|
||||
// todo: check we fail as expected under certain conditions
|
||||
@Test
|
||||
void testGetTargetPosition() {
|
||||
void testGetTargetPosition() throws Exception {
|
||||
// check that getTargetPosition returns higher Lsn after inserting new row
|
||||
testdb.withWaitUntilMaxLsnAvailable();
|
||||
final Lsn firstLsn = MssqlCdcTargetPosition.getTargetPosition(testDatabase(), testdb.getDatabaseName()).targetLsn;
|
||||
@@ -478,4 +427,32 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void compareTargetPositionFromTheRecordsWithTargetPostionGeneratedBeforeSync(final CdcTargetPosition targetPosition,
|
||||
final AirbyteRecordMessage record) {
|
||||
// The LSN from records should be either equal or grater than the position value before the sync
|
||||
// started.
|
||||
// Since we're using shared containers, the current LSN can move forward without any data
|
||||
// modifications
|
||||
// (INSERT, UPDATE, DELETE) in the current DB
|
||||
assert targetPosition instanceof MssqlCdcTargetPosition;
|
||||
assertTrue(extractPosition(record.getData()).targetLsn.compareTo(((MssqlCdcTargetPosition) targetPosition).targetLsn) >= 0);
|
||||
}
|
||||
|
||||
protected void waitForCdcRecords(String schemaName, String tableName, int recordCount)
|
||||
throws Exception {
|
||||
testdb.waitForCdcRecords(schemaName, tableName, recordCount);
|
||||
}
|
||||
|
||||
protected void deleteCommand(final String streamName) {
|
||||
String selectCountSql = "SELECT COUNT(*) FROM %s.%s".formatted(modelsSchema(), streamName);
|
||||
try {
|
||||
int rowCount = testdb.query(ctx -> ctx.fetch(selectCountSql)).get(0).get(0, Integer.class);
|
||||
LOGGER.info("deleting all {} rows from table {}.{}", rowCount, modelsSchema(), streamName);
|
||||
super.deleteCommand(streamName);
|
||||
} catch (SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,34 +10,22 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import io.airbyte.cdk.db.factory.DataSourceFactory;
|
||||
import io.airbyte.cdk.db.jdbc.JdbcUtils;
|
||||
import io.airbyte.cdk.integrations.JdbcConnector;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.CertificateKey;
|
||||
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Map;
|
||||
import javax.sql.DataSource;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.testcontainers.containers.MSSQLServerContainer;
|
||||
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
public class CdcMssqlSslSourceTest extends CdcMssqlSourceTest {
|
||||
|
||||
@Override
|
||||
protected MSSQLServerContainer<?> createContainer() {
|
||||
return new MsSQLContainerFactory().exclusive(
|
||||
MsSQLTestDatabase.BaseImage.MSSQL_2022.reference,
|
||||
MsSQLTestDatabase.ContainerModifier.AGENT.methodName,
|
||||
MsSQLTestDatabase.ContainerModifier.WITH_SSL_CERTIFICATES.methodName);
|
||||
}
|
||||
|
||||
@Override
|
||||
final protected MsSQLTestDatabase createTestDatabase() {
|
||||
final var testdb = new MsSQLTestDatabase(privateContainer);
|
||||
return testdb
|
||||
.withConnectionProperty("encrypt", "true")
|
||||
.withConnectionProperty("databaseName", testdb.getDatabaseName())
|
||||
.withConnectionProperty("trustServerCertificate", "true")
|
||||
.initialized()
|
||||
.withWaitUntilAgentRunning()
|
||||
final var testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.AGENT, ContainerModifier.WITH_SSL_CERTIFICATES);
|
||||
return testdb.withWaitUntilAgentRunning()
|
||||
.withCdc();
|
||||
}
|
||||
|
||||
|
||||
@@ -56,28 +56,27 @@ public class CdcStateCompressionTest {
|
||||
private MsSQLTestDatabase testdb;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
public void setup() throws Exception {
|
||||
testdb = MsSQLTestDatabase.in(MsSQLTestDatabase.BaseImage.MSSQL_2022, MsSQLTestDatabase.ContainerModifier.AGENT)
|
||||
.withWaitUntilAgentRunning()
|
||||
.withCdc();
|
||||
|
||||
// Create a test schema and a bunch of test tables with CDC enabled.
|
||||
// Insert one row in each table so that they're not empty.
|
||||
final var enableCdcSqlFmt = """
|
||||
EXEC sys.sp_cdc_enable_table
|
||||
\t@source_schema = N'%s',
|
||||
\t@source_name = N'test_table_%d',
|
||||
\t@role_name = N'%s',
|
||||
\t@supports_net_changes = 0,
|
||||
\t@capture_instance = N'capture_instance_%d_%d'
|
||||
""";
|
||||
testdb.with("CREATE SCHEMA %s;", TEST_SCHEMA);
|
||||
for (int i = 0; i < TEST_TABLES; i++) {
|
||||
String tableName = "test_table_%d".formatted(i);
|
||||
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
|
||||
testdb
|
||||
.with("CREATE TABLE %s.test_table_%d (id INT IDENTITY(1,1) PRIMARY KEY);", TEST_SCHEMA, i)
|
||||
.with(enableCdcSqlFmt, TEST_SCHEMA, i, CDC_ROLE_NAME, i, 1)
|
||||
.withShortenedCapturePollingInterval()
|
||||
.with("INSERT INTO %s.test_table_%d DEFAULT VALUES", TEST_SCHEMA, i);
|
||||
.with("CREATE TABLE %s.%s (id INT IDENTITY(1,1) PRIMARY KEY);", TEST_SCHEMA, tableName)
|
||||
.withCdcForTable(TEST_SCHEMA, tableName, CDC_ROLE_NAME, cdcInstanceName)
|
||||
.with("INSERT INTO %s.%s DEFAULT VALUES", TEST_SCHEMA, tableName);
|
||||
}
|
||||
|
||||
for (int i = 0; i < TEST_TABLES; i++) {
|
||||
String tableName = "test_table_%d".formatted(i);
|
||||
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
|
||||
testdb.waitForCdcRecords(TEST_SCHEMA, tableName, cdcInstanceName, 1);
|
||||
}
|
||||
|
||||
// Create a test user to be used by the source, with proper permissions.
|
||||
@@ -97,15 +96,13 @@ public class CdcStateCompressionTest {
|
||||
// We do this by adding lots of columns with long names,
|
||||
// then migrating to a new CDC capture instance for each table.
|
||||
// This is admittedly somewhat awkward and perhaps could be improved.
|
||||
final var disableCdcSqlFmt = """
|
||||
EXEC sys.sp_cdc_disable_table
|
||||
\t@source_schema = N'%s',
|
||||
\t@source_name = N'test_table_%d',
|
||||
\t@capture_instance = N'capture_instance_%d_%d'
|
||||
""";
|
||||
|
||||
for (int i = 0; i < TEST_TABLES; i++) {
|
||||
String tableName = "test_table_%d".formatted(i);
|
||||
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 2);
|
||||
String oldCdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
|
||||
final var sb = new StringBuilder();
|
||||
sb.append("ALTER TABLE ").append(TEST_SCHEMA).append(".test_table_").append(i).append(" ADD");
|
||||
sb.append("ALTER TABLE ").append(TEST_SCHEMA).append(".").append(tableName).append(" ADD");
|
||||
for (int j = 0; j < ADDED_COLUMNS; j++) {
|
||||
sb.append((j > 0) ? ", " : " ")
|
||||
.append("rather_long_column_name_________________________________________________________________________________________").append(j)
|
||||
@@ -113,9 +110,8 @@ public class CdcStateCompressionTest {
|
||||
}
|
||||
testdb
|
||||
.with(sb.toString())
|
||||
.with(enableCdcSqlFmt, TEST_SCHEMA, i, CDC_ROLE_NAME, i, 2)
|
||||
.with(disableCdcSqlFmt, TEST_SCHEMA, i, i, 1)
|
||||
.withShortenedCapturePollingInterval();
|
||||
.withCdcForTable(TEST_SCHEMA, tableName, CDC_ROLE_NAME, cdcInstanceName)
|
||||
.withCdcDisabledForTable(TEST_SCHEMA, tableName, oldCdcInstanceName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +160,7 @@ public class CdcStateCompressionTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is similar in principle to {@link CdcMysqlSourceTest.testCompressedSchemaHistory}.
|
||||
* This test is similar in principle to CdcMysqlSourceTest.testCompressedSchemaHistory.
|
||||
*/
|
||||
@Test
|
||||
public void testCompressedSchemaHistory() throws Exception {
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.source.mssql;
|
||||
|
||||
import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import io.airbyte.cdk.db.factory.DataSourceFactory;
|
||||
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
|
||||
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
|
||||
import io.airbyte.cdk.db.jdbc.JdbcUtils;
|
||||
import io.airbyte.cdk.integrations.JdbcConnector;
|
||||
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
|
||||
import java.util.Map;
|
||||
import javax.sql.DataSource;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.MSSQLServerContainer;
|
||||
|
||||
public class MssqlAgentStateTest {
|
||||
|
||||
private static MsSQLTestDatabase testdb;
|
||||
private static DataSource testDataSource;
|
||||
private static MSSQLServerContainer privateContainer;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup() {
|
||||
privateContainer = new MsSQLContainerFactory().exclusive(
|
||||
MsSQLTestDatabase.BaseImage.MSSQL_2022.reference,
|
||||
MsSQLTestDatabase.ContainerModifier.AGENT);
|
||||
testdb = new MsSQLTestDatabase(privateContainer);
|
||||
testdb
|
||||
.withConnectionProperty("encrypt", "false")
|
||||
.withConnectionProperty("trustServerCertificate", "true")
|
||||
.withConnectionProperty("databaseName", testdb.getDatabaseName())
|
||||
.initialized()
|
||||
.withWaitUntilAgentRunning()
|
||||
.withCdc();
|
||||
testDataSource = DataSourceFactory.create(
|
||||
testdb.getUserName(),
|
||||
testdb.getPassword(),
|
||||
testdb.getDatabaseDriver().getDriverClassName(),
|
||||
testdb.getJdbcUrl(),
|
||||
Map.of("encrypt", "false", "trustServerCertificate", "true"),
|
||||
JdbcConnector.CONNECT_TIMEOUT_DEFAULT);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDown() {
|
||||
privateContainer.close();
|
||||
}
|
||||
|
||||
protected MssqlSource source() {
|
||||
return new MssqlSource();
|
||||
}
|
||||
|
||||
private JdbcDatabase testDatabase() {
|
||||
return new DefaultJdbcDatabase(testDataSource);
|
||||
}
|
||||
|
||||
protected JsonNode config() {
|
||||
return testdb.configBuilder()
|
||||
.withHostAndPort()
|
||||
.withDatabase()
|
||||
.with(JdbcUtils.USERNAME_KEY, testdb.getUserName())
|
||||
.with(JdbcUtils.PASSWORD_KEY, testdb.getPassword())
|
||||
.withCdcReplication()
|
||||
.withoutSsl()
|
||||
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAssertSqlServerAgentRunning() throws Exception {
|
||||
testdb.withAgentStopped().withWaitUntilAgentStopped();
|
||||
// assert expected failure if sql server agent stopped
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> source().assertSqlServerAgentRunning(testDatabase()));
|
||||
// assert success if sql server agent running
|
||||
testdb.withAgentStarted().withWaitUntilAgentRunning();
|
||||
assertDoesNotThrow(() -> source().assertSqlServerAgentRunning(testDatabase()));
|
||||
}
|
||||
|
||||
// Ensure the CDC check operations are included when CDC is enabled
|
||||
// todo: make this better by checking the returned checkOperations from source.getCheckOperations
|
||||
@Test
|
||||
void testCdcCheckOperations() throws Exception {
|
||||
// assertCdcEnabledInDb
|
||||
testdb.withoutCdc();
|
||||
AirbyteConnectionStatus status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
testdb.withCdc();
|
||||
// assertCdcSchemaQueryable
|
||||
testdb.with("REVOKE SELECT ON SCHEMA :: [cdc] TO %s", testdb.getUserName());
|
||||
status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
testdb.with("GRANT SELECT ON SCHEMA :: [cdc] TO %s", testdb.getUserName());
|
||||
|
||||
// assertSqlServerAgentRunning
|
||||
|
||||
testdb.withAgentStopped().withWaitUntilAgentStopped();
|
||||
status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
testdb.withAgentStarted().withWaitUntilAgentRunning();
|
||||
status = source().check(config());
|
||||
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -23,15 +23,15 @@ public class MsSQLContainerFactory extends ContainerFactory<MSSQLServerContainer
|
||||
/**
|
||||
* Create a new network and bind it to the container.
|
||||
*/
|
||||
public void withNetwork(MSSQLServerContainer<?> container) {
|
||||
public static void withNetwork(MSSQLServerContainer<?> container) {
|
||||
container.withNetwork(Network.newNetwork());
|
||||
}
|
||||
|
||||
public void withAgent(MSSQLServerContainer<?> container) {
|
||||
public static void withAgent(MSSQLServerContainer<?> container) {
|
||||
container.addEnv("MSSQL_AGENT_ENABLED", "True");
|
||||
}
|
||||
|
||||
public void withSslCertificates(MSSQLServerContainer<?> container) {
|
||||
public static void withSslCertificates(MSSQLServerContainer<?> container) {
|
||||
// yes, this is uglier than sin. The reason why I'm doing this is because there's no command to
|
||||
// reload a SqlServer config. So I need to create all the necessary files before I start the
|
||||
// SQL server. Hence this horror
|
||||
|
||||
@@ -7,19 +7,24 @@ package io.airbyte.integrations.source.mssql;
|
||||
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
|
||||
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.RESYNC_DATA_OPTION;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import io.airbyte.cdk.db.factory.DatabaseDriver;
|
||||
import io.airbyte.cdk.db.jdbc.JdbcUtils;
|
||||
import io.airbyte.cdk.testutils.ContainerFactory.NamedContainerModifier;
|
||||
import io.airbyte.cdk.testutils.TestDatabase;
|
||||
import io.debezium.connector.sqlserver.Lsn;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.jooq.exception.DataAccessException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.MSSQLServerContainer;
|
||||
@@ -28,7 +33,9 @@ public class MsSQLTestDatabase extends TestDatabase<MSSQLServerContainer<?>, MsS
|
||||
|
||||
static private final Logger LOGGER = LoggerFactory.getLogger(MsSQLTestDatabase.class);
|
||||
|
||||
static public final int MAX_RETRIES = 60;
|
||||
// empirically, 240 is enough. If you fee like you need to increase it, you're probably missing a
|
||||
// check somewhere
|
||||
static public final int MAX_RETRIES = 240;
|
||||
|
||||
public enum BaseImage {
|
||||
|
||||
@@ -44,42 +51,112 @@ public class MsSQLTestDatabase extends TestDatabase<MSSQLServerContainer<?>, MsS
|
||||
|
||||
}
|
||||
|
||||
public enum ContainerModifier {
|
||||
public enum ContainerModifier implements NamedContainerModifier<MSSQLServerContainer<?>> {
|
||||
|
||||
NETWORK("withNetwork"),
|
||||
AGENT("withAgent"),
|
||||
WITH_SSL_CERTIFICATES("withSslCertificates"),
|
||||
NETWORK(MsSQLContainerFactory::withNetwork),
|
||||
AGENT(MsSQLContainerFactory::withAgent),
|
||||
WITH_SSL_CERTIFICATES(MsSQLContainerFactory::withSslCertificates),
|
||||
;
|
||||
|
||||
public final String methodName;
|
||||
public final Consumer<MSSQLServerContainer<?>> modifier;
|
||||
|
||||
ContainerModifier(final String methodName) {
|
||||
this.methodName = methodName;
|
||||
ContainerModifier(final Consumer<MSSQLServerContainer<?>> modifier) {
|
||||
this.modifier = modifier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer<MSSQLServerContainer<?>> modifier() {
|
||||
return modifier;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerModifier... methods) {
|
||||
final String[] methodNames = Stream.of(methods).map(im -> im.methodName).toList().toArray(new String[0]);
|
||||
final var container = new MsSQLContainerFactory().shared(imageName.reference, methodNames);
|
||||
final var testdb = new MsSQLTestDatabase(container);
|
||||
static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerModifier... modifiers) {
|
||||
final var container = new MsSQLContainerFactory().shared(imageName.reference, modifiers);
|
||||
final MsSQLTestDatabase testdb = new MsSQLTestDatabase(container);
|
||||
return testdb
|
||||
.withConnectionProperty("encrypt", "false")
|
||||
.withConnectionProperty("trustServerCertificate", "true")
|
||||
.withConnectionProperty("databaseName", testdb.getDatabaseName())
|
||||
.initialized();
|
||||
}
|
||||
|
||||
public MsSQLTestDatabase(final MSSQLServerContainer<?> container) {
|
||||
super(container);
|
||||
LOGGER.info("creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName());
|
||||
}
|
||||
|
||||
public MsSQLTestDatabase withCdc() {
|
||||
return with("EXEC sys.sp_cdc_enable_db;");
|
||||
LOGGER.info("enabling CDC on database {} with id {}", getDatabaseName(), databaseId);
|
||||
with("EXEC sys.sp_cdc_enable_db;");
|
||||
LOGGER.info("CDC enabled on database {} with id {}", getDatabaseName(), databaseId);
|
||||
return this;
|
||||
}
|
||||
|
||||
private static final String RETRYABLE_CDC_TABLE_ENABLEMENT_ERROR_CONTENT =
|
||||
"The error returned was 14258: 'Cannot perform this operation while SQLServerAgent is starting. Try again later.'";
|
||||
private static final String ENABLE_CDC_SQL_FMT = """
|
||||
EXEC sys.sp_cdc_enable_table
|
||||
\t@source_schema = N'%s',
|
||||
\t@source_name = N'%s',
|
||||
\t@role_name = %s,
|
||||
\t@supports_net_changes = 0,
|
||||
\t@capture_instance = N'%s'""";
|
||||
private final Set<String> CDC_INSTANCE_NAMES = Sets.newConcurrentHashSet();
|
||||
|
||||
public MsSQLTestDatabase withCdcForTable(String schemaName, String tableName, String roleName) {
|
||||
return withCdcForTable(schemaName, tableName, roleName, "%s_%s".formatted(schemaName, tableName));
|
||||
}
|
||||
|
||||
public MsSQLTestDatabase withCdcForTable(String schemaName, String tableName, String roleName, String instanceName) {
|
||||
LOGGER.info(formatLogLine("enabling CDC for table {}.{} and role {}, instance {}"), schemaName, tableName, roleName, instanceName);
|
||||
String sqlRoleName = roleName == null ? "NULL" : "N'%s'".formatted(roleName);
|
||||
for (int tryCount = 0; tryCount < MAX_RETRIES; tryCount++) {
|
||||
try {
|
||||
Thread.sleep(1_000);
|
||||
synchronized (getContainer()) {
|
||||
LOGGER.info(formatLogLine("Trying to enable CDC for table {}.{} and role {}, instance {}, try {}/{}"), schemaName, tableName, roleName,
|
||||
instanceName, tryCount, MAX_RETRIES);
|
||||
with(ENABLE_CDC_SQL_FMT.formatted(schemaName, tableName, sqlRoleName, instanceName));
|
||||
}
|
||||
CDC_INSTANCE_NAMES.add(instanceName);
|
||||
return withShortenedCapturePollingInterval();
|
||||
} catch (DataAccessException e) {
|
||||
if (!e.getMessage().contains(RETRYABLE_CDC_TABLE_ENABLEMENT_ERROR_CONTENT)) {
|
||||
throw e;
|
||||
}
|
||||
tryCount++;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
throw new RuntimeException(formatLogLine("failed to enable CDC for table %s.%s within %d seconds").formatted(schemaName, tableName, MAX_RETRIES));
|
||||
}
|
||||
|
||||
private static final String DISABLE_CDC_SQL_FMT = """
|
||||
EXEC sys.sp_cdc_disable_table
|
||||
\t@source_schema = N'%s',
|
||||
\t@source_name = N'%s',
|
||||
\t@capture_instance = N'%s'
|
||||
""";
|
||||
|
||||
public MsSQLTestDatabase withCdcDisabledForTable(String schemaName, String tableName, String instanceName) {
|
||||
LOGGER.info(formatLogLine("disabling CDC for table {}.{}, instance {}"), schemaName, tableName, instanceName);
|
||||
if (!CDC_INSTANCE_NAMES.remove(instanceName)) {
|
||||
throw new RuntimeException(formatLogLine("CDC was disabled for instance ") + instanceName);
|
||||
}
|
||||
synchronized (getContainer()) {
|
||||
return with(DISABLE_CDC_SQL_FMT.formatted(schemaName, tableName, instanceName));
|
||||
}
|
||||
}
|
||||
|
||||
private static final String DISABLE_CDC_SQL = "EXEC sys.sp_cdc_disable_db;";
|
||||
|
||||
public MsSQLTestDatabase withoutCdc() {
|
||||
return with("EXEC sys.sp_cdc_disable_db;");
|
||||
CDC_INSTANCE_NAMES.clear();
|
||||
synchronized (getContainer()) {
|
||||
return with(DISABLE_CDC_SQL);
|
||||
}
|
||||
}
|
||||
|
||||
public MsSQLTestDatabase withAgentStarted() {
|
||||
@@ -100,50 +177,87 @@ public class MsSQLTestDatabase extends TestDatabase<MSSQLServerContainer<?>, MsS
|
||||
return self();
|
||||
}
|
||||
|
||||
public MsSQLTestDatabase waitForCdcRecords(String schemaName, String tableName, int recordCount) {
|
||||
return waitForCdcRecords(schemaName, tableName, "%s_%s".formatted(schemaName, tableName), recordCount);
|
||||
}
|
||||
|
||||
public MsSQLTestDatabase waitForCdcRecords(String schemaName, String tableName, String cdcInstanceName, int recordCount) {
|
||||
if (!CDC_INSTANCE_NAMES.contains(cdcInstanceName)) {
|
||||
throw new RuntimeException("CDC is not enabled on instance %s".formatted(cdcInstanceName));
|
||||
}
|
||||
String sql = "SELECT count(*) FROM cdc.%s_ct".formatted(cdcInstanceName);
|
||||
int actualRecordCount = 0;
|
||||
for (int tryCount = 0; tryCount < MAX_RETRIES; tryCount++) {
|
||||
LOGGER.info(formatLogLine("fetching the number of CDC records for {}.{}, instance {}"), schemaName, tableName, cdcInstanceName);
|
||||
try {
|
||||
Thread.sleep(1_000);
|
||||
actualRecordCount = query(ctx -> ctx.fetch(sql)).get(0).get(0, Integer.class);
|
||||
} catch (SQLException | DataAccessException e) {
|
||||
actualRecordCount = 0;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
LOGGER.info(formatLogLine("Found {} CDC records for {}.{} in instance {}. Expecting {}. Trying again ({}/{}"), actualRecordCount, schemaName,
|
||||
tableName, cdcInstanceName,
|
||||
recordCount, tryCount, MAX_RETRIES);
|
||||
if (actualRecordCount >= recordCount) {
|
||||
LOGGER.info(formatLogLine("found {} records after {} tries!"), actualRecordCount, tryCount);
|
||||
return self();
|
||||
}
|
||||
}
|
||||
throw new RuntimeException(formatLogLine(
|
||||
"failed to find %d records after %s seconds. Only found %d!").formatted(recordCount, MAX_RETRIES, actualRecordCount));
|
||||
}
|
||||
|
||||
private boolean shortenedPollingIntervalEnabled = false;
|
||||
|
||||
public MsSQLTestDatabase withShortenedCapturePollingInterval() {
|
||||
return with("EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = %d;",
|
||||
MssqlCdcTargetPosition.MAX_LSN_QUERY_DELAY_TEST.toSeconds());
|
||||
if (!shortenedPollingIntervalEnabled) {
|
||||
synchronized (getContainer()) {
|
||||
shortenedPollingIntervalEnabled = true;
|
||||
with("EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = 1;");
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void waitForAgentState(final boolean running) {
|
||||
final String expectedValue = running ? "Running." : "Stopped.";
|
||||
LOGGER.debug("Waiting for SQLServerAgent state to change to '{}'.", expectedValue);
|
||||
LOGGER.info(formatLogLine("Waiting for SQLServerAgent state to change to '{}'."), expectedValue);
|
||||
for (int i = 0; i < MAX_RETRIES; i++) {
|
||||
try {
|
||||
Thread.sleep(1_000);
|
||||
final var r = query(ctx -> ctx.fetch("EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';").get(0));
|
||||
if (expectedValue.equalsIgnoreCase(r.getValue(0).toString())) {
|
||||
LOGGER.debug("SQLServerAgent state is '{}', as expected.", expectedValue);
|
||||
LOGGER.info(formatLogLine("SQLServerAgent state is '{}', as expected."), expectedValue);
|
||||
return;
|
||||
}
|
||||
LOGGER.debug("Retrying, SQLServerAgent state {} does not match expected '{}'.", r, expectedValue);
|
||||
LOGGER.info(formatLogLine("Retrying, SQLServerAgent state {} does not match expected '{}'."), r, expectedValue);
|
||||
} catch (final SQLException e) {
|
||||
LOGGER.debug("Retrying agent state query after catching exception {}.", e.getMessage());
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1_000); // Wait one second between retries.
|
||||
} catch (final InterruptedException e) {
|
||||
LOGGER.info(formatLogLine("Retrying agent state query after catching exception {}."), e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Exhausted retry attempts while polling for agent state");
|
||||
throw new RuntimeException(formatLogLine("Exhausted retry attempts while polling for agent state"));
|
||||
}
|
||||
|
||||
public static final String MAX_LSN_QUERY = "SELECT sys.fn_cdc_get_max_lsn();";
|
||||
|
||||
public MsSQLTestDatabase withWaitUntilMaxLsnAvailable() {
|
||||
LOGGER.debug("Waiting for max LSN to become available for database {}.", getDatabaseName());
|
||||
LOGGER.info(formatLogLine("Waiting for max LSN to become available for database {}."), getDatabaseName());
|
||||
for (int i = 0; i < MAX_RETRIES; i++) {
|
||||
try {
|
||||
final var maxLSN = query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn();").get(0).get(0, byte[].class));
|
||||
Thread.sleep(1_000);
|
||||
final var maxLSN = query(ctx -> ctx.fetch(MAX_LSN_QUERY).get(0).get(0, byte[].class));
|
||||
if (maxLSN != null) {
|
||||
LOGGER.debug("Max LSN available for database {}: {}", getDatabaseName(), Lsn.valueOf(maxLSN));
|
||||
LOGGER.info(formatLogLine("Max LSN available for database {}: {}"), getDatabaseName(), Lsn.valueOf(maxLSN));
|
||||
return self();
|
||||
}
|
||||
LOGGER.debug("Retrying, max LSN still not available for database {}.", getDatabaseName());
|
||||
LOGGER.info(formatLogLine("Retrying, max LSN still not available for database {}."), getDatabaseName());
|
||||
} catch (final SQLException e) {
|
||||
LOGGER.warn("Retrying max LSN query after catching exception {}", e.getMessage());
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1_000); // Wait one second between retries.
|
||||
} catch (final InterruptedException e) {
|
||||
LOGGER.info(formatLogLine("Retrying max LSN query after catching exception {}"), e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
@@ -224,23 +338,22 @@ public class MsSQLTestDatabase extends TestDatabase<MSSQLServerContainer<?>, MsS
|
||||
|
||||
}
|
||||
|
||||
private Map<CertificateKey, String> cachedCerts;
|
||||
private volatile Map<CertificateKey, String> cachedCerts = new ConcurrentHashMap<>();
|
||||
|
||||
public synchronized String getCertificate(final CertificateKey certificateKey) {
|
||||
if (cachedCerts == null) {
|
||||
final Map<CertificateKey, String> cachedCerts = new HashMap<>();
|
||||
public String getCertificate(final CertificateKey certificateKey) {
|
||||
if (!cachedCerts.containsKey(certificateKey)) {
|
||||
final String certificate;
|
||||
try {
|
||||
for (final CertificateKey key : CertificateKey.values()) {
|
||||
final String command = "cat /tmp/certs/" + key.name().toLowerCase() + ".crt";
|
||||
final String certificate = getContainer().execInContainer("bash", "-c", command).getStdout().trim();
|
||||
cachedCerts.put(key, certificate);
|
||||
}
|
||||
final String command = "cat /tmp/certs/" + certificateKey.name().toLowerCase() + ".crt";
|
||||
certificate = getContainer().execInContainer("bash", "-c", command).getStdout().trim();
|
||||
} catch (final IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
} catch (final InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.cachedCerts = cachedCerts;
|
||||
synchronized (cachedCerts) {
|
||||
this.cachedCerts.put(certificateKey, certificate);
|
||||
}
|
||||
}
|
||||
return cachedCerts.get(certificateKey);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user