migrate mysql source to incremental (#1267)
This commit is contained in:
@@ -29,13 +29,13 @@ import com.google.common.collect.ImmutableMap;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.integrations.base.IntegrationRunner;
|
||||
import io.airbyte.integrations.base.Source;
|
||||
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
|
||||
import io.airbyte.integrations.source.jdbc.AbstractJooqSource;
|
||||
import java.util.List;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MySqlSource extends AbstractJdbcSource implements Source {
|
||||
public class MySqlSource extends AbstractJooqSource implements Source {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class);
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ package io.airbyte.integrations.source.mysql;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.commons.resources.MoreResources;
|
||||
import io.airbyte.db.Database;
|
||||
@@ -33,9 +34,11 @@ import io.airbyte.db.Databases;
|
||||
import io.airbyte.integrations.standardtest.source.StandardSourceTest;
|
||||
import io.airbyte.protocol.models.CatalogHelpers;
|
||||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
|
||||
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
|
||||
import io.airbyte.protocol.models.ConnectorSpecification;
|
||||
import io.airbyte.protocol.models.Field;
|
||||
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
|
||||
import io.airbyte.protocol.models.SyncMode;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -45,6 +48,7 @@ import org.testcontainers.containers.MySQLContainer;
|
||||
public class MySqlIntegrationTest extends StandardSourceTest {
|
||||
|
||||
private static final String STREAM_NAME = "id_and_name";
|
||||
private static final String STREAM_NAME2 = "public.starships";
|
||||
|
||||
private MySQLContainer<?> container;
|
||||
private JsonNode config;
|
||||
@@ -75,6 +79,8 @@ public class MySqlIntegrationTest extends StandardSourceTest {
|
||||
database.query(ctx -> {
|
||||
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
|
||||
ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
|
||||
ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
|
||||
ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
|
||||
return null;
|
||||
});
|
||||
|
||||
@@ -103,10 +109,23 @@ public class MySqlIntegrationTest extends StandardSourceTest {
|
||||
|
||||
@Override
|
||||
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
|
||||
return CatalogHelpers.createConfiguredAirbyteCatalog(
|
||||
String.format("%s.%s", config.get("database").asText(), STREAM_NAME),
|
||||
Field.of("id", JsonSchemaPrimitive.NUMBER),
|
||||
Field.of("name", JsonSchemaPrimitive.STRING));
|
||||
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
|
||||
new ConfiguredAirbyteStream()
|
||||
.withSyncMode(SyncMode.INCREMENTAL)
|
||||
.withCursorField(Lists.newArrayList("id"))
|
||||
.withStream(CatalogHelpers.createAirbyteStream(
|
||||
String.format("%s.%s", config.get("database").asText(), STREAM_NAME),
|
||||
Field.of("id", JsonSchemaPrimitive.NUMBER),
|
||||
Field.of("name", JsonSchemaPrimitive.STRING))
|
||||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
|
||||
new ConfiguredAirbyteStream()
|
||||
.withSyncMode(SyncMode.INCREMENTAL)
|
||||
.withCursorField(Lists.newArrayList("id"))
|
||||
.withStream(CatalogHelpers.createAirbyteStream(
|
||||
String.format("%s.%s", config.get("database").asText(), STREAM_NAME2),
|
||||
Field.of("id", JsonSchemaPrimitive.NUMBER),
|
||||
Field.of("name", JsonSchemaPrimitive.STRING))
|
||||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -50,6 +50,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream;
|
||||
import io.airbyte.protocol.models.ConnectorSpecification;
|
||||
import io.airbyte.protocol.models.Field;
|
||||
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
|
||||
import io.airbyte.protocol.models.SyncMode;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
@@ -177,10 +178,11 @@ class MySqlSourceTest {
|
||||
}
|
||||
|
||||
private AirbyteCatalog generateExpectedCatalog() {
|
||||
return CatalogHelpers.createAirbyteCatalog(
|
||||
return new AirbyteCatalog().withStreams(Lists.newArrayList(CatalogHelpers.createAirbyteStream(
|
||||
getStreamName(),
|
||||
Field.of("id", JsonSchemaPrimitive.NUMBER),
|
||||
Field.of("name", JsonSchemaPrimitive.STRING));
|
||||
Field.of("name", JsonSchemaPrimitive.STRING))
|
||||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
|
||||
}
|
||||
|
||||
private ConfiguredAirbyteCatalog generateConfiguredCatalog() {
|
||||
|
||||
Reference in New Issue
Block a user