
java CDK: spin off s3 destinations (#32050)

Co-authored-by: postamar <postamar@users.noreply.github.com>
Authored by Marius Posta on 2023-11-03 13:31:58 -07:00; committed by GitHub.
Parent commit: 85b11c5c88
This commit: 04ae91ff96
130 changed files with 209 additions and 77 deletions

View File

@@ -156,6 +156,7 @@ MavenLocal debugging steps:
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.0 | 2023-11-02 | [\#32050](https://github.com/airbytehq/airbyte/pull/32050) | Add 's3-destinations' CDK module. |
| 0.3.0 | 2023-11-02 | [\#31983](https://github.com/airbytehq/airbyte/pull/31983) | Add deinterpolation feature to AirbyteExceptionHandler. |
| 0.2.4 | 2023-10-31 | [\#31807](https://github.com/airbytehq/airbyte/pull/31807) | Handle case of debezium update and delete of records in mongodb. |
| 0.2.3 | 2023-10-31 | [\#32022](https://github.com/airbytehq/airbyte/pull/32022) | Update Debezium version from 2.20 -> 2.4.0. |

View File

@@ -30,8 +30,6 @@ dependencies {
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-cli')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
// SSH dependencies
implementation 'net.i2p.crypto:eddsa:0.3.0'
@@ -39,8 +37,6 @@ dependencies {
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:db-sources'))
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
testFixturesImplementation "org.hamcrest:hamcrest-all:1.3"
testImplementation libs.bundles.junit
@@ -63,7 +59,6 @@ dependencies {
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
implementation libs.postgresql
implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
// testImplementation libs.junit.jupiter.api
implementation libs.hikaricp
@@ -93,22 +88,11 @@ dependencies {
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation ('org.apache.hadoop:hadoop-aws:3.3.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
testImplementation libs.junit.jupiter.system.stubs
implementation libs.jackson.annotations
implementation group: 'org.apache.logging.log4j', name: 'log4j-layout-template-json', version: '2.17.2'
implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'
// parquet
implementation ('org.apache.hadoop:hadoop-common:3.3.3') {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'org.slf4j', module: 'slf4j-reload4j'
}
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4'
testImplementation 'org.mockito:mockito-core:4.6.1'

View File

@@ -4,7 +4,6 @@
package io.airbyte.cdk.integrations.destination.dest_state_lifecycle_manager;
import com.amazonaws.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.protocol.models.v0.AirbyteMessage;
@@ -50,7 +49,8 @@ public class DestStreamStateLifecycleManager implements DestStateLifecycleManage
Preconditions.checkArgument(message.getState().getType() == AirbyteStateType.STREAM);
final StreamDescriptor originalStreamId = message.getState().getStream().getStreamDescriptor();
final StreamDescriptor actualStreamId;
if (StringUtils.isNullOrEmpty(originalStreamId.getNamespace())) {
final String namespace = originalStreamId.getNamespace();
if (namespace == null || namespace.isEmpty()) {
// If the state's namespace is null/empty, we need to be able to find it using the default namespace
// (because many destinations actually set records' namespace to the default namespace before
// they make it into this class).
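
The new check replaces the AWS SDK's StringUtils.isNullOrEmpty with plain Java, which is what lets the AWS import above be dropped. A minimal, dependency-free sketch of the fallback behavior the comment describes (the helper name and defaultNamespace parameter are illustrative, not taken from the diff):

final class NamespaceFallbackSketch {

  // Treat a null or empty stream namespace as "use the default namespace",
  // mirroring how destinations rewrite record namespaces before they reach this class.
  static String resolveNamespace(final String namespace, final String defaultNamespace) {
    return (namespace == null || namespace.isEmpty()) ? defaultNamespace : namespace;
  }

  public static void main(final String[] args) {
    System.out.println(resolveNamespace(null, "public"));    // public
    System.out.println(resolveNamespace("", "public"));      // public
    System.out.println(resolveNamespace("sales", "public")); // sales
  }
}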

View File

@@ -8,6 +8,7 @@ java {
dependencies {
// Depends on core CDK classes (OK 👍)
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:acceptance-test-harness')
@@ -29,8 +30,6 @@ dependencies {
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
testFixturesImplementation "org.hamcrest:hamcrest-all:1.3"
implementation libs.bundles.junit
@@ -82,7 +81,6 @@ dependencies {
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation ('org.apache.hadoop:hadoop-aws:3.3.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('org.apache.hadoop:hadoop-common:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
implementation ('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}

View File

@@ -36,7 +36,7 @@ public class GeneralStagingFunctions {
final String dstTableName = writeConfig.getOutputTableName();
final String stageName = stagingOperations.getStageName(schema, dstTableName);
final String stagingPath =
stagingOperations.getStagingPath(StagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getWriteDatetime());
stagingOperations.getStagingPath(SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schema, stream, writeConfig.getWriteDatetime());
log.info("Preparing staging area in destination started for schema {} stream {}: target table: {}, stage: {}",
schema, stream, dstTableName, stagingPath);

View File

@@ -83,7 +83,8 @@ public class SerialFlush {
final String schemaName = writeConfig.getOutputSchemaName();
final String stageName = stagingOperations.getStageName(schemaName, writeConfig.getOutputTableName());
final String stagingPath =
stagingOperations.getStagingPath(StagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
stagingOperations.getStagingPath(
SerialStagingConsumerFactory.RANDOM_CONNECTION_ID, schemaName, writeConfig.getStreamName(),
writeConfig.getWriteDatetime());
try (writer) {
writer.flush();
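
The try (writer) form above is Java 9+ try-with-resources over an existing effectively-final variable: the buffer is flushed (and, in the real code, uploaded to the stage) before the writer is closed automatically, even if an exception is thrown. A self-contained sketch of just that language pattern:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

final class TryWithExistingResourceSketch {

  public static void main(final String[] args) throws IOException {
    final BufferedWriter writer = new BufferedWriter(new StringWriter());
    // Java 9+: an existing effectively-final resource can be named directly.
    try (writer) {
      writer.write("buffered records");
      writer.flush();
      // the real code uploads the flushed buffer to the staging area here
    } // writer.close() runs here, on success or failure
  }
}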

View File

@@ -0,0 +1,141 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.destination.staging;
import static java.util.stream.Collectors.toList;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig;
import io.airbyte.cdk.integrations.destination.record_buffer.BufferCreateFunction;
import io.airbyte.cdk.integrations.destination.record_buffer.SerializedBufferingStrategy;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Uses both the Factory and Consumer design patterns to provide a single point of creation for
* consuming {@link AirbyteMessage}s for processing.
*/
public class SerialStagingConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(SerialStagingConsumerFactory.class);
// Using a random UUID here as a placeholder for the moment. This avoids mixing data in the
// staging area between different syncs (especially when they manipulate streams with similar
// names). If we replaced the random connection id with the actual connection_id, we would gain
// the opportunity to reuse data that was uploaded to the stage in a previous attempt but failed
// to load to the warehouse (e.g. because the sync was interrupted). It would also allow other
// programs/scripts to load data (or reload backups?) into the connection's staging area, to be
// picked up at the next sync.
private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC);
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();
public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,
final NamingConventionTransformer namingResolver,
final BufferCreateFunction onCreateBuffer,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace,
final boolean useDestinationsV2Columns) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog, useDestinationsV2Columns);
return new BufferedStreamConsumer(
outputRecordCollector,
GeneralStagingFunctions.onStartFunction(database, stagingOperations, writeConfigs, typerDeduper),
new SerializedBufferingStrategy(
onCreateBuffer,
catalog,
SerialFlush.function(database, stagingOperations, writeConfigs, catalog, typerDeduperValve, typerDeduper)),
GeneralStagingFunctions.onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData, typerDeduper),
catalog,
stagingOperations::isValidData,
defaultNamespace);
}
/**
* Creates a list of all {@link WriteConfig} for each stream within a
* {@link ConfiguredAirbyteCatalog}. Each write config represents the configuration settings for
* writing to a destination connector
*
* @param namingResolver {@link NamingConventionTransformer} used to transform names that are
* acceptable by each destination connector
* @param config destination connector configuration parameters
* @param catalog {@link ConfiguredAirbyteCatalog} collection of configured
* {@link ConfiguredAirbyteStream}
* @return list of all write configs for each stream in a {@link ConfiguredAirbyteCatalog}
*/
private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
final boolean useDestinationsV2Columns) {
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns)).collect(toList());
}
private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(final NamingConventionTransformer namingResolver,
final JsonNode config,
final ParsedCatalog parsedCatalog,
final boolean useDestinationsV2Columns) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream abStream = stream.getStream();
final String streamName = abStream.getName();
final String outputSchema;
final String tableName;
if (useDestinationsV2Columns) {
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).id();
outputSchema = streamId.rawNamespace();
tableName = streamId.rawName();
} else {
outputSchema = getOutputSchema(abStream, config.get("schema").asText(), namingResolver);
tableName = namingResolver.getRawTableName(streamName);
}
final String tmpTableName = namingResolver.getTmpTableName(streamName);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
final WriteConfig writeConfig =
new WriteConfig(streamName, abStream.getNamespace(), outputSchema, tmpTableName, tableName, syncMode, SYNC_DATETIME);
LOGGER.info("Write config: {}", writeConfig);
return writeConfig;
};
}
private static String getOutputSchema(final AirbyteStream stream,
final String defaultDestSchema,
final NamingConventionTransformer namingResolver) {
return stream.getNamespace() != null
? namingResolver.getNamespace(stream.getNamespace())
: namingResolver.getNamespace(defaultDestSchema);
}
}
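
To make the comment about RANDOM_CONNECTION_ID concrete: because the UUID is a static field, every sync in a given process stages files under the same random id, and syncs in different processes cannot collide. A toy sketch of that isolation property (the path layout is hypothetical; real layouts come from each destination's StagingOperations.getStagingPath):

import java.util.UUID;

final class StagingIsolationSketch {

  // One random id per process, as with SerialStagingConsumerFactory.RANDOM_CONNECTION_ID.
  static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();

  // Hypothetical path layout, for illustration only.
  static String stagingPath(final String schema, final String stream) {
    return schema + "/" + stream + "/" + RANDOM_CONNECTION_ID + "/";
  }

  public static void main(final String[] args) {
    // Two runs of this program (two "syncs") produce disjoint prefixes,
    // at the cost of never reusing files staged by a failed earlier attempt.
    System.out.println(stagingPath("public", "users"));
  }
}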

View File

@@ -11,7 +11,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException;
import java.util.List;
import org.junit.jupiter.api.Test;
class StagingConsumerFactoryTest {
class SerialStagingConsumerFactoryTest {
@Test()
void detectConflictingStreams() {

View File

@@ -71,12 +71,9 @@ dependencies {
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:init-oss')
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
testFixturesImplementation "org.hamcrest:hamcrest-all:1.3"
implementation libs.bundles.junit
// implementation libs.junit.jupiter.api
implementation libs.junit.jupiter.params
@@ -93,13 +90,7 @@ dependencies {
// Optional dependencies
// TODO: Change these to 'compileOnly' or 'testCompileOnly'
implementation 'com.azure:azure-storage-blob:12.12.0'
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
implementation libs.postgresql
implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
// testImplementation libs.junit.jupiter.api
implementation libs.hikaricp
implementation libs.bundles.debezium.bundle
@@ -124,10 +115,6 @@ dependencies {
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation ('org.apache.hadoop:hadoop-aws:3.3.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('org.apache.hadoop:hadoop-common:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
implementation ('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
testImplementation libs.junit.jupiter.system.stubs
// From `base-debezium`:

View File

@@ -15,7 +15,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.amazonaws.util.Base64;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.DataTypeUtils;
@@ -23,6 +22,7 @@ import io.airbyte.commons.json.Jsons;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.bson.BsonBinary;
import org.bson.BsonBoolean;
import org.bson.BsonDateTime;
@@ -113,7 +113,7 @@ class MongoDbCdcEventUtilsTest {
assertEquals(4.0, transformed.get("field5").asDouble());
assertEquals(expectedTimestamp, transformed.get("field6").asText());
assertEquals(expectedTimestamp, transformed.get("field7").asText());
assertEquals(Base64.encodeAsString("test".getBytes(Charset.defaultCharset())), transformed.get("field8").asText());
assertEquals(Base64.encodeBase64String("test".getBytes(Charset.defaultCharset())), transformed.get("field8").asText());
assertEquals("test2", transformed.get("field9").asText());
assertEquals("test3", transformed.get("field10").asText());
assertEquals(OBJECT_ID, transformed.get("field11").asText());
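
This hunk swaps the AWS SDK's Base64.encodeAsString for commons-codec's Base64.encodeBase64String, another step in removing AWS types from non-S3 code. Both helpers emit standard Base64, so the expected value is unchanged; a quick check, assuming commons-codec is on the classpath as the new import implies:

import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Base64;

final class Base64SwapSketch {

  public static void main(final String[] args) {
    // Same output the AWS helper produced for this input: "dGVzdA=="
    System.out.println(Base64.encodeBase64String("test".getBytes(StandardCharsets.UTF_8)));
  }
}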

View File

@@ -0,0 +1,47 @@
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
implementation project(':airbyte-cdk:java:airbyte-cdk:db-destinations')
implementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:db-destinations')
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:db-destinations')
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:db-destinations'))
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-api')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-cli')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:init-oss')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testImplementation 'org.mockito:mockito-core:4.6.1'
// Lombok
implementation 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation ('org.apache.hadoop:hadoop-aws:3.3.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'
implementation ('org.apache.hadoop:hadoop-common:3.3.3') {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'org.slf4j', module: 'slf4j-reload4j'
}
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'
implementation 'org.apache.commons:commons-csv:1.4'
implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation libs.bundles.junit
testImplementation libs.junit.jupiter.system.stubs
}

View File

@@ -11,13 +11,9 @@ import static java.util.stream.Collectors.toList;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig;
import io.airbyte.cdk.integrations.destination.record_buffer.BufferCreateFunction;
import io.airbyte.cdk.integrations.destination.record_buffer.SerializedBufferingStrategy;
import io.airbyte.cdk.integrations.destination_async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager;
import io.airbyte.commons.exceptions.ConfigErrorException;
@@ -49,7 +45,7 @@ import org.slf4j.LoggerFactory;
* Uses both the Factory and Consumer design patterns to provide a single point of creation for
* consuming {@link AirbyteMessage}s for processing.
*/
public class StagingConsumerFactory {
public class StagingConsumerFactory extends SerialStagingConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(StagingConsumerFactory.class);
@@ -64,33 +60,6 @@ public class StagingConsumerFactory {
private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC);
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();
public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,
final NamingConventionTransformer namingResolver,
final BufferCreateFunction onCreateBuffer,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace,
final boolean useDestinationsV2Columns) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog, useDestinationsV2Columns);
return new BufferedStreamConsumer(
outputRecordCollector,
GeneralStagingFunctions.onStartFunction(database, stagingOperations, writeConfigs, typerDeduper),
new SerializedBufferingStrategy(
onCreateBuffer,
catalog,
SerialFlush.function(database, stagingOperations, writeConfigs, catalog, typerDeduperValve, typerDeduper)),
GeneralStagingFunctions.onCloseFunction(database, stagingOperations, writeConfigs, purgeStagingData, typerDeduper),
catalog,
stagingOperations::isValidData,
defaultNamespace);
}
public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,

Some files were not shown because too many files have changed in this diff.