
speed up source-mssql tests (#35799)

Stephane Geneix
2024-03-07 07:16:02 -08:00
committed by GitHub
parent 666a3a3a73
commit 6b26b2770a
5 changed files with 107 additions and 41 deletions

View File

@@ -30,7 +30,13 @@ import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
@TestInstance(Lifecycle.PER_METHOD)
@Execution(ExecutionMode.CONCURRENT)
public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String SCHEMA_NAME = "dbo";
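
Note: the change repeated across these test classes is JUnit 5 declarative parallelism. @TestInstance(Lifecycle.PER_METHOD) gives every test method a fresh instance, so per-test fields (such as a test database handle) are not shared between threads, and @Execution(ExecutionMode.CONCURRENT) allows those methods to run concurrently. The annotation only takes effect once parallel execution is enabled at the platform level; how that flag is set is not shown in this diff. A minimal sketch, assuming the standard junit-platform.properties keys (the class and field names below are illustrative, not from this commit):

// src/test/resources/junit-platform.properties -- standard JUnit Jupiter keys:
//   junit.jupiter.execution.parallel.enabled=true
//   junit.jupiter.execution.parallel.mode.default=same_thread   (CONCURRENT is then opt-in per class)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@TestInstance(Lifecycle.PER_METHOD)
@Execution(ExecutionMode.CONCURRENT)
class ParallelLifecycleSketchTest {

  // PER_METHOD: each @Test gets its own instance, so this field is never
  // shared across concurrently running test methods.
  private final StringBuilder scratch = new StringBuilder();

  @Test
  void firstCase() {
    scratch.append("first"); // touches only this method's instance
  }

  @Test
  void secondCase() {
    scratch.append("second"); // may run on another thread at the same time
  }
}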

View File

@@ -8,9 +8,23 @@ import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.ContainerModifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
@TestInstance(Lifecycle.PER_METHOD)
@Execution(ExecutionMode.CONCURRENT)
public class CdcMssqlSourceDatatypeTest extends AbstractMssqlSourceDatatypeTest {
private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
@Override
protected JsonNode getConfig() {
return testdb.integrationTestConfigBuilder()
@@ -27,17 +41,34 @@ public class CdcMssqlSourceDatatypeTest extends AbstractMssqlSourceDatatypeTest
}
protected void createTables() throws Exception {
super.createTables();
List<Callable<MsSQLTestDatabase>> createTableTasks = new ArrayList<>();
List<Callable<MsSQLTestDatabase>> enableCdcForTableTasks = new ArrayList<>();
for (var test : testDataHolders) {
testdb.withCdcForTable(test.getNameSpace(), test.getNameWithTestPrefix(), null);
createTableTasks.add(() -> testdb.with(test.getCreateSqlQuery()));
enableCdcForTableTasks.add(() -> testdb.withCdcForTable(test.getNameSpace(), test.getNameWithTestPrefix(), null));
}
executor.invokeAll(createTableTasks);
executor.invokeAll(enableCdcForTableTasks);
}
protected void populateTables() throws Exception {
super.populateTables();
List<Callable<MsSQLTestDatabase>> insertTasks = new ArrayList<>();
List<Callable<MsSQLTestDatabase>> waitForCdcRecordsTasks = new ArrayList<>();
for (var test : testDataHolders) {
testdb.waitForCdcRecords(test.getNameSpace(), test.getNameWithTestPrefix(), test.getValues().size());
insertTasks.add(() -> {
this.database.query((ctx) -> {
List<String> sql = test.getInsertSqlQueries();
Objects.requireNonNull(ctx);
sql.forEach(ctx::fetch);
return null;
});
return null;
});
waitForCdcRecordsTasks.add(() -> testdb.waitForCdcRecords(test.getNameSpace(), test.getNameWithTestPrefix(), test.getExpectedValues().size()));
}
executor.invokeAll(insertTasks);
executor.invokeAll(waitForCdcRecordsTasks);
}
@Override
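
Note: createTables() and populateTables() now fan each per-table statement out as a Callable and run the batch through ExecutorService.invokeAll. invokeAll blocks until every task in the collection has completed, so each call acts as a barrier: CDC is only enabled after all CREATE TABLE tasks have finished, and the CDC-record waits only start after all inserts are done. A minimal sketch of the same fan-out/barrier pattern, assuming a placeholder runSql helper (hypothetical, not the MsSQLTestDatabase API):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only; runSql stands in for the real test-database helpers.
public class TwoPhaseSetupSketch {

  private final ExecutorService executor =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  void createAndEnableCdc(List<String> tableNames) throws InterruptedException {
    List<Callable<Void>> createTasks = new ArrayList<>();
    List<Callable<Void>> enableCdcTasks = new ArrayList<>();
    for (String table : tableNames) {
      createTasks.add(() -> runSql("CREATE TABLE dbo." + table + " (id INT PRIMARY KEY)"));
      enableCdcTasks.add(() -> runSql(
          "EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = '" + table + "', @role_name = NULL"));
    }
    executor.invokeAll(createTasks);    // returns only once every table exists
    executor.invokeAll(enableCdcTasks); // second phase starts after that barrier
  }

  private Void runSql(String sql) {
    // placeholder: execute the statement against the test database
    return null;
  }
}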

View File

@@ -70,10 +70,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@TestInstance(Lifecycle.PER_CLASS)
@TestInstance(Lifecycle.PER_METHOD)
@Execution(ExecutionMode.CONCURRENT)
public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestDatabase> {
private static final Logger LOGGER = LoggerFactory.getLogger(CdcSourceTest.class);

View File

@@ -18,8 +18,12 @@ import java.net.UnknownHostException;
import java.util.Map;
import javax.sql.DataSource;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestInstance(Lifecycle.PER_METHOD)
@Execution(ExecutionMode.CONCURRENT)
public class CdcMssqlSslSourceTest extends CdcMssqlSourceTest {
@Override

View File

@@ -36,24 +36,45 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CdcStateCompressionTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CdcStateCompressionTest.class);
static private final String CDC_ROLE_NAME = "cdc_selector";
static private final String TEST_USER_NAME_PREFIX = "cdc_test_user";
static private final String TEST_SCHEMA = "test_schema";
static private final int TEST_TABLES = 10;
static private final int TEST_TABLES = 4;
// SQL Server tables can't have more than 1024 columns.
static private final int ADDED_COLUMNS = 1000;
private MsSQLTestDatabase testdb;
private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static final String ALTER_TABLE_ADD_COLUMN_SQL;
static {
StringBuilder sb = new StringBuilder();
sb.append("ALTER TABLE ").append(TEST_SCHEMA).append(".%s ADD");
for (int j = 0; j < ADDED_COLUMNS; j++) {
sb.append((j > 0) ? ", " : " ")
// SQL Server column names can't be longer than 128 characters
.append("rather_long_column_name_________________________________________________________________________________________").append(j)
.append(" INT NULL");
}
ALTER_TABLE_ADD_COLUMN_SQL = sb.toString();
}
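
For reference, with ADDED_COLUMNS = 1000 the template above expands, per table, into a single wide ALTER TABLE statement; together with the id IDENTITY column that leaves each table at 1001 columns, under the 1024-column limit noted in the comment. Roughly (column name abbreviated here for readability):

// ALTER_TABLE_ADD_COLUMN_SQL.formatted("test_table_0") yields, abbreviated:
// ALTER TABLE test_schema.test_table_0 ADD rather_long_column_name_...0 INT NULL,
//   rather_long_column_name_...1 INT NULL, ..., rather_long_column_name_...999 INT NULL
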
@BeforeEach
public void setup() throws Exception {
@@ -64,20 +85,32 @@ public class CdcStateCompressionTest {
// Create a test schema and a bunch of test tables with CDC enabled.
// Insert one row in each table so that they're not empty.
testdb.with("CREATE SCHEMA %s;", TEST_SCHEMA);
for (int i = 0; i < TEST_TABLES; i++) {
String tableName = "test_table_%d".formatted(i);
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
testdb
.with("CREATE TABLE %s.%s (id INT IDENTITY(1,1) PRIMARY KEY);", TEST_SCHEMA, tableName)
.withCdcForTable(TEST_SCHEMA, tableName, CDC_ROLE_NAME, cdcInstanceName)
.with("INSERT INTO %s.%s DEFAULT VALUES", TEST_SCHEMA, tableName);
}
List<Callable<MsSQLTestDatabase>> createAndPopulateTableTasks = new ArrayList<>();
List<Callable<MsSQLTestDatabase>> waitForCdcRecordTasks = new ArrayList<>();
List<Callable<MsSQLTestDatabase>> alterTableTasks = new ArrayList<>();
List<Callable<MsSQLTestDatabase>> enableTableCdcTasks = new ArrayList<>();
List<Callable<MsSQLTestDatabase>> disableTableCdcTasks = new ArrayList<>();
for (int i = 0; i < TEST_TABLES; i++) {
String tableName = "test_table_%d".formatted(i);
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
testdb.waitForCdcRecords(TEST_SCHEMA, tableName, cdcInstanceName, 1);
String initialCdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
String finalCdcInstanceName = "capture_instance_%d_%d".formatted(i, 2);
createAndPopulateTableTasks.add(() -> testdb
.with("CREATE TABLE %s.%s (id INT IDENTITY(1,1) PRIMARY KEY);", TEST_SCHEMA, tableName)
.withCdcForTable(TEST_SCHEMA, tableName, CDC_ROLE_NAME, initialCdcInstanceName)
.with("INSERT INTO %s.%s DEFAULT VALUES", TEST_SCHEMA, tableName));
waitForCdcRecordTasks.add(() -> testdb.waitForCdcRecords(TEST_SCHEMA, tableName, initialCdcInstanceName, 1));
// Increase schema history size to trigger state compression.
// We do this by adding lots of columns with long names,
// then migrating to a new CDC capture instance for each table.
// This is admittedly somewhat awkward and perhaps could be improved.
alterTableTasks.add(() -> testdb.with(ALTER_TABLE_ADD_COLUMN_SQL.formatted(tableName)));
enableTableCdcTasks.add(() -> testdb.withCdcForTable(TEST_SCHEMA, tableName, CDC_ROLE_NAME, finalCdcInstanceName));
disableTableCdcTasks.add(() -> testdb.withCdcDisabledForTable(TEST_SCHEMA, tableName, initialCdcInstanceName));
}
executor.invokeAll(createAndPopulateTableTasks);
executor.invokeAll(waitForCdcRecordTasks);
// Create a test user to be used by the source, with proper permissions.
testdb
@@ -91,28 +124,9 @@ public class CdcStateCompressionTest {
.with("GRANT VIEW SERVER STATE TO %s", testUserName())
.with("USE [%s]", testdb.getDatabaseName())
.with("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, testUserName());
// Increase schema history size to trigger state compression.
// We do this by adding lots of columns with long names,
// then migrating to a new CDC capture instance for each table.
// This is admittedly somewhat awkward and perhaps could be improved.
for (int i = 0; i < TEST_TABLES; i++) {
String tableName = "test_table_%d".formatted(i);
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 2);
String oldCdcInstanceName = "capture_instance_%d_%d".formatted(i, 1);
final var sb = new StringBuilder();
sb.append("ALTER TABLE ").append(TEST_SCHEMA).append(".").append(tableName).append(" ADD");
for (int j = 0; j < ADDED_COLUMNS; j++) {
sb.append((j > 0) ? ", " : " ")
.append("rather_long_column_name_________________________________________________________________________________________").append(j)
.append(" INT NULL");
}
testdb
.with(sb.toString())
.withCdcForTable(TEST_SCHEMA, tableName, CDC_ROLE_NAME, cdcInstanceName)
.withCdcDisabledForTable(TEST_SCHEMA, tableName, oldCdcInstanceName);
}
executor.invokeAll(alterTableTasks);
executor.invokeAll(enableTableCdcTasks);
executor.invokeAll(disableTableCdcTasks);
}
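
One caveat with this pattern: ExecutorService.invokeAll waits for all tasks but does not rethrow exceptions thrown inside them; a failed CREATE TABLE or CDC call only surfaces if the returned Futures are inspected. A small helper sketch (hypothetical, not part of this commit) that fails fast on the first task error:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class Batches {

  // Run a batch and rethrow the first failure, so a broken setup step fails
  // the test immediately instead of being silently swallowed.
  static <T> void runAll(ExecutorService executor, List<Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    for (Future<T> future : executor.invokeAll(tasks)) {
      future.get(); // throws ExecutionException wrapping the task's exception
    }
  }

  private Batches() {}
}
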
private AirbyteCatalog getCatalog() {
@@ -151,7 +165,7 @@ public class CdcStateCompressionTest {
.with("is_test", true)
.with("replication_method", Map.of(
"method", "CDC",
"initial_waiting_seconds", 60))
"initial_waiting_seconds", 20))
.build();
}
@@ -182,11 +196,19 @@ public class CdcStateCompressionTest {
assertEquals("1", record.getData().get("id").toString());
}
LOGGER.info("inserting new data into test tables");
List<Callable<MsSQLTestDatabase>> waitForCdcTasks = new ArrayList<>();
// Insert a bunch of records (1 per table, again).
for (int i = 0; i < TEST_TABLES; i++) {
testdb.with("INSERT %s.test_table_%d DEFAULT VALUES;", TEST_SCHEMA, i);
String tableName = "test_table_%d".formatted(i);
String cdcInstanceName = "capture_instance_%d_%d".formatted(i, 2);
testdb.with("INSERT %s.%s DEFAULT VALUES;", TEST_SCHEMA, tableName);
waitForCdcTasks.add(() -> testdb.waitForCdcRecords(TEST_SCHEMA, tableName, cdcInstanceName, 1));
}
LOGGER.info("waiting for CDC records");
executor.invokeAll(waitForCdcTasks);
LOGGER.info("starting second sync");
// Second sync.
final var secondBatchStateForRead = Jsons.jsonNode(Collections.singletonList(Iterables.getLast(extractStateMessages(dataFromFirstBatch))));
final var secondBatchIterator = source().read(config(), getConfiguredCatalog(), secondBatchStateForRead);
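
The withCdcForTable and waitForCdcRecords helpers live in MsSQLTestDatabase and are not part of this diff. Conceptually, waiting for CDC records means polling the capture instance's change table until SQL Server's capture job has written the expected number of rows; by default that table is named cdc.<capture_instance>_CT. A minimal sketch under that assumption (the polling loop, timeout, and JDBC plumbing below are illustrative, not the actual helper):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative only: block until the CDC change table for a capture instance
// contains at least `expected` rows, or give up after one minute.
final class CdcWaitSketch {

  static void waitForCdcRecords(String jdbcUrl, String captureInstance, int expected) throws Exception {
    String sql = "SELECT COUNT(*) FROM cdc.[" + captureInstance + "_CT]";
    long deadline = System.currentTimeMillis() + 60_000;
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
        Statement stmt = conn.createStatement()) {
      while (System.currentTimeMillis() < deadline) {
        try (ResultSet rs = stmt.executeQuery(sql)) {
          rs.next();
          if (rs.getInt(1) >= expected) {
            return;
          }
        }
        Thread.sleep(500);
      }
    }
    throw new IllegalStateException("Timed out waiting for CDC records in capture instance " + captureInstance);
  }

  private CdcWaitSketch() {}
}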