
☝🏼 Destinations support destination sync mode (#2460)

* Handle destination sync mode in destinations

* Source & Destination sync modes are required (#2500)

* Provide a migration script ensuring sync modes are always defined for previous sync configs
This commit is contained in:
Christophe Duong
2021-03-26 20:23:48 +01:00
committed by GitHub
parent 62263b64b7
commit 8a29584125
64 changed files with 812 additions and 225 deletions

View File

@@ -1887,10 +1887,12 @@ components:
description: the mutable part of the stream to configure the destination
type: object
additionalProperties: false
required:
- syncMode
- destinationSyncMode
properties:
syncMode:
$ref: "#/components/schemas/SyncMode"
default: full_refresh
cursorField:
description: Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.
type: array
@@ -1898,7 +1900,6 @@ components:
type: string
destinationSyncMode:
$ref: "#/components/schemas/DestinationSyncMode"
default: append
primaryKey:
description: Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.
type: array
@@ -2180,7 +2181,7 @@ components:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2
AirbyteArchive:
type: string
format: binary
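
A minimal sketch (not part of this commit) of a configured stream once both fields are required, using the builder methods that appear in the Java diffs below; the stream name, cursor field, and primary key are illustrative:

ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream()
    .withStream(CatalogHelpers.createAirbyteStream("users", Field.of("id", JsonSchemaPrimitive.NUMBER)))
    .withSyncMode(SyncMode.INCREMENTAL)                  // now required: how the source reads records
    .withCursorField(Collections.singletonList("id"))
    .withDestinationSyncMode(DestinationSyncMode.APPEND) // now required: how the destination writes them
    .withPrimaryKey(Collections.singletonList(Collections.singletonList("id")));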

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres"
}

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift"
}

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}

View File

@@ -1,35 +1,35 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch

View File

@@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
@@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
@@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio

View File

@@ -8,4 +8,4 @@ enum:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2

View File

@@ -141,12 +141,12 @@ class ConfiguredAirbyteStream(BaseModel):
extra = Extra.allow
stream: AirbyteStream
sync_mode: Optional[SyncMode] = "full_refresh"
sync_mode: SyncMode
cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.",
)
destination_sync_mode: Optional[DestinationSyncMode] = "append"
destination_sync_mode: DestinationSyncMode
primary_key: Optional[List[str]] = Field(
None,
description="Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.",

View File

@@ -24,7 +24,7 @@
package io.airbyte.integrations.destination;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
public class WriteConfig {
@@ -32,9 +32,9 @@ public class WriteConfig {
private final String outputNamespaceName;
private final String tmpTableName;
private final String outputTableName;
private final SyncMode syncMode;
private final DestinationSyncMode syncMode;
public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, SyncMode syncMode) {
public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) {
this.streamName = streamName;
this.outputNamespaceName = outputNamespaceName;
this.tmpTableName = tmpTableName;
@@ -58,7 +58,7 @@ public class WriteConfig {
return outputTableName;
}
public SyncMode getSyncMode() {
public DestinationSyncMode getSyncMode() {
return syncMode;
}
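
A hedged usage sketch of the updated constructor (argument values are illustrative):

WriteConfig writeConfig = new WriteConfig(
    "users",              // streamName
    "public",             // outputNamespaceName
    "_airbyte_tmp_users", // tmpTableName
    "_airbyte_raw_users", // outputTableName
    DestinationSyncMode.APPEND); // replaces the previous SyncMode argument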

View File

@@ -48,6 +48,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.DefaultCheckConnectionWorker;
@@ -292,7 +293,10 @@ public abstract class TestDestination {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
configuredCatalog.getStreams().forEach(s -> s.withSyncMode(SyncMode.INCREMENTAL));
configuredCatalog.getStreams().forEach(s -> {
s.withSyncMode(SyncMode.INCREMENTAL);
s.withDestinationSyncMode(DestinationSyncMode.APPEND);
});
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), firstSyncMessages, configuredCatalog);

View File

@@ -48,6 +48,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
@@ -378,6 +379,7 @@ public abstract class StandardSourceTest {
for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getStream().getSupportedSyncModes().contains(FULL_REFRESH)) {
configuredStream.setSyncMode(FULL_REFRESH);
configuredStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
}
}
return clone;

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-bigquery

View File

@@ -64,8 +64,8 @@ import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -231,7 +231,7 @@ public class BigQueryDestination implements Destination {
.setFormatOptions(FormatOptions.json()).build(); // new-line delimited json.
final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration);
final WriteDisposition syncMode = getWriteDisposition(stream.getSyncMode());
final WriteDisposition syncMode = getWriteDisposition(stream.getDestinationSyncMode());
writeConfigs.put(stream.getStream().getName(),
new WriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer, syncMode));
@@ -242,13 +242,18 @@ public class BigQueryDestination implements Destination {
return new RecordConsumer(bigquery, writeConfigs, catalog);
}
private static WriteDisposition getWriteDisposition(SyncMode syncMode) {
if (syncMode == null || syncMode == SyncMode.FULL_REFRESH) {
return WriteDisposition.WRITE_TRUNCATE;
} else if (syncMode == SyncMode.INCREMENTAL) {
return WriteDisposition.WRITE_APPEND;
} else {
throw new IllegalStateException("Unrecognized sync mode: " + syncMode);
private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode) {
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
switch (syncMode) {
case OVERWRITE -> {
return WriteDisposition.WRITE_TRUNCATE;
}
case APPEND, APPEND_DEDUP -> {
return WriteDisposition.WRITE_APPEND;
}
default -> throw new IllegalStateException("Unrecognized destination sync mode: " + syncMode);
}
}
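
The resulting mapping, summarized (a reading of the switch above, not additional commit code):

// OVERWRITE            -> WriteDisposition.WRITE_TRUNCATE (replace existing table contents)
// APPEND, APPEND_DEDUP -> WriteDisposition.WRITE_APPEND   (keep existing rows)
// null                 -> IllegalStateException("Undefined destination sync mode")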

View File

@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-csv

View File

@@ -39,8 +39,8 @@ import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
@@ -105,12 +105,16 @@ public class CsvDestination implements Destination {
final Path finalPath = destinationDir.resolve(tableName + ".csv");
CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
csvFormat = csvFormat.withSkipHeaderRecord();
}
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isIncremental);
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isAppendMode);
final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
}

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-jdbc

View File

@@ -38,7 +38,7 @@ import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStre
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.RecordWriter;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -84,7 +84,10 @@ public class JdbcBufferedConsumerFactory {
final String schemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
final SyncMode syncMode = stream.getSyncMode() != null ? stream.getSyncMode() : SyncMode.FULL_REFRESH;
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
return new WriteConfig(streamName, schemaName, tmpTableName, tableName, syncMode);
}).collect(Collectors.toList());
}
@@ -138,8 +141,9 @@ public class JdbcBufferedConsumerFactory {
sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case FULL_REFRESH -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case INCREMENTAL -> {}
case OVERWRITE -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case APPEND -> {}
case APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
queries.append(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));

View File

@@ -53,6 +53,7 @@ import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
@@ -183,7 +184,10 @@ class JdbcDestinationTest {
@Test
void testWriteIncremental() throws Exception {
final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG);
catalog.getStreams().forEach(stream -> stream.withSyncMode(SyncMode.INCREMENTAL));
catalog.getStreams().forEach(stream -> {
stream.withSyncMode(SyncMode.INCREMENTAL);
stream.withDestinationSyncMode(DestinationSyncMode.APPEND);
});
final JdbcDestination destination = new JdbcDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);

View File

@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-local-json

View File

@@ -40,8 +40,8 @@ import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
@@ -101,13 +101,16 @@ public class LocalJsonDestination implements Destination {
final String streamName = stream.getStream().getName();
final Path finalPath = destinationDir.resolve(namingResolver.getRawTableName(streamName) + ".jsonl");
final Path tmpPath = destinationDir.resolve(namingResolver.getTmpTableName(streamName) + ".jsonl");
final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
}
final Writer writer = new FileWriter(tmpPath.toFile(), isIncremental);
final Writer writer = new FileWriter(tmpPath.toFile(), isAppendMode);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(writer, tmpPath, finalPath));
}

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-meilisearch

View File

@@ -44,7 +44,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
@@ -119,8 +119,11 @@ public class MeiliSearchDestination extends BaseConnector implements Destination
final Map<String, Index> map = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String indexName = getIndexName(stream);
if (stream.getSyncMode() == SyncMode.FULL_REFRESH && indexExists(client, indexName)) {
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
if (syncMode == DestinationSyncMode.OVERWRITE && indexExists(client, indexName)) {
client.deleteIndex(indexName);
}

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-postgres

View File

@@ -52,6 +52,7 @@ import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
@@ -186,7 +187,10 @@ class PostgresDestinationTest {
@Test
void testWriteIncremental() throws Exception {
final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG);
catalog.getStreams().forEach(stream -> stream.withSyncMode(SyncMode.INCREMENTAL));
catalog.getStreams().forEach(stream -> {
stream.withSyncMode(SyncMode.INCREMENTAL);
stream.withDestinationSyncMode(DestinationSyncMode.APPEND);
});
final PostgresDestination destination = new PostgresDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-redshift

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-snowflake

View File

@@ -25,7 +25,8 @@ SOFTWARE.
import unittest
from unittest.mock import Mock, patch
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode
from airbyte_protocol.models.airbyte_protocol import DestinationSyncMode
from google_sheets_source.client import GoogleSheetsClient
from google_sheets_source.helpers import Helpers
from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet
@@ -103,8 +104,16 @@ class TestHelpers(unittest.TestCase):
catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(stream=AirbyteStream(name=sheet1, json_schema=sheet1_schema)),
ConfiguredAirbyteStream(stream=AirbyteStream(name=sheet2, json_schema=sheet2_schema)),
ConfiguredAirbyteStream(
stream=AirbyteStream(name=sheet1, json_schema=sheet1_schema),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
),
ConfiguredAirbyteStream(
stream=AirbyteStream(name=sheet2, json_schema=sheet2_schema),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
),
]
)

View File

@@ -274,10 +274,12 @@ public abstract class AbstractJdbcSource extends BaseConnector implements Source
cursorOptional.orElse(null),
cursorType),
airbyteMessageIterator);
} else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH || airbyteStream.getSyncMode() == null) {
} else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
iterator = getFullRefreshStream(database, streamName, selectedDatabaseFields, table, emittedAt);
} else if (airbyteStream.getSyncMode() == null) {
throw new IllegalArgumentException(String.format("%s requires a source sync mode", AbstractJdbcSource.class));
} else {
throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", airbyteStream.getSyncMode(), AbstractJdbcSource.class));
throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", AbstractJdbcSource.class, airbyteStream.getSyncMode()));
}
final AtomicLong recordCount = new AtomicLong();

View File

@@ -56,6 +56,7 @@ import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
@@ -437,6 +438,7 @@ public abstract class JdbcSourceStandardTest {
configuredCatalog.getStreams().forEach(airbyteStream -> {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(Lists.newArrayList("id"));
airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
});
final JdbcState state = new JdbcState().withStreams(Lists.newArrayList(new JdbcStreamState().withStreamName(streamName)));
@@ -492,6 +494,7 @@ public abstract class JdbcSourceStandardTest {
configuredCatalog.getStreams().forEach(airbyteStream -> {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(Lists.newArrayList("id"));
airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
});
final JdbcState state = new JdbcState().withStreams(Lists.newArrayList(new JdbcStreamState().withStreamName(streamName)));
@@ -576,6 +579,7 @@ public abstract class JdbcSourceStandardTest {
throws Exception {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(Lists.newArrayList(cursorField));
airbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
final JdbcState state = new JdbcState()
.withStreams(Lists.newArrayList(new JdbcStreamState()

View File

@@ -42,6 +42,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
@@ -199,7 +200,8 @@ public abstract class JdbcStressTest {
return new ConfiguredAirbyteCatalog()
.withStreams(Collections.singletonList(new ConfiguredAirbyteStream().withStream(getCatalog().getStreams().get(0))
.withCursorField(Collections.singletonList("id"))
.withSyncMode(SyncMode.INCREMENTAL)));
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)));
}
private static AirbyteCatalog getCatalog() {

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-mssql

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-mysql

View File

@@ -35,6 +35,7 @@ import io.airbyte.integrations.standardtest.source.StandardSourceTest;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
@@ -113,6 +114,7 @@ public class MySqlStandardTest extends StandardSourceTest {
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", config.get("database").asText(), STREAM_NAME),
Field.of("id", JsonSchemaPrimitive.NUMBER),
@@ -121,6 +123,7 @@ public class MySqlStandardTest extends StandardSourceTest {
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", config.get("database").asText(), STREAM_NAME2),
Field.of("id", JsonSchemaPrimitive.NUMBER),

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-postgres

View File

@@ -35,6 +35,7 @@ import io.airbyte.integrations.standardtest.source.StandardSourceTest;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
@@ -113,6 +114,7 @@ public class PostgresSourceStandardTest extends StandardSourceTest {
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
Field.of("id", JsonSchemaPrimitive.NUMBER),
@@ -121,6 +123,7 @@ public class PostgresSourceStandardTest extends StandardSourceTest {
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
Field.of("id", JsonSchemaPrimitive.NUMBER),

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-redshift

View File

@@ -41,14 +41,15 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
public class RedshiftStandardSourceTest extends StandardSourceTest {
// This test case expects an active Redshift cluster that is usable from outside the VPC
private static final String SCHEMA_NAME = "integration_test";
private static final String STREAM_NAME = SCHEMA_NAME + ".customer";
private JsonNode config;
private JdbcDatabase database;
private String schemaName;
private String streamName;
private static JsonNode getStaticConfig() {
return Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json")));
@@ -56,7 +57,9 @@ public class RedshiftStandardSourceTest extends StandardSourceTest {
@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
final String createSchemaQuery = String.format("CREATE SCHEMA %s", SCHEMA_NAME);
schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5)).toLowerCase();
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
streamName = schemaName + ".customer";
config = getStaticConfig();
database = Databases.createJdbcDatabase(
@@ -70,9 +73,9 @@ public class RedshiftStandardSourceTest extends StandardSourceTest {
database.execute(connection -> connection.createStatement().execute(createSchemaQuery));
String createTestTable =
String.format("CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n", STREAM_NAME);
String.format("CREATE TABLE IF NOT EXISTS %s (c_custkey INTEGER, c_name VARCHAR(16), c_nation VARCHAR(16));\n", streamName);
database.execute(connection -> connection.createStatement().execute(createTestTable));
String insertTestData = String.format("insert into %s values (1, 'Chris', 'France');\n", STREAM_NAME);
String insertTestData = String.format("insert into %s values (1, 'Chris', 'France');\n", streamName);
database.execute(connection -> {
connection.createStatement().execute(insertTestData);
System.out.println("more to be done.");
@@ -81,7 +84,7 @@ public class RedshiftStandardSourceTest extends StandardSourceTest {
@Override
protected void tearDown(TestDestinationEnv testEnv) throws SQLException {
final String dropSchemaQuery = String.format("DROP SCHEMA IF EXISTS %s CASCADE", SCHEMA_NAME);
final String dropSchemaQuery = String.format("DROP SCHEMA IF EXISTS %s CASCADE", schemaName);
database.execute(connection -> connection.createStatement().execute(dropSchemaQuery));
}
@@ -103,7 +106,7 @@ public class RedshiftStandardSourceTest extends StandardSourceTest {
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.createConfiguredAirbyteCatalog(
STREAM_NAME,
streamName,
Field.of("c_custkey", Field.JsonSchemaPrimitive.NUMBER),
Field.of("c_name", Field.JsonSchemaPrimitive.STRING),
Field.of("c_nation", Field.JsonSchemaPrimitive.STRING));

View File

@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
import io.airbyte.migrate.migrations.MigrationV0_14_0;
import io.airbyte.migrate.migrations.MigrationV0_14_3;
import io.airbyte.migrate.migrations.MigrationV0_17_0;
import io.airbyte.migrate.migrations.MigrationV0_18_0;
import io.airbyte.migrate.migrations.NoOpMigration;
import java.util.List;
@@ -38,6 +39,7 @@ public class Migrations {
private static final Migration MIGRATION_V_0_15_0 = new NoOpMigration(MIGRATION_V_0_14_3, "0.15.0-alpha");
private static final Migration MIGRATION_V_0_16_0 = new NoOpMigration(MIGRATION_V_0_15_0, "0.16.0-alpha");
private static final Migration MIGRATION_V_0_17_0 = new MigrationV0_17_0(MIGRATION_V_0_16_0);
private static final Migration MIGRATION_V_0_18_0 = new MigrationV0_18_0(MIGRATION_V_0_17_0);
// all migrations must be added to the list in the order that they should be applied.
public static final List<Migration> MIGRATIONS = ImmutableList.of(
@@ -45,6 +47,7 @@ public class Migrations {
MIGRATION_V_0_14_3,
MIGRATION_V_0_15_0,
MIGRATION_V_0_16_0,
MIGRATION_V_0_17_0);
MIGRATION_V_0_17_0,
MIGRATION_V_0_18_0);
}

View File

@@ -0,0 +1,190 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.migrate.migrations;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.migrate.Migration;
import io.airbyte.migrate.ResourceId;
import io.airbyte.migrate.ResourceType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This migration makes sure that ConfiguredAirbyteCatalog always has values for the now required
* fields: syncMode (used by the source to specify full_refresh/incremental) and destinationSyncMode
* (used by the destination to specify append/overwrite/append_dedup).
*
* The primaryKey field is filled from the stream's source-defined primary key, if the source defines one.
*/
public class MigrationV0_18_0 extends BaseMigration implements Migration {
private static final Logger LOGGER = LoggerFactory.getLogger(MigrationV0_18_0.class);
private static final ResourceId STANDARD_SYNC_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC");
private static final String MIGRATION_VERSION = "0.18.0-alpha";
private final Migration previousMigration;
public MigrationV0_18_0(Migration previousMigration) {
super(previousMigration);
this.previousMigration = previousMigration;
}
@Override
public String getVersion() {
return MIGRATION_VERSION;
}
@Override
public Map<ResourceId, JsonNode> getOutputSchema() {
return previousMigration.getOutputSchema();
}
@Override
public void migrate(Map<ResourceId, Stream<JsonNode>> inputData, Map<ResourceId, Consumer<JsonNode>> outputData) {
for (final Map.Entry<ResourceId, Stream<JsonNode>> entry : inputData.entrySet()) {
final Consumer<JsonNode> recordConsumer = outputData.get(entry.getKey());
entry.getValue().forEach(r -> {
if (entry.getKey().equals(STANDARD_SYNC_RESOURCE_ID)) {
((ObjectNode) r).set("catalog", migrateCatalog(r.get("catalog")));
}
recordConsumer.accept(r);
});
}
}
private JsonNode migrateCatalog(JsonNode catalog) {
final List<Map<String, JsonNode>> configuredStreams = MoreIterators.toList(catalog.get("streams").elements())
.stream()
.map(stream -> {
final JsonNode airbyteStream = stream.get("stream");
assert airbyteStream != null;
JsonNode syncMode = stream.get("sync_mode");
if (syncMode == null) {
syncMode = Jsons.jsonNode(SyncMode.FULL_REFRESH.toString());
LOGGER.info("Migrating {} to default source sync_mode: {}", airbyteStream.get("name"), syncMode);
}
JsonNode destinationSyncMode = stream.get("destination_sync_mode");
if (destinationSyncMode == null) {
if (SyncMode.fromValue(syncMode.asText()) == SyncMode.FULL_REFRESH) {
destinationSyncMode = Jsons.jsonNode(DestinationSyncMode.OVERWRITE.toString());
LOGGER.debug("Migrating {} to source sync_mode: {} destination_sync_mode: {}", airbyteStream.get("name"), syncMode,
destinationSyncMode);
} else if (SyncMode.fromValue(syncMode.asText()) == SyncMode.INCREMENTAL) {
destinationSyncMode = Jsons.jsonNode(DestinationSyncMode.APPEND.toString());
LOGGER.debug("Migrating {} to source sync_mode: {} destination_sync_mode: {}", airbyteStream.get("name"), syncMode,
destinationSyncMode);
} else {
syncMode = Jsons.jsonNode(SyncMode.FULL_REFRESH.toString());
destinationSyncMode = Jsons.jsonNode(DestinationSyncMode.OVERWRITE.toString());
LOGGER.info("Migrating {} to default source sync_mode: {} destination_sync_mode: {}", airbyteStream.get("name"), syncMode,
destinationSyncMode);
}
}
JsonNode primaryKey = stream.get("primary_key");
if (primaryKey == null) {
JsonNode sourceDefinedPrimaryKey = airbyteStream.get("source_defined_primary_key");
primaryKey = sourceDefinedPrimaryKey != null ? sourceDefinedPrimaryKey : Jsons.jsonNode(Collections.emptyList());
}
// configured catalog fields
return (Map<String, JsonNode>) ImmutableMap.<String, JsonNode>builder()
.put("stream", airbyteStream)
.put("sync_mode", syncMode)
.put("cursor_field", stream.get("cursor_field") != null ? stream.get("cursor_field") : Jsons.jsonNode(Collections.emptyList()))
.put("destination_sync_mode", destinationSyncMode)
.put("primary_key", primaryKey)
.build();
})
.collect(Collectors.toList());
return Jsons.jsonNode(ImmutableMap.of("streams", configuredStreams));
}
public enum SyncMode {
FULL_REFRESH("full_refresh"),
INCREMENTAL("incremental");
private String value;
SyncMode(String value) {
this.value = value;
}
public String toString() {
return String.valueOf(value);
}
public static SyncMode fromValue(String value) {
for (SyncMode b : SyncMode.values()) {
if (b.value.equals(value)) {
return b;
}
}
throw new IllegalArgumentException("Unexpected value '" + value + "'");
}
}
public enum DestinationSyncMode {
APPEND("append"),
OVERWRITE("overwrite"),
APPEND_DEDUP("append_dedup");
private final String value;
private DestinationSyncMode(String value) {
this.value = value;
}
public String toString() {
return String.valueOf(value);
}
public static DestinationSyncMode fromValue(String value) {
for (DestinationSyncMode b : DestinationSyncMode.values()) {
if (b.value.equals(value)) {
return b;
}
}
throw new IllegalArgumentException("Unexpected value '" + value + "'");
}
}
}
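
The defaulting rules this migration applies, summarized (a reading of migrateCatalog above, not additional commit code):

// sync_mode missing                        -> "full_refresh"
// destination_sync_mode missing:
//   sync_mode == full_refresh              -> "overwrite"
//   sync_mode == incremental               -> "append"
//   any other case                         -> reset to "full_refresh" / "overwrite"
// primary_key missing                      -> stream's source_defined_primary_key, else []
// cursor_field missing                     -> []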

View File

@@ -1,84 +0,0 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml
title: AirbyteProtocol
type: object
description: AirbyteProtocol structs
properties:
airbyte_message:
"$ref": "#/definitions/AirbyteMessage"
configured_airbyte_catalog:
"$ref": "#/definitions/ConfiguredAirbyteCatalog"
definitions:
AirbyteCatalog:
description: Airbyte stream schema catalog
type: object
additionalProperties: false
required:
- streams
properties:
streams:
type: array
items:
"$ref": "#/definitions/AirbyteStream"
AirbyteStream:
type: object
additionalProperties: false
required:
- name
- json_schema
# todo (cgardens) - make required once sources are migrated
# - supported_sync_modes
properties:
name:
type: string
description: Stream's name.
json_schema:
description: Stream schema using Json Schema specs.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
supported_sync_modes:
type: array
items:
"$ref": "#/definitions/SyncMode"
source_defined_cursor:
description: If the source defines the cursor field, then any other cursor field inputs will be ignored. If it does not, either the user-provided cursor field is used or, as a fallback, the default one is used.
type: boolean
default_cursor_field:
description: Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the cursor field themselves.
type: array
items:
type: string
ConfiguredAirbyteCatalog:
description: Airbyte stream schema catalog
type: object
additionalProperties: false
required:
- streams
properties:
streams:
type: array
items:
"$ref": "#/definitions/ConfiguredAirbyteStream"
ConfiguredAirbyteStream:
type: object
additionalProperties: false
required:
- stream
properties:
stream:
"$ref": "#/definitions/AirbyteStream"
sync_mode:
"$ref": "#/definitions/SyncMode"
default: full_refresh
cursor_field:
description: Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.
type: array
items:
type: string
SyncMode:
type: string
enum:
- full_refresh
- incremental

View File

@@ -44,7 +44,7 @@ import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
class MigrateV0_14_13Test {
class MigrateV0_14_3Test {
private static final ResourceId SYNC_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC");

View File

@@ -0,0 +1,96 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.migrate.migrations;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.ListConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.migrate.Migration;
import io.airbyte.migrate.MigrationTestUtils;
import io.airbyte.migrate.MigrationUtils;
import io.airbyte.migrate.Migrations;
import io.airbyte.migrate.ResourceId;
import io.airbyte.migrate.ResourceType;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
class MigrateV0_18_0Test {
private static final ResourceId SYNC_RESOURCE_ID = ResourceId.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC");
@Test
void testMigration() throws IOException {
final Migration migration = Migrations.MIGRATIONS
.stream()
.filter(m -> m instanceof MigrationV0_18_0)
.findAny()
.orElse(null);
assertNotNull(migration);
// construct a sync object. in this migration we modify the catalog, so we will use this as
// a base.
final JsonNode syncWithoutCatalog = Jsons.jsonNode(ImmutableMap.<String, String>builder()
.put("sourceId", UUID.randomUUID().toString())
.put("destinationId", UUID.randomUUID().toString())
.put("connectionId", UUID.randomUUID().toString())
.put("name", "users_sync")
.put("status", "active")
.build());
// input Catalog
final JsonNode inputCatalog = Jsons.deserialize(MoreResources.readResource("migrations/migrationV0_18_0/example_input_catalog.json"));
final JsonNode syncInputCatalog = Jsons.clone(syncWithoutCatalog);
((ObjectNode) syncInputCatalog).set("catalog", inputCatalog);
// Output Catalog
final JsonNode outputCatalog = Jsons.deserialize(MoreResources.readResource("migrations/migrationV0_18_0/example_output_catalog.json"));
final JsonNode syncOutputCatalog = Jsons.clone(syncWithoutCatalog);
((ObjectNode) syncOutputCatalog).set("catalog", outputCatalog);
final Map<ResourceId, Stream<JsonNode>> records = ImmutableMap.of(SYNC_RESOURCE_ID, Stream.of(syncInputCatalog));
final Map<ResourceId, ListConsumer<JsonNode>> outputConsumer = MigrationTestUtils.createOutputConsumer(migration.getOutputSchema().keySet());
migration.migrate(records, MigrationUtils.mapRecordConsumerToConsumer(outputConsumer));
final Map<ResourceId, List<JsonNode>> expectedOutputOverrides = ImmutableMap.of(SYNC_RESOURCE_ID, ImmutableList.of(syncOutputCatalog));
final Map<ResourceId, List<JsonNode>> expectedOutput =
MigrationTestUtils.createExpectedOutput(migration.getOutputSchema().keySet(), expectedOutputOverrides);
final Map<ResourceId, List<JsonNode>> outputAsList = MigrationTestUtils.collectConsumersToList(outputConsumer);
assertEquals(expectedOutput, outputAsList);
}
}

View File

@@ -0,0 +1,96 @@
{
"streams": [
{
"stream": {
"name": "users",
"json_schema": {
"type": "object",
"properties": {
"last_name": {
"type": "string"
},
"first_name": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": ["last_name"],
"source_defined_primary_key": [["first_name"], ["last_name"]]
},
"sync_mode": "incremental",
"cursor_field": ["first_name"]
},
{
"stream": {
"name": "products",
"json_schema": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"sku": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"cursor_field": ["name"],
"destination_sync_mode": "append_dedup",
"primary_key": [["sku"]]
},
{
"stream": {
"name": "product_lines",
"json_schema": {
"type": "object",
"properties": {
"name": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": []
},
"sync_mode": "full_refresh"
},
{
"stream": {
"name": "product_details",
"json_schema": {
"type": "object",
"properties": {
"number": {
"type": "number"
},
"boolean": {
"type": "boolean"
},
"string": {
"type": "string"
},
"array": {
"type": "array"
},
"object": {
"type": "object"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": []
},
"sync_mode": "full_refresh",
"cursor_field": []
}
]
}

View File

@@ -0,0 +1,103 @@
{
"streams": [
{
"stream": {
"name": "users",
"json_schema": {
"type": "object",
"properties": {
"last_name": {
"type": "string"
},
"first_name": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": ["last_name"],
"source_defined_primary_key": [["first_name"], ["last_name"]]
},
"sync_mode": "incremental",
"cursor_field": ["first_name"],
"destination_sync_mode": "append",
"primary_key": [["first_name"], ["last_name"]]
},
{
"stream": {
"name": "products",
"json_schema": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"sku": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"cursor_field": ["name"],
"destination_sync_mode": "append_dedup",
"primary_key": [["sku"]]
},
{
"stream": {
"name": "product_lines",
"json_schema": {
"type": "object",
"properties": {
"name": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": []
},
"sync_mode": "full_refresh",
"cursor_field": [],
"destination_sync_mode": "overwrite",
"primary_key": []
},
{
"stream": {
"name": "product_details",
"json_schema": {
"type": "object",
"properties": {
"number": {
"type": "number"
},
"boolean": {
"type": "boolean"
},
"string": {
"type": "string"
},
"array": {
"type": "array"
},
"object": {
"type": "object"
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": false,
"default_cursor_field": []
},
"sync_mode": "full_refresh",
"cursor_field": [],
"destination_sync_mode": "overwrite",
"primary_key": []
}
]
}

View File

@@ -64,21 +64,25 @@ public class CatalogHelpers {
public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, List<Field> fields) {
return new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)))
.withSyncMode(SyncMode.FULL_REFRESH);
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE);
}
public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream(
String streamName,
SyncMode syncMode,
DestinationSyncMode destinationSyncMode,
String cursorFieldName,
List<String> primaryKeys,
Field... fields) {
return createIncrementalConfiguredAirbyteStream(streamName, syncMode, cursorFieldName, Arrays.asList(fields));
return createIncrementalConfiguredAirbyteStream(streamName, syncMode, destinationSyncMode, cursorFieldName, primaryKeys, Arrays.asList(fields));
}
public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream(
String streamName,
SyncMode syncMode,
DestinationSyncMode destinationSyncMode,
String cursorFieldName,
List<String> primaryKeys,
List<Field> fields) {
return new ConfiguredAirbyteStream()
.withStream(new AirbyteStream()
@@ -86,7 +90,9 @@ public class CatalogHelpers {
.withSupportedSyncModes(Collections.singletonList(syncMode))
.withJsonSchema(fieldsToJsonSchema(fields)))
.withSyncMode(syncMode)
.withCursorField(Collections.singletonList(cursorFieldName));
.withCursorField(Collections.singletonList(cursorFieldName))
.withDestinationSyncMode(destinationSyncMode)
.withPrimaryKey(primaryKeys.stream().map(Collections::singletonList).collect(Collectors.toList()));
}
/**
@@ -109,7 +115,7 @@ public class CatalogHelpers {
.withStream(stream)
.withSyncMode(SyncMode.FULL_REFRESH)
.withCursorField(new ArrayList<>())
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withPrimaryKey(new ArrayList<>());
}
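
A hedged call sketch against the updated helper signature (stream name, cursor field, and key are illustrative):

ConfiguredAirbyteStream stream = CatalogHelpers.createIncrementalConfiguredAirbyteStream(
    "users",
    SyncMode.INCREMENTAL,
    DestinationSyncMode.APPEND_DEDUP, // new required argument
    "updated_at",
    Collections.singletonList("id"),  // primary keys; each is wrapped as a single-element path
    Field.of("id", JsonSchemaPrimitive.NUMBER));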

View File

@@ -167,6 +167,8 @@ definitions:
additionalProperties: true
required:
- stream
- sync_mode
- destination_sync_mode
properties:
stream:
"$ref": "#/definitions/AirbyteStream"
@@ -199,7 +201,7 @@ definitions:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2
ConnectorSpecification:
description: Specification of a connector (source/destination)
type: object

View File

@@ -35,6 +35,7 @@ import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.Optional;
@@ -124,7 +125,7 @@ public class DefaultJobCreator implements JobCreator {
}
// Strategy:
// 1. Set all streams to full refresh.
// 1. Set all streams to full refresh - overwrite.
// 2. Create a job where the source emits no records.
// 3. Run a sync from the empty source to the destination. This will overwrite all data for each
// stream in the destination.
@@ -134,7 +135,10 @@ public class DefaultJobCreator implements JobCreator {
public Optional<Long> createResetConnectionJob(DestinationConnection destination, StandardSync standardSync, String destinationDockerImage)
throws IOException {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = standardSync.getCatalog();
-   configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH));
+   configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> {
+     configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
+     configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
+   });
final JobResetConnectionConfig resetConnectionConfig = new JobResetConnectionConfig()
.withPrefix(standardSync.getPrefix())
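One step of the strategy worth spelling out: with an empty source, it is the destination sync mode that actually clears the data. APPEND would leave old records in place, while OVERWRITE replaces each table with the empty result. A sketch of the invariant the flip above establishes (illustrative, not code from this commit):

// After the reset flip, every stream wipes its destination table on the next
// (empty-source) sync.
for (final ConfiguredAirbyteStream s : configuredAirbyteCatalog.getStreams()) {
  assert s.getSyncMode() == SyncMode.FULL_REFRESH;
  assert s.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE;
}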

View File

@@ -45,6 +45,7 @@ import io.airbyte.config.StandardSync;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+ import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import java.io.IOException;
@@ -238,7 +239,10 @@ public class DefaultJobCreatorTest {
void testCreateResetConnectionJob() throws IOException {
final ConfiguredAirbyteCatalog expectedCatalog = STANDARD_SYNC.getCatalog();
expectedCatalog.getStreams()
-       .forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH));
+       .forEach(configuredAirbyteStream -> {
+         configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
+         configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
+       });
final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
.withPrefix(STANDARD_SYNC.getPrefix())
@@ -264,7 +268,10 @@ public class DefaultJobCreatorTest {
void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
final ConfiguredAirbyteCatalog expectedCatalog = STANDARD_SYNC.getCatalog();
expectedCatalog.getStreams()
-       .forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH));
+       .forEach(configuredAirbyteStream -> {
+         configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH);
+         configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
+       });
final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
.withPrefix(STANDARD_SYNC.getPrefix())

View File

@@ -69,8 +69,8 @@ public class CatalogConverter {
io.airbyte.api.model.AirbyteStreamConfiguration result = new io.airbyte.api.model.AirbyteStreamConfiguration()
.aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
.cursorField(stream.getDefaultCursorField())
-       .primaryKey(stream.getSourceDefinedPrimaryKey())
        .destinationSyncMode(io.airbyte.api.model.DestinationSyncMode.APPEND)
+       .primaryKey(stream.getSourceDefinedPrimaryKey())
.selected(true);
if (stream.getSupportedSyncModes().size() > 0)
result.setSyncMode(stream.getSupportedSyncModes().get(0));
@@ -86,8 +86,8 @@ public class CatalogConverter {
final io.airbyte.api.model.AirbyteStreamConfiguration configuration = new io.airbyte.api.model.AirbyteStreamConfiguration()
.syncMode(Enums.convertTo(configuredStream.getSyncMode(), io.airbyte.api.model.SyncMode.class))
.cursorField(configuredStream.getCursorField())
-       .primaryKey(configuredStream.getPrimaryKey())
        .destinationSyncMode(Enums.convertTo(configuredStream.getDestinationSyncMode(), io.airbyte.api.model.DestinationSyncMode.class))
+       .primaryKey(configuredStream.getPrimaryKey())
.aliasName(Names.toAlphanumericAndUnderscore(configuredStream.getStream().getName()))
.selected(true);
return new io.airbyte.api.model.AirbyteStreamAndConfiguration()
@@ -106,9 +106,9 @@ public class CatalogConverter {
.withStream(toProtocol(s.getStream()))
.withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.protocol.models.SyncMode.class))
.withCursorField(s.getConfig().getCursorField())
-       .withPrimaryKey(s.getConfig().getPrimaryKey())
        .withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(),
-           io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode.class)))
+           io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode.class))
+       .withPrimaryKey(s.getConfig().getPrimaryKey()))
.collect(Collectors.toList());
return new io.airbyte.protocol.models.ConfiguredAirbyteCatalog()
.withStreams(streams);
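The conversions above lean on matching constant names between the API and protocol enums; a minimal sketch of that round trip, assuming the generated constants line up on both sides:

// Enums.convertTo maps between the API and protocol enums by name, so a value
// like append_dedup must exist verbatim in both models.
final io.airbyte.api.model.DestinationSyncMode apiMode = io.airbyte.api.model.DestinationSyncMode.APPEND_DEDUP;
final io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode protocolMode =
    Enums.convertTo(apiMode, io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode.class);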

View File

@@ -192,10 +192,18 @@ public class WebBackendConnectionsHandler {
else
outputStreamConfig.setSyncMode(discoveredStreamConfig.getSyncMode());
-   if (originalStreamConfig.getCursorField().size() > 0)
+   if (originalStreamConfig.getCursorField().size() > 0) {
      outputStreamConfig.setCursorField(originalStreamConfig.getCursorField());
-   else
+   } else {
      outputStreamConfig.setCursorField(discoveredStreamConfig.getCursorField());
+   }
+   outputStreamConfig.setDestinationSyncMode(originalStreamConfig.getDestinationSyncMode());
+   if (originalStreamConfig.getPrimaryKey().size() > 0) {
+     outputStreamConfig.setPrimaryKey(originalStreamConfig.getPrimaryKey());
+   } else {
+     outputStreamConfig.setPrimaryKey(discoveredStreamConfig.getPrimaryKey());
+   }
outputStreamConfig.setAliasName(originalStreamConfig.getAliasName());
outputStreamConfig.setSelected(originalStreamConfig.getSelected());
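In short, the merge keeps whatever the user already configured and falls back to the discovered schema only for unset fields, with destinationSyncMode always carried over from the original. A hypothetical trace (the cursor value is invented for illustration):

// Hypothetical trace: original pinned INCREMENTAL with cursor ["updated_at"];
// rediscovering the same stream does not clobber those choices.
outputStreamConfig.getSyncMode();            // INCREMENTAL, kept from original
outputStreamConfig.getCursorField();         // ["updated_at"], kept because original was non-empty
outputStreamConfig.getDestinationSyncMode(); // copied unconditionally from original
outputStreamConfig.getPrimaryKey();          // original if non-empty, else discovered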

View File

@@ -45,6 +45,7 @@ import io.airbyte.api.model.ConnectionStatus;
import io.airbyte.api.model.ConnectionUpdate;
import io.airbyte.api.model.DestinationIdRequestBody;
import io.airbyte.api.model.DestinationRead;
+ import io.airbyte.api.model.DestinationSyncMode;
import io.airbyte.api.model.JobConfigType;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobListRequestBody;
@@ -353,6 +354,8 @@ class WebBackendConnectionsHandlerTest {
discovered.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream1");
final AirbyteCatalog expected = ConnectionHelpers.generateBasicApiCatalog();
@@ -363,6 +366,8 @@ class WebBackendConnectionsHandlerTest {
expected.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream1");
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
@@ -384,6 +389,8 @@ class WebBackendConnectionsHandlerTest {
original.getStreams().get(0).getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of("field1"))
+       .destinationSyncMode(DestinationSyncMode.APPEND)
+       .primaryKey(Collections.emptyList())
.aliasName("random_stream");
final AirbyteCatalog discovered = ConnectionHelpers.generateBasicApiCatalog();
@@ -395,6 +402,8 @@ class WebBackendConnectionsHandlerTest {
discovered.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream1");
final AirbyteCatalog expected = ConnectionHelpers.generateBasicApiCatalog();
@@ -406,6 +415,8 @@ class WebBackendConnectionsHandlerTest {
expected.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream1");
final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
@@ -427,6 +438,8 @@ class WebBackendConnectionsHandlerTest {
original.getStreams().get(0).getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of("field1"))
+       .destinationSyncMode(DestinationSyncMode.APPEND)
+       .primaryKey(Collections.emptyList())
.aliasName("renamed_stream");
final AirbyteCatalog discovered = ConnectionHelpers.generateBasicApiCatalog();
@@ -438,6 +451,8 @@ class WebBackendConnectionsHandlerTest {
discovered.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream1");
final AirbyteStreamAndConfiguration newStream = ConnectionHelpers.generateBasicApiCatalog().getStreams().get(0);
newStream.getStream()
@@ -448,6 +463,8 @@ class WebBackendConnectionsHandlerTest {
newStream.getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream2");
discovered.getStreams().add(newStream);
@@ -460,6 +477,8 @@ class WebBackendConnectionsHandlerTest {
expected.getStreams().get(0).getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of("field1"))
+       .destinationSyncMode(DestinationSyncMode.APPEND)
+       .primaryKey(Collections.emptyList())
.aliasName("renamed_stream");
final AirbyteStreamAndConfiguration expectedNewStream = ConnectionHelpers.generateBasicApiCatalog().getStreams().get(0);
expectedNewStream.getStream()
@@ -470,6 +489,8 @@ class WebBackendConnectionsHandlerTest {
expectedNewStream.getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
+       .destinationSyncMode(DestinationSyncMode.OVERWRITE)
+       .primaryKey(Collections.emptyList())
.aliasName("stream2");
expected.getStreams().add(expectedNewStream);

View File

@@ -41,6 +41,7 @@ import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+ import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import java.util.Collections;
@@ -125,7 +126,8 @@ public class ConnectionHelpers {
final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream()
.withStream(generateBasicAirbyteStream())
.withCursorField(Lists.newArrayList(FIELD_NAME))
-       .withSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL);
+       .withSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL)
+       .withDestinationSyncMode(DestinationSyncMode.APPEND);
return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(stream));
}
@@ -146,6 +148,8 @@ public class ConnectionHelpers {
return new AirbyteStreamConfiguration()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(Lists.newArrayList(FIELD_NAME))
+       .destinationSyncMode(io.airbyte.api.model.DestinationSyncMode.APPEND)
+       .primaryKey(Collections.emptyList())
.aliasName(Names.toAlphanumericAndUnderscore(STREAM_NAME))
.selected(true);
}

View File

@@ -311,7 +311,8 @@ public class AcceptanceTests {
final String name = "test-connection-" + UUID.randomUUID().toString();
final ConnectionSchedule schedule = new ConnectionSchedule().timeUnit(MINUTES).units(100L);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
-   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode));
+   final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
+   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final ConnectionRead createdConnection = createConnection(name, sourceId, destinationId, catalog, schedule, syncMode);
assertEquals(sourceId, createdConnection.getSourceId());
@@ -329,13 +330,13 @@ public class AcceptanceTests {
final UUID destinationId = createDestination().getDestinationId();
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
-   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode));
+   final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
+   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId = createConnection(connectionName, sourceId, destinationId, catalog, null, syncMode).getConnectionId();
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
-   assertSourceAndTargetDbInSync(sourcePsql);
+   assertSourceAndTargetDbInSync(sourcePsql, false);
}
@Test
@@ -352,17 +353,21 @@ public class AcceptanceTests {
// instead of assertFalse to avoid NPE from unboxed.
assertNull(stream.getSourceDefinedCursor());
assertTrue(stream.getDefaultCursorField().isEmpty());
+   assertTrue(stream.getSourceDefinedPrimaryKey().isEmpty());
final SyncMode syncMode = SyncMode.INCREMENTAL;
-   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).cursorField(List.of(COLUMN_ID)));
+   final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND;
+   catalog.getStreams().forEach(s -> s.getConfig()
+       .syncMode(syncMode)
+       .cursorField(List.of(COLUMN_ID))
+       .destinationSyncMode(destinationSyncMode));
final UUID connectionId = createConnection(connectionName, sourceId, destinationId, catalog, null, syncMode).getConnectionId();
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
-   assertSourceAndTargetDbInSync(sourcePsql);
+   assertSourceAndTargetDbInSync(sourcePsql, false);
// add new records and run again.
final Database source = getDatabase(sourcePsql);
@@ -380,18 +385,18 @@ public class AcceptanceTests {
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
-   assertDestinationContains(expectedRecords, STREAM_NAME);
+   assertRawDestinationContains(expectedRecords, STREAM_NAME);
// reset back to no data.
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
-   assertDestinationContains(Collections.emptyList(), STREAM_NAME);
+   assertRawDestinationContains(Collections.emptyList(), STREAM_NAME);
// sync one more time. verify it is the equivalent of a full refresh.
final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
-   assertSourceAndTargetDbInSync(sourcePsql);
+   assertSourceAndTargetDbInSync(sourcePsql, false);
}
@Test
@@ -407,18 +412,62 @@ public class AcceptanceTests {
final ConnectionSchedule connectionSchedule = new ConnectionSchedule().units(1L).timeUnit(MINUTES);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
-   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode));
+   final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
+   catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
createConnection(connectionName, sourceId, destinationId, catalog, connectionSchedule, syncMode);
// When a new connection is created, Airbyte might sync it immediately (before the sync interval).
// Then it will wait the sync interval.
Thread.sleep(Duration.of(30, SECONDS).toMillis());
-   assertSourceAndTargetDbInSync(sourcePsql);
+   assertSourceAndTargetDbInSync(sourcePsql, false);
}
@Test
+ @Order(10)
+ public void testIncrementalDedupSync() throws Exception {
+   final String connectionName = "test-connection";
+   final UUID sourceId = createPostgresSource().getSourceId();
+   final UUID destinationId = createDestination().getDestinationId();
+   final AirbyteCatalog catalog = discoverSourceSchema(sourceId);
+   final SyncMode syncMode = SyncMode.INCREMENTAL;
+   final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
+   catalog.getStreams().forEach(s -> s.getConfig()
+       .syncMode(syncMode)
+       .cursorField(List.of(COLUMN_ID))
+       .destinationSyncMode(destinationSyncMode)
+       .primaryKey(List.of(List.of(COLUMN_NAME))));
+   final UUID connectionId = createConnection(connectionName, sourceId, destinationId, catalog, null, syncMode).getConnectionId();
+   // sync from start
+   final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+       .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+   waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+   assertSourceAndTargetDbInSync(sourcePsql, true);
+   // add new records and run again.
+   final Database source = getDatabase(sourcePsql);
+   final List<JsonNode> expectedRawRecords = retrieveSourceRecords(source, STREAM_NAME);
+   expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "sherif").build()));
+   expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).put(COLUMN_NAME, "chris").build()));
+   source.query(ctx -> ctx.execute("UPDATE id_and_name SET id=6 WHERE name='sherif'"));
+   source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')"));
+   // retrieve latest snapshot of source records after modifications; the deduplicated table in
+   // destination should mirror this latest state of records
+   final List<JsonNode> expectedNormalizedRecords = retrieveSourceRecords(source, STREAM_NAME);
+   source.close();
+   final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
+       .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+   waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
+   assertRawDestinationContains(expectedRawRecords, STREAM_NAME);
+   assertNormalizedDestinationContains(expectedNormalizedRecords);
+ }
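The dedup assertions above expect normalization to expose three tables per stream in the destination; a sketch of the expected names, mirroring the format strings used by assertSourceAndTargetDbInSync below (the stream name is the test's Postgres table, cleaned of dots):

// Expected destination tables for one deduped stream (sketch):
final String stream = "id_and_name";
final String rawTable = String.format("_airbyte_raw_%s%s", OUTPUT_NAMESPACE, stream); // append-only raw records
final String scdTable = String.format("%s%s_scd", OUTPUT_NAMESPACE, stream);          // SCD history built by normalization
final String finalTable = String.format("%s%s", OUTPUT_NAMESPACE, stream);            // latest record per primary key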
@Test
@Order(11)
public void testRedactionOfSensitiveRequestBodies() throws Exception {
// check that the source password is not present in the logs
final List<String> serverLogLines = Files.readLines(
@@ -443,12 +492,23 @@ public class AcceptanceTests {
return apiClient.getSourceApi().discoverSchemaForSource(new SourceIdRequestBody().sourceId(sourceId)).getCatalog();
}
- private void assertSourceAndTargetDbInSync(PostgreSQLContainer sourceDb) throws Exception {
+ private void assertSourceAndTargetDbInSync(PostgreSQLContainer sourceDb, boolean withScdTable) throws Exception {
final Database source = getDatabase(sourceDb);
final Set<String> sourceStreams = listStreams(source);
-   final Set<String> sourceStreamsWithRawPrefix =
-       sourceStreams.stream().map(x -> String.format("_airbyte_raw_%s%s", OUTPUT_NAMESPACE, x.replace(".", "_"))).collect(Collectors.toSet());
+   final Set<String> sourceStreamsWithRawPrefix = sourceStreams.stream().flatMap(x -> {
+     final String cleanedNameStream = x.replace(".", "_");
+     if (withScdTable) {
+       return List.of(
+           String.format("_airbyte_raw_%s%s", OUTPUT_NAMESPACE, cleanedNameStream),
+           String.format("%s%s_scd", OUTPUT_NAMESPACE, cleanedNameStream),
+           String.format("%s%s", OUTPUT_NAMESPACE, cleanedNameStream)).stream();
+     } else {
+       return List.of(
+           String.format("_airbyte_raw_%s%s", OUTPUT_NAMESPACE, cleanedNameStream),
+           String.format("%s%s", OUTPUT_NAMESPACE, cleanedNameStream)).stream();
+     }
+   }).collect(Collectors.toSet());
final Database destination = getDatabase(destinationPsql);
final Set<String> destinationStreams = listDestinationStreams(destination);
assertEquals(sourceStreamsWithRawPrefix, destinationStreams,
@@ -489,7 +549,7 @@ public class AcceptanceTests {
.collect(Collectors.toSet());
}
- private void assertDestinationContains(List<JsonNode> sourceRecords, String streamName) throws Exception {
+ private void assertRawDestinationContains(List<JsonNode> sourceRecords, String streamName) throws Exception {
final Set<JsonNode> destinationRecords = new HashSet<>(retrieveDestinationRecords(streamName));
assertEquals(sourceRecords.size(), destinationRecords.size(),
@@ -501,9 +561,26 @@ public class AcceptanceTests {
}
}
+ private void assertNormalizedDestinationContains(final List<JsonNode> sourceRecords) throws Exception {
+   final Database destination = getDatabase(destinationPsql);
+   final String finalDestinationTable = String.format("%s%s", OUTPUT_NAMESPACE, STREAM_NAME.replace(".", "_"));
+   final List<JsonNode> destinationRecords = retrieveSourceRecords(destination, finalDestinationTable);
+   assertEquals(sourceRecords.size(), destinationRecords.size(),
+       String.format("source contains: %s records. destination contains: %s", sourceRecords.size(), destinationRecords.size()));
+   for (JsonNode sourceStreamRecord : sourceRecords) {
+     assertTrue(
+         destinationRecords.stream()
+             .anyMatch(r -> r.get(COLUMN_NAME).asText().equals(sourceStreamRecord.get(COLUMN_NAME).asText())
+                 && r.get(COLUMN_ID).asInt() == sourceStreamRecord.get(COLUMN_ID).asInt()),
+         String.format("destination does not contain record:\n %s \n destination contains:\n %s\n", sourceStreamRecord, destinationRecords));
+   }
+ }
private void assertStreamsEquivalent(Database source, String table) throws Exception {
final List<JsonNode> sourceRecords = retrieveSourceRecords(source, table);
-   assertDestinationContains(sourceRecords, table);
+   assertRawDestinationContains(sourceRecords, table);
}
private ConnectionRead createConnection(String name,
@@ -589,18 +666,18 @@ public class AcceptanceTests {
}
private JsonNode getDestinationDbConfig() {
-   return getDbConfig(destinationPsql, false, true);
+   return getDbConfig(destinationPsql, false, true, true);
}
private JsonNode getDestinationDbConfigWithHiddenPassword() {
-   return getDbConfig(destinationPsql, true, true);
+   return getDbConfig(destinationPsql, true, true, true);
}
private JsonNode getDbConfig(PostgreSQLContainer psql) {
-   return getDbConfig(psql, false, false);
+   return getDbConfig(psql, false, false, false);
}
- private JsonNode getDbConfig(PostgreSQLContainer psql, boolean hiddenPassword, boolean withSchema) {
+ private JsonNode getDbConfig(PostgreSQLContainer psql, boolean hiddenPassword, boolean withSchema, boolean withNormalization) {
try {
final Map<Object, Object> dbConfig = new HashMap<>();
@@ -621,6 +698,10 @@ public class AcceptanceTests {
dbConfig.put("schema", "public");
}
+     if (withNormalization) {
+       dbConfig.put("basic_normalization", true);
+     }
return Jsons.jsonNode(dbConfig);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
@@ -685,7 +766,7 @@ public class AcceptanceTests {
private static void waitForSuccessfulJob(JobsApi jobsApi, JobRead originalJob) throws InterruptedException, ApiException {
JobRead job = originalJob;
int count = 0;
-   while (count < 15 && (job.getStatus() == JobStatus.PENDING || job.getStatus() == JobStatus.RUNNING)) {
+   while (count < 60 && (job.getStatus() == JobStatus.PENDING || job.getStatus() == JobStatus.RUNNING)) {
Thread.sleep(1000);
count++;

View File

@@ -32,7 +32,6 @@ import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.StandardTargetConfig;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteMessage;
- import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.protocols.Destination;
import io.airbyte.workers.protocols.Mapper;
@@ -85,8 +84,7 @@ public class DefaultSyncWorker implements SyncWorker {
LOGGER.info("configured sync modes: {}", syncInput.getCatalog().getStreams()
.stream()
-       .collect(Collectors.toMap(s -> s.getStream().getName(), s -> s.getSyncMode() != null ? s.getSyncMode() : SyncMode.FULL_REFRESH)));
+       .collect(Collectors.toMap(s -> s.getStream().getName(), s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput);
final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput);
targetConfig.setCatalog(mapper.mapCatalog(targetConfig.getCatalog()));

View File

@@ -41,13 +41,10 @@ import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteMessage;
- import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
- import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
- import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
@@ -58,7 +55,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
- import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
@@ -71,13 +67,9 @@ class DefaultAirbyteSourceTest {
private static final String STREAM_NAME = "user_preferences";
private static final String FIELD_NAME = "favorite_color";
- private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog()
-     .withStreams(Collections.singletonList(
-         new ConfiguredAirbyteStream()
-             .withSyncMode(SyncMode.FULL_REFRESH)
-             .withStream(new AirbyteStream()
-                 .withName("hudi:latest")
-                 .withJsonSchema(CatalogHelpers.fieldsToJsonSchema(new Field(FIELD_NAME, Field.JsonSchemaPrimitive.STRING))))));
+ private static final ConfiguredAirbyteCatalog CATALOG = CatalogHelpers.createConfiguredAirbyteCatalog(
+     "hudi:latest",
+     Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING));
private static final StandardTapConfig TAP_CONFIG = new StandardTapConfig()
.withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "the future."))))

View File

@@ -4344,9 +4344,9 @@ font-style: italic;
<h3><a name="AirbyteStreamConfiguration"><code>AirbyteStreamConfiguration</code> - </a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'>the mutable part of the stream to configure the destination</div>
<div class="field-items">
<div class="param">syncMode (optional)</div><div class="param-desc"><span class="param-type"><a href="#SyncMode">SyncMode</a></span> </div>
<div class="param">syncMode </div><div class="param-desc"><span class="param-type"><a href="#SyncMode">SyncMode</a></span> </div>
<div class="param">cursorField (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">array[String]</a></span> Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if <code>sync_mode</code> is <code>incremental</code>. Otherwise it is ignored. </div>
<div class="param">destinationSyncMode (optional)</div><div class="param-desc"><span class="param-type"><a href="#DestinationSyncMode">DestinationSyncMode</a></span> </div>
<div class="param">destinationSyncMode </div><div class="param-desc"><span class="param-type"><a href="#DestinationSyncMode">DestinationSyncMode</a></span> </div>
<div class="param">primaryKey (optional)</div><div class="param-desc"><span class="param-type"><a href="#array">array[array[String]]</a></span> Paths to the fields that will be used as primary key. This field is REQUIRED if <code>destination_sync_mode</code> is <code>*_dedup</code>. Otherwise it is ignored. </div>
<div class="param">aliasName (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Alias name to the stream to be used in the destination </div>
<div class="param">selected (optional)</div><div class="param-desc"><span class="param-type"><a href="#boolean">Boolean</a></span> </div>