1
0
mirror of synced 2025-12-19 18:14:56 -05:00

java CDK: clean up dependencies, refactor modules (#34745)

This commit is contained in:
Marius Posta
2024-02-08 17:46:51 -08:00
committed by GitHub
parent 1fb0e27848
commit 796b2e8dad
393 changed files with 684 additions and 5509 deletions

View File

@@ -166,6 +166,7 @@ MavenLocal debugging steps:
| Version | Date       | Pull Request                                               | Subject                                                                      |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
| 0.19.0  | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure.                                             |
| 0.18.0  | 2024-02-08 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources. |
| 0.17.1  | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework.              |
| 0.17.0  | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size.                             |

View File

@@ -1,41 +0,0 @@
// Gradle build for a worker-oriented CDK submodule (removed in this commit).
plugins {
    id "java-library"
}

java {
    compileJava {
        // Suppress "try" lint warnings — presumably for try-with-resources with unused
        // resource variables; TODO confirm the original motivation.
        options.compilerArgs += "-Xlint:-try"
    }
}

dependencies {
    implementation group: 'joda-time', name: 'joda-time', version: '2.12.5'
    implementation 'io.fabric8:kubernetes-client:5.12.2'
    implementation 'com.auth0:java-jwt:3.19.2'
    implementation libs.guava
    // Exclude temporal's transitive guava in favor of the explicit libs.guava above,
    // presumably to avoid version conflicts — confirm against dependency report.
    implementation(libs.temporal.sdk) {
        exclude module: 'guava'
    }
    implementation 'org.apache.ant:ant:1.10.10'
    implementation 'org.apache.commons:commons-text:1.10.0'
    implementation libs.bundles.datadog
    implementation group: 'io.swagger', name: 'swagger-annotations', version: '1.6.2'

    // Sibling CDK modules.
    implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-api')
    implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
    implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-protocol')
    implementation project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
    implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')

    // JMH annotation processing for benchmark tests.
    testAnnotationProcessor libs.jmh.annotations

    testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
    testImplementation 'org.mockito:mockito-inline:4.7.0'
    testImplementation libs.postgresql
    testImplementation libs.testcontainers
    testImplementation libs.testcontainers.postgresql
    testImplementation libs.jmh.core
    testImplementation libs.jmh.annotations
    testImplementation 'com.github.docker-java:docker-java:3.2.8'
    testImplementation 'com.github.docker-java:docker-java-transport-httpclient5:3.2.8'
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.workers.internal;
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigrator;
import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.Optional;
/**
 * Writes {@link AirbyteMessage}s to a {@link BufferedWriter}, downgrading each message to a fixed
 * protocol version via the supplied migrator before serializing it.
 *
 * @param <T> the message type of the target protocol version
 */
public class VersionedAirbyteMessageBufferedWriter<T> extends DefaultAirbyteMessageBufferedWriter {

  private final AirbyteMessageSerializer<T> serializer;
  private final AirbyteMessageVersionedMigrator<T> migrator;
  private final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog;

  public VersionedAirbyteMessageBufferedWriter(final BufferedWriter writer,
                                               final AirbyteMessageSerializer<T> serializer,
                                               final AirbyteMessageVersionedMigrator<T> migrator,
                                               final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    super(writer);
    this.serializer = serializer;
    this.migrator = migrator;
    this.configuredAirbyteCatalog = configuredAirbyteCatalog;
  }

  @Override
  public void write(final AirbyteMessage message) throws IOException {
    // Downgrade from the current protocol version to the writer's target version,
    // then emit one serialized message per line.
    final T downgraded = migrator.downgrade(message, configuredAirbyteCatalog);
    writer.write(serializer.serialize(downgraded));
    writer.newLine();
  }

}

View File

@@ -1,49 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.workers.internal;
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.BufferedWriter;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Builds {@link VersionedAirbyteMessageBufferedWriter}s pinned to a given protocol version,
 * wiring in the matching serializer and migrator.
 */
public class VersionedAirbyteMessageBufferedWriterFactory implements AirbyteMessageBufferedWriterFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteMessageBufferedWriterFactory.class);

  private final AirbyteMessageSerDeProvider serDeProvider;
  private final AirbyteProtocolVersionedMigratorFactory migratorFactory;
  private final Version protocolVersion;
  private final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog;

  public VersionedAirbyteMessageBufferedWriterFactory(final AirbyteMessageSerDeProvider serDeProvider,
                                                      final AirbyteProtocolVersionedMigratorFactory migratorFactory,
                                                      final Version protocolVersion,
                                                      final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    this.serDeProvider = serDeProvider;
    this.migratorFactory = migratorFactory;
    this.protocolVersion = protocolVersion;
    this.configuredAirbyteCatalog = configuredAirbyteCatalog;
  }

  @Override
  public AirbyteMessageBufferedWriter createWriter(BufferedWriter bufferedWriter) {
    // Migration is only needed when the target protocol major differs from the current one.
    final Version mostRecentVersion = migratorFactory.getMostRecentVersion();
    final boolean needMigration = !protocolVersion.getMajorVersion().equals(mostRecentVersion.getMajorVersion());
    LOGGER.info(
        "Writing messages to protocol version {}{}",
        protocolVersion.serialize(),
        needMigration ? ", messages will be downgraded from protocol version " + mostRecentVersion.serialize() : "");
    // A serializer for the requested version must have been registered; fail fast otherwise.
    return new VersionedAirbyteMessageBufferedWriter<>(
        bufferedWriter,
        serDeProvider.getSerializer(protocolVersion).orElseThrow(),
        migratorFactory.getAirbyteMessageMigrator(protocolVersion),
        configuredAirbyteCatalog);
  }

}

View File

@@ -1,11 +0,0 @@
# airbyte-api
Defines the OpenApi configuration for the Airbyte Configuration API. It is also responsible for generating the following from the API spec:
* Java API client
* Java API server - this generated code is used in `airbyte-server` to allow us to implement the Configuration API in a type safe way. See `ConfigurationApi.java` in `airbyte-server`
* API docs
## Key Files
* src/openapi/config.yaml - Defines the config API interface using OpenApi3
* AirbyteApiClient.java - wraps all api clients so that they can be dependency injected together
* PatchedLogsApi.java - fixes generated code for log api.

View File

@@ -1,7 +0,0 @@
plugins {
    id "java-library"
}

dependencies {
    // Apache Commons CLI, used for command-line argument parsing in this module.
    implementation group: 'commons-cli', name: 'commons-cli', version: '1.4'
}

View File

@@ -1,3 +0,0 @@
# airbyte-commons-cli
This module houses utility functions for the `commons-cli` library. It is separate from `commons`, because it depends on external library `commons-cli` which we do not want to introduce as a dependency to every module.

View File

@@ -1,10 +0,0 @@
java {
    compileJava {
        // Suppress unchecked-cast lint warnings for this module's generics-heavy code.
        options.compilerArgs += "-Xlint:-unchecked"
    }
}

dependencies {
    // Sibling CDK modules.
    implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
    implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
}

View File

@@ -1,78 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
 * AirbyteProtocol Message Migrator.
 *
 * Applies the chain of registered {@link AirbyteMessageMigration}s required to move a message
 * from one version of the AirbyteProtocol to another.
 */
public class AirbyteMessageMigrator {

  private final MigrationContainer<AirbyteMessageMigration<?, ?>> migrationContainer;

  public AirbyteMessageMigrator(final List<AirbyteMessageMigration<?, ?>> migrations) {
    migrationContainer = new MigrationContainer<>(migrations);
  }

  public void initialize() {
    migrationContainer.initialize();
  }

  /**
   * Downgrades a message from the most recent version to {@code target}, applying each required
   * migration in sequence.
   */
  public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message,
                                                                     final Version target,
                                                                     final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    return migrationContainer.downgrade(message, target, (migration, m) -> applyDowngrade(migration, m, configuredAirbyteCatalog));
  }

  /**
   * Upgrades a message from {@code source} to the most recent version, applying each required
   * migration in sequence.
   */
  public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message,
                                                                  final Version source,
                                                                  final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    return migrationContainer.upgrade(message, source, (migration, m) -> applyUpgrade(migration, m, configuredAirbyteCatalog));
  }

  public Version getMostRecentVersion() {
    return migrationContainer.getMostRecentVersion();
  }

  // The container hands migrations back as wildcard-typed objects; these helpers centralize the
  // unavoidable casts.
  private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
                                                                                  final Object message,
                                                                                  final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    return migration.downgrade((CurrentVersion) message, configuredAirbyteCatalog);
  }

  private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final AirbyteMessageMigration<PreviousVersion, CurrentVersion> migration,
                                                                               final Object message,
                                                                               final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    return migration.upgrade((PreviousVersion) message, configuredAirbyteCatalog);
  }

  // Exposed so tests can inspect which migrations were injected.
  @VisibleForTesting
  Set<String> getMigrationKeys() {
    return migrationContainer.getMigrationKeys();
  }

}

View File

@@ -1,95 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer;
import io.airbyte.commons.version.Version;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * AirbyteProtocol Message Serializer/Deserializer provider.
 *
 * This class is intended to help access the serializer/deserializer for a given version of the
 * Airbyte Protocol. Registrations are keyed by the protocol's major version.
 */
public class AirbyteMessageSerDeProvider {

  private final List<AirbyteMessageDeserializer<?>> deserializersToRegister;
  private final List<AirbyteMessageSerializer<?>> serializersToRegister;

  // Lookup tables keyed by protocol major version; populated by initialize().
  private final Map<String, AirbyteMessageDeserializer<?>> deserializers = new HashMap<>();
  private final Map<String, AirbyteMessageSerializer<?>> serializers = new HashMap<>();

  public AirbyteMessageSerDeProvider(final List<AirbyteMessageDeserializer<?>> deserializers,
                                     final List<AirbyteMessageSerializer<?>> serializers) {
    deserializersToRegister = deserializers;
    serializersToRegister = serializers;
  }

  public AirbyteMessageSerDeProvider() {
    this(Collections.emptyList(), Collections.emptyList());
  }

  /**
   * Registers all provided serializers/deserializers. Must be called before any lookup.
   */
  public void initialize() {
    deserializersToRegister.forEach(this::registerDeserializer);
    serializersToRegister.forEach(this::registerSerializer);
  }

  /**
   * Returns the Deserializer for the version if known else empty
   */
  public Optional<AirbyteMessageDeserializer<?>> getDeserializer(final Version version) {
    return Optional.ofNullable(deserializers.get(version.getMajorVersion()));
  }

  /**
   * Returns the Serializer for the version if known else empty
   */
  public Optional<AirbyteMessageSerializer<?>> getSerializer(final Version version) {
    return Optional.ofNullable(serializers.get(version.getMajorVersion()));
  }

  @VisibleForTesting
  void registerDeserializer(final AirbyteMessageDeserializer<?> deserializer) {
    final String key = deserializer.getTargetVersion().getMajorVersion();
    if (!deserializers.containsKey(key)) {
      deserializers.put(key, deserializer);
    } else {
      // BUGFIX: the message previously used SLF4J-style "{}" placeholders with String.format,
      // which renders literal "{}" instead of the versions; String.format requires "%s".
      throw new RuntimeException(String.format("Trying to register a deserializer for protocol version %s when %s already exists",
          deserializer.getTargetVersion().serialize(), deserializers.get(key).getTargetVersion().serialize()));
    }
  }

  @VisibleForTesting
  void registerSerializer(final AirbyteMessageSerializer<?> serializer) {
    final String key = serializer.getTargetVersion().getMajorVersion();
    if (!serializers.containsKey(key)) {
      serializers.put(key, serializer);
    } else {
      // BUGFIX: same "%s" vs "{}" placeholder fix as in registerDeserializer.
      throw new RuntimeException(String.format("Trying to register a serializer for protocol version %s when %s already exists",
          serializer.getTargetVersion().serialize(), serializers.get(key).getTargetVersion().serialize()));
    }
  }

  // Used for inspection of the injection
  @VisibleForTesting
  Set<String> getDeserializerKeys() {
    return deserializers.keySet();
  }

  // Used for inspection of the injection
  @VisibleForTesting
  Set<String> getSerializerKeys() {
    return serializers.keySet();
  }

}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Optional;
/**
 * Migrates messages between a fixed protocol version and the most recent version.
 *
 * @param <OriginalMessageType> the message type of the fixed (older) protocol version
 */
public class AirbyteMessageVersionedMigrator<OriginalMessageType> {

  private final AirbyteMessageMigrator migrator;
  private final Version version;

  public AirbyteMessageVersionedMigrator(final AirbyteMessageMigrator migrator, final Version version) {
    this.migrator = migrator;
    this.version = version;
  }

  /** Converts a current-version message down to this migrator's fixed version. */
  public OriginalMessageType downgrade(final AirbyteMessage message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    return migrator.downgrade(message, version, configuredAirbyteCatalog);
  }

  /** Converts a fixed-version message up to the current version. */
  public AirbyteMessage upgrade(final OriginalMessageType message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    return migrator.upgrade(message, version, configuredAirbyteCatalog);
  }

  public Version getVersion() {
    return version;
  }

}

View File

@@ -1,35 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import io.airbyte.commons.version.Version;
/**
 * Factory for version-pinned migrators and serializers.
 *
 * Builds {@link AirbyteMessageVersionedMigrator}s and {@link VersionedProtocolSerializer}s for a
 * requested protocol version.
 */
public class AirbyteProtocolVersionedMigratorFactory {

  private final AirbyteMessageMigrator airbyteMessageMigrator;
  private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;

  public AirbyteProtocolVersionedMigratorFactory(final AirbyteMessageMigrator airbyteMessageMigrator,
                                                 final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator) {
    this.airbyteMessageMigrator = airbyteMessageMigrator;
    this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
  }

  /** Returns a migrator pinned to {@code version}. */
  public <T> AirbyteMessageVersionedMigrator<T> getAirbyteMessageMigrator(final Version version) {
    return new AirbyteMessageVersionedMigrator<>(airbyteMessageMigrator, version);
  }

  /** Returns a catalog serializer targeting {@code version}. */
  public final VersionedProtocolSerializer getProtocolSerializer(final Version version) {
    return new VersionedProtocolSerializer(configuredAirbyteCatalogMigrator, version);
  }

  public Version getMostRecentVersion() {
    return airbyteMessageMigrator.getMostRecentVersion();
  }

}

View File

@@ -1,64 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigration;
import io.airbyte.commons.protocol.migrations.MigrationContainer;
import io.airbyte.commons.version.Version;
import java.util.List;
import java.util.Set;
/**
 * Applies the chain of registered {@link ConfiguredAirbyteCatalogMigration}s required to move a
 * ConfiguredAirbyteCatalog from one protocol version to another.
 */
public class ConfiguredAirbyteCatalogMigrator {

  private final MigrationContainer<ConfiguredAirbyteCatalogMigration<?, ?>> migrationContainer;

  public ConfiguredAirbyteCatalogMigrator(final List<ConfiguredAirbyteCatalogMigration<?, ?>> migrations) {
    migrationContainer = new MigrationContainer<>(migrations);
  }

  public void initialize() {
    migrationContainer.initialize();
  }

  /**
   * Downgrades a catalog from the most recent version to {@code target}, applying each required
   * migration in sequence.
   */
  public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) {
    return migrationContainer.downgrade(message, target, ConfiguredAirbyteCatalogMigrator::applyDowngrade);
  }

  /**
   * Upgrades a catalog from {@code source} to the most recent version, applying each required
   * migration in sequence.
   */
  public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) {
    return migrationContainer.upgrade(message, source, ConfiguredAirbyteCatalogMigrator::applyUpgrade);
  }

  public Version getMostRecentVersion() {
    return migrationContainer.getMostRecentVersion();
  }

  // The container hands migrations back as wildcard-typed objects; these helpers centralize the
  // unavoidable casts.
  private static <PreviousVersion, CurrentVersion> PreviousVersion applyDowngrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
                                                                                  final Object message) {
    return migration.downgrade((CurrentVersion) message);
  }

  private static <PreviousVersion, CurrentVersion> CurrentVersion applyUpgrade(final ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> migration,
                                                                               final Object message) {
    return migration.upgrade((PreviousVersion) message);
  }

  // Exposed so tests can inspect which migrations were injected.
  @VisibleForTesting
  Set<String> getMigrationKeys() {
    return migrationContainer.getMigrationKeys();
  }

}

View File

@@ -1,32 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
/**
 * Serializes a ConfiguredAirbyteCatalog at a specific protocol version.
 * <p>
 * Expects a ConfiguredAirbyteCatalog from the current version of the platform and converts it to
 * the target protocol version before serializing it to JSON.
 */
public class VersionedProtocolSerializer implements ProtocolSerializer {

  private final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator;
  private final Version protocolVersion;

  public VersionedProtocolSerializer(final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator, final Version protocolVersion) {
    this.configuredAirbyteCatalogMigrator = configuredAirbyteCatalogMigrator;
    this.protocolVersion = protocolVersion;
  }

  @Override
  public String serialize(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    // Downgrade to the pinned protocol version first, then serialize the result.
    return Jsons.serialize(configuredAirbyteCatalogMigrator.downgrade(configuredAirbyteCatalog, protocolVersion));
  }

}

View File

@@ -1,36 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Optional;
/**
 * AirbyteProtocol message migration interface.
 *
 * Implementations convert a message between two adjacent protocol versions, in both directions.
 *
 * @param <PreviousVersion> The Old AirbyteMessage type
 * @param <CurrentVersion> The New AirbyteMessage type
 */
public interface AirbyteMessageMigration<PreviousVersion, CurrentVersion> extends Migration {

  /**
   * Downgrades a message from the new version to the old version.
   *
   * @param message: the message to downgrade
   * @param configuredAirbyteCatalog: the ConfiguredAirbyteCatalog of the connection when applicable
   * @return the downgraded message
   */
  PreviousVersion downgrade(final CurrentVersion message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog);

  /**
   * Upgrades a message from the old version to the new version.
   *
   * @param message: the message to upgrade
   * @param configuredAirbyteCatalog: the ConfiguredAirbyteCatalog of the connection when applicable
   * @return the upgraded message
   */
  CurrentVersion upgrade(final PreviousVersion message, final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog);

}

View File

@@ -1,25 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations;
/**
 * ConfiguredAirbyteCatalog migration interface.
 *
 * Implementations convert a ConfiguredAirbyteCatalog between two adjacent protocol versions,
 * in both directions.
 *
 * @param <PreviousVersion> the old ConfiguredAirbyteCatalog type
 * @param <CurrentVersion> the new ConfiguredAirbyteCatalog type
 */
public interface ConfiguredAirbyteCatalogMigration<PreviousVersion, CurrentVersion> extends Migration {

  /**
   * Downgrades a ConfiguredAirbyteCatalog from the new version to the old version.
   *
   * @param message: the ConfiguredAirbyteCatalog to downgrade
   * @return the downgraded ConfiguredAirbyteCatalog
   */
  PreviousVersion downgrade(final CurrentVersion message);

  /**
   * Upgrades a ConfiguredAirbyteCatalog from the old version to the new version.
   *
   * @param message: the ConfiguredAirbyteCatalog to upgrade
   * @return the upgraded ConfiguredAirbyteCatalog
   */
  CurrentVersion upgrade(final PreviousVersion message);

}

View File

@@ -1,21 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations;
import io.airbyte.commons.version.Version;
/**
 * Base interface for protocol migrations: declares the version range a migration covers.
 */
public interface Migration {

  /**
   * The Old version, note that due to semver, the important piece of information is the Major.
   */
  Version getPreviousVersion();

  /**
   * The New version, note that due to semver, the important piece of information is the Major.
   */
  Version getCurrentVersion();

}

View File

@@ -1,103 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations;
import io.airbyte.commons.version.Version;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiFunction;
/**
 * Holds an ordered set of {@link Migration}s and applies the required subset to move a message
 * between protocol versions.
 *
 * Migrations are stored in a sorted map keyed by the major of each migration's "previous" (lower)
 * version; a downgrade walks the applicable migrations in reverse, an upgrade walks them forward.
 *
 * NOTE(review): major versions are compared as Strings (both the TreeMap ordering and the
 * compareTo in registerMigration), which is lexicographic — e.g. "10" sorts before "9". This is
 * correct while majors stay single-digit; confirm before protocol majors reach 10.
 */
public class MigrationContainer<T extends Migration> {

  private final List<T> migrationsToRegister;
  // Sorted by the major of each migration's previous (lower) version.
  private final SortedMap<String, T> migrations = new TreeMap<>();

  // mostRecentMajorVersion defaults to v0 as no migration is required
  private String mostRecentMajorVersion = "0";

  public MigrationContainer(final List<T> migrations) {
    this.migrationsToRegister = migrations;
  }

  public void initialize() {
    migrationsToRegister.forEach(this::registerMigration);
  }

  public Version getMostRecentVersion() {
    return new Version(mostRecentMajorVersion, "0", "0");
  }

  /**
   * Downgrade a message from the most recent version to the target version by chaining all the
   * required migrations
   */
  public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message,
                                                                     final Version target,
                                                                     final BiFunction<T, Object, Object> applyDowngrade) {
    // No-op when the target already is the most recent major.
    if (target.getMajorVersion().equals(mostRecentMajorVersion)) {
      return (PreviousVersion) message;
    }

    Object result = message;
    // Downgrades apply the selected migrations newest-to-oldest, hence the reverse iteration.
    Object[] selectedMigrations = selectMigrations(target).toArray();
    for (int i = selectedMigrations.length; i > 0; --i) {
      result = applyDowngrade.apply((T) selectedMigrations[i - 1], result);
    }
    return (PreviousVersion) result;
  }

  /**
   * Upgrade a message from the source version to the most recent version by chaining all the required
   * migrations
   */
  public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message,
                                                                  final Version source,
                                                                  final BiFunction<T, Object, Object> applyUpgrade) {
    // No-op when the source already is the most recent major.
    if (source.getMajorVersion().equals(mostRecentMajorVersion)) {
      return (CurrentVersion) message;
    }

    Object result = message;
    // Upgrades apply the selected migrations oldest-to-newest.
    for (var migration : selectMigrations(source)) {
      result = applyUpgrade.apply(migration, result);
    }
    return (CurrentVersion) result;
  }

  /**
   * Returns all migrations whose lower-bound major is >= the requested version's major.
   *
   * @throws RuntimeException when no migration covers the requested version
   */
  public Collection<T> selectMigrations(final Version version) {
    final Collection<T> results = migrations.tailMap(version.getMajorVersion()).values();
    if (results.isEmpty()) {
      throw new RuntimeException("Unsupported migration version " + version.serialize());
    }
    return results;
  }

  /**
   * Store migration in a sorted map key by the major of the lower version of the migration.
   *
   * The goal is to be able to retrieve the list of migrations to apply to get to/from a given
   * version. We are only keying on the lower version because the right side (most recent version of
   * the migration range) is always current version.
   */
  private void registerMigration(final T migration) {
    final String key = migration.getPreviousVersion().getMajorVersion();
    if (!migrations.containsKey(key)) {
      migrations.put(key, migration);
      // Track the highest "current" major seen so far (String compare — see class note above).
      if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) {
        mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion();
      }
    } else {
      throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName());
    }
  }

  public Set<String> getMigrationKeys() {
    return migrations.keySet();
  }

}

View File

@@ -1,271 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.util;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.ARRAY_TYPE;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.ITEMS_KEY;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.OBJECT_TYPE;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.ONEOF_KEY;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.PROPERTIES_KEY;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.REF_KEY;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.TYPE_KEY;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.function.Function;
public class RecordMigrations {
/**
 * Quick and dirty tuple. Used internally by
 * {@link #mutateDataNode(JsonSchemaValidator, Function, Transformer, JsonNode, JsonNode)}; callers
 * probably only actually need the node.
 *
 * matchedSchema is useful for mutating using a oneOf schema, where we need to recognize the correct
 * subschema.
 *
 * @param node Our attempt at mutating the node, under the given schema
 * @param matchedSchema Whether the original node actually matched the schema
 */
public record MigratedNode(JsonNode node, boolean matchedSchema) {}
/**
 * Extends BiFunction so that we can have named parameters (schema, data) instead of anonymous
 * BiFunction type arguments.
 */
@FunctionalInterface
public interface Transformer extends BiFunction<JsonNode, JsonNode, MigratedNode> {

  // Re-declared only to name the parameters; the contract is identical to BiFunction#apply.
  @Override
  MigratedNode apply(JsonNode schema, JsonNode data);

}
/**
 * Best-effort recursive mutation of a data node under a schema. Nodes the matcher selects are
 * transformed; objects and arrays are recursed into; everything else is returned as-is along with
 * a flag indicating whether it validated against the schema. Should _not_ throw if the data does
 * not match the schema.
 *
 * @param schemaMatcher Accepts a JsonNode schema and returns whether its corresponding entry in the
 *        data should be mutated. Doesn't need to handle oneOf cases, i.e. should only care about
 *        type/$ref.
 * @param transformer Performs the modification on the given data node. Should not throw exceptions.
 */
public static MigratedNode mutateDataNode(
                                          final JsonSchemaValidator validator,
                                          final Function<JsonNode, Boolean> schemaMatcher,
                                          final Transformer transformer,
                                          final JsonNode data,
                                          final JsonNode schema) {
  // A schema with neither $ref nor type but with oneOf must be resolved per-subschema.
  final boolean isOneOfSchema =
      !schema.hasNonNull(REF_KEY) && !schema.hasNonNull(TYPE_KEY) && schema.hasNonNull(ONEOF_KEY);
  if (isOneOfSchema) {
    return mutateOneOfNode(validator, schemaMatcher, transformer, data, schema);
  }
  // Nodes selected by the matcher are transformed directly.
  if (schemaMatcher.apply(schema)) {
    return transformer.apply(schema, data);
  }
  // Containers are recursed into; primitives are returned unchanged with a validity flag.
  if (data.isObject()) {
    return mutateObjectNode(validator, schemaMatcher, transformer, data, schema);
  }
  if (data.isArray()) {
    return mutateArrayNode(validator, schemaMatcher, transformer, data, schema);
  }
  return new MigratedNode(data, validator.test(schema, data));
}
/**
 * Tries to mutate the data against each oneOf subschema in order. Returns the result from the
 * first subschema the data matches; if none match, returns the result from the first subschema.
 */
private static MigratedNode mutateOneOfNode(
                                            final JsonSchemaValidator validator,
                                            final Function<JsonNode, Boolean> schemaMatcher,
                                            final Transformer transformer,
                                            final JsonNode data,
                                            final JsonNode schema) {
  final JsonNode options = schema.get(ONEOF_KEY);
  // An empty oneOf gives us nothing to try: just report whether the node validates as-is.
  if (options.size() == 0) {
    return new MigratedNode(data, validator.test(schema, data));
  }

  MigratedNode fallback = null;
  for (final JsonNode candidate : options) {
    final MigratedNode attempt = mutateDataNode(validator, schemaMatcher, transformer, data, candidate);
    if (attempt.matchedSchema()) {
      // First matching subschema wins.
      return attempt;
    }
    if (fallback == null) {
      // Remember the first attempt in case nothing matches.
      fallback = attempt;
    }
  }
  // No subschema matched; fall back to the first attempt.
  return fallback;
}
/**
 * If data is an object, then we need to recursively mutate all of its fields.
 *
 * Returns matchedSchema=false when the schema is not an object schema, or when any visited field
 * failed to match its declared subschema.
 */
private static MigratedNode mutateObjectNode(
                                             final JsonSchemaValidator validator,
                                             final Function<JsonNode, Boolean> schemaMatcher,
                                             final Transformer transformer,
                                             final JsonNode data,
                                             final JsonNode schema) {
  boolean isObjectSchema;

  // First, check whether the schema is supposed to be an object at all.
  if (schema.hasNonNull(REF_KEY)) {
    // If the schema uses a reference type, then it's not an object schema.
    isObjectSchema = false;
  } else if (schema.hasNonNull(TYPE_KEY)) {
    // If the schema declares {type: object} or {type: [..., object, ...]}
    // Then this is an object schema
    final JsonNode typeNode = schema.get(TYPE_KEY);
    if (typeNode.isArray()) {
      // Scan every entry of the type array for "object".
      isObjectSchema = false;
      for (final JsonNode typeItem : typeNode) {
        if (OBJECT_TYPE.equals(typeItem.asText())) {
          isObjectSchema = true;
        }
      }
    } else {
      isObjectSchema = OBJECT_TYPE.equals(typeNode.asText());
    }
  } else {
    // If the schema doesn't declare a type at all (which is bad practice, but let's handle it anyway)
    // Then check for a properties entry, and assume that this is an object if it's present
    isObjectSchema = schema.hasNonNull(PROPERTIES_KEY);
  }

  if (!isObjectSchema) {
    // If it's not supposed to be an object, then we can't do anything here.
    // Return the data without modification.
    return new MigratedNode(data, false);
  } else {
    // If the schema _is_ for an object, then recurse into each field
    final ObjectNode mutatedData = (ObjectNode) Jsons.emptyObject();
    final JsonNode propertiesNode = schema.get(PROPERTIES_KEY);

    final Iterator<Entry<String, JsonNode>> dataFields = data.fields();
    // Tracks whether every visited field matched its declared subschema.
    boolean matchedSchema = true;
    while (dataFields.hasNext()) {
      final Entry<String, JsonNode> field = dataFields.next();
      final String key = field.getKey();
      final JsonNode value = field.getValue();
      if (propertiesNode != null && propertiesNode.hasNonNull(key)) {
        // If we have a schema for this property, mutate the value
        final JsonNode subschema = propertiesNode.get(key);
        final MigratedNode migratedNode = mutateDataNode(validator, schemaMatcher, transformer, value, subschema);
        mutatedData.set(key, migratedNode.node);
        if (!migratedNode.matchedSchema) {
          matchedSchema = false;
        }
      } else {
        // Else it's an additional property - we _could_ check additionalProperties,
        // but that's annoying. We don't actually respect that in destinations/normalization anyway.
        mutatedData.set(key, value);
      }
    }
    return new MigratedNode(mutatedData, matchedSchema);
  }
}
/**
 * Much like objects, arrays must be recursively mutated.
 *
 * @param validator forwarded to {@code mutateDataNode} for per-element schema checks
 * @param schemaMatcher returns true on (sub)schemas whose data should be transformed
 * @param transformer the mutation applied where schemaMatcher accepts the schema
 * @param data the data node to migrate (expected, but not guaranteed, to be an array)
 * @param schema the schema describing {@code data}
 * @return the migrated node plus whether {@code data} fully matched {@code schema}
 */
private static MigratedNode mutateArrayNode(
    final JsonSchemaValidator validator,
    final Function<JsonNode, Boolean> schemaMatcher,
    final Transformer transformer,
    final JsonNode data,
    final JsonNode schema) {
  // Similar to objects, we first check whether this is even supposed to be an array.
  boolean isArraySchema;
  if (schema.hasNonNull(REF_KEY)) {
    // If the schema uses a reference type, then it's not an array schema.
    isArraySchema = false;
  } else if (schema.hasNonNull(TYPE_KEY)) {
    // If the schema declares {type: array} or {type: [..., array, ...]}
    // Then this is an array schema
    final JsonNode typeNode = schema.get(TYPE_KEY);
    if (typeNode.isArray()) {
      isArraySchema = false;
      for (final JsonNode typeItem : typeNode) {
        if (ARRAY_TYPE.equals(typeItem.asText())) {
          isArraySchema = true;
        }
      }
    } else {
      isArraySchema = ARRAY_TYPE.equals(typeNode.asText());
    }
  } else {
    // If the schema doesn't declare a type at all (which is bad practice, but let's handle it anyway)
    // Then check for an items entry, and assume that this is an array if it's present
    isArraySchema = schema.hasNonNull(ITEMS_KEY);
  }
  if (!isArraySchema) {
    // Not an array schema: return the data unmodified and report a non-match.
    return new MigratedNode(data, false);
  } else {
    final ArrayNode mutatedItems = Jsons.arrayNode();
    final JsonNode itemsNode = schema.get(ITEMS_KEY);
    if (itemsNode == null) {
      // We _could_ check additionalItems, but much like the additionalProperties comment for objects:
      // it's a lot of work for no payoff
      return new MigratedNode(data, true);
    } else if (itemsNode.isArray()) {
      // In the case of {items: [schema1, schema2, ...]} (i.e. tuple validation)
      // We need to check schema1 against the first element of the array,
      // schema2 against the second element, etc.
      boolean allSchemasMatched = true;
      for (int i = 0; i < data.size(); i++) {
        final JsonNode element = data.get(i);
        if (itemsNode.size() > i) {
          // If we have a schema for this element, then try mutating the element
          final MigratedNode mutatedElement = mutateDataNode(validator, schemaMatcher, transformer, element, itemsNode.get(i));
          if (!mutatedElement.matchedSchema()) {
            allSchemasMatched = false;
          }
          mutatedItems.add(mutatedElement.node());
        }
      }
      // If there were more elements in `data` than there were schemas in `itemsNode`,
      // then just blindly add the rest of those elements.
      for (int i = itemsNode.size(); i < data.size(); i++) {
        mutatedItems.add(data.get(i));
      }
      return new MigratedNode(mutatedItems, allSchemasMatched);
    } else {
      // In the case of {items: schema}, we just check every array element against that schema.
      boolean matchedSchema = true;
      for (final JsonNode item : data) {
        final MigratedNode migratedNode = mutateDataNode(validator, schemaMatcher, transformer, item, itemsNode);
        mutatedItems.add(migratedNode.node);
        if (!migratedNode.matchedSchema) {
          matchedSchema = false;
        }
      }
      return new MigratedNode(mutatedItems, matchedSchema);
    }
  }
}
}

View File

@@ -1,143 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.util;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.function.Function;
/**
 * Utility class for recursively modifying JsonSchemas. Useful for up/downgrading AirbyteCatalog
 * objects.
 *
 * See {@link io.airbyte.commons.protocol.migrations.v1.SchemaMigrationV1} for example usage.
 */
public class SchemaMigrations {

  /**
   * Generic utility method that recurses through all type declarations in the schema. For each type
   * declaration that are accepted by matcher, mutate them using transformer. For all other type
   * declarations, recurse into their subschemas (if any).
   * <p>
   * Note that this modifies the schema in-place. Callers who need a copy of the old schema should
   * save schema.deepCopy() before calling this method.
   *
   * @param schema The JsonSchema node to walk down
   * @param matcher A function which returns true on any schema node that needs to be transformed
   * @param transformer A function which mutates a schema node
   */
  public static void mutateSchemas(final Function<JsonNode, Boolean> matcher, final Consumer<JsonNode> transformer, final JsonNode schema) {
    if (schema.isBoolean()) {
      // We never want to modify a schema of `true` or `false` (e.g. additionalProperties: true)
      // so just return immediately
      return;
    }
    if (matcher.apply(schema)) {
      // Base case: If this schema should be mutated, then we need to mutate it
      transformer.accept(schema);
    } else {
      // Otherwise, we need to find all the subschemas and mutate them.
      // technically, it might be more correct to do something like:
      // if schema["type"] == "array": find subschemas for items, additionalItems, contains
      // else if schema["type"] == "object": find subschemas for properties, patternProperties,
      // additionalProperties
      // else if oneof, allof, etc
      // but that sounds really verbose for no real benefit
      final List<JsonNode> subschemas = findSubschemas(schema);
      // recurse into each subschema
      for (final JsonNode subschema : subschemas) {
        mutateSchemas(matcher, transformer, subschema);
      }
    }
  }

  /**
   * Returns a list of all the direct children nodes to consider for subSchemas
   *
   * @param schema The JsonSchema node to start
   * @return a list of the JsonNodes to be considered
   */
  public static List<JsonNode> findSubschemas(final JsonNode schema) {
    final List<JsonNode> subschemas = new ArrayList<>();
    // array schemas
    findSubschemas(subschemas, schema, "items");
    findSubschemas(subschemas, schema, "additionalItems");
    findSubschemas(subschemas, schema, "contains");
    // object schemas: "properties" and "patternProperties" share the same
    // name -> subschema layout, so they are handled by a common helper.
    addFieldValueSchemas(subschemas, schema, "properties");
    addFieldValueSchemas(subschemas, schema, "patternProperties");
    findSubschemas(subschemas, schema, "additionalProperties");
    // combining restrictions - destinations have limited support for these, but we should handle the
    // schemas correctly anyway
    findSubschemas(subschemas, schema, "allOf");
    findSubschemas(subschemas, schema, "oneOf");
    findSubschemas(subschemas, schema, "anyOf");
    findSubschemas(subschemas, schema, "not");
    return subschemas;
  }

  /**
   * Adds the value of every field under schema[key] to subschemas. Used for "properties" and
   * "patternProperties", both of which map field names to subschemas.
   * <p>
   * Unlike the previous inline implementation, this does not cast schema[key] to ObjectNode, so a
   * malformed non-object value is silently skipped (fields() is empty) instead of throwing
   * ClassCastException.
   */
  private static void addFieldValueSchemas(final List<JsonNode> subschemas, final JsonNode schema, final String key) {
    if (schema.hasNonNull(key)) {
      final Iterator<Entry<String, JsonNode>> fieldsIterator = schema.get(key).fields();
      while (fieldsIterator.hasNext()) {
        subschemas.add(fieldsIterator.next().getValue());
      }
    }
  }

  /**
   * If schema contains key, then grab the subschema(s) at schema[key] and add them to the subschemas
   * list.
   * <p>
   * For example:
   * <ul>
   * <li>schema = {"items": [{"type": "string}]}
   * <p>
   * key = "items"
   * <p>
   * -> add {"type": "string"} to subschemas</li>
   * <li>schema = {"items": {"type": "string"}}
   * <p>
   * key = "items"
   * <p>
   * -> add {"type": "string"} to subschemas</li>
   * <li>schema = {"additionalProperties": true}
   * <p>
   * key = "additionalProperties"
   * <p>
   * -> add nothing to subschemas
   * <p>
   * (technically `true` is a valid JsonSchema, but we don't want to modify it)</li>
   * </ul>
   */
  public static void findSubschemas(final List<JsonNode> subschemas, final JsonNode schema, final String key) {
    if (schema.hasNonNull(key)) {
      final JsonNode subschemaNode = schema.get(key);
      if (subschemaNode.isArray()) {
        for (final JsonNode subschema : subschemaNode) {
          subschemas.add(subschema);
        }
      } else if (subschemaNode.isObject()) {
        subschemas.add(subschemaNode);
      }
    }
  }

}

View File

@@ -1,178 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.v1;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.REF_KEY;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.protocol.migrations.util.RecordMigrations;
import io.airbyte.commons.protocol.migrations.util.RecordMigrations.MigratedNode;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaReferenceTypes;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
// Disable V1 Migration, uncomment to re-enable
// @Singleton
/**
 * Migrates AirbyteMessages between protocol v0 and v1. The v0 -> v1 change replaced primitive
 * {type: ...} declarations with {$ref: WellKnownTypes...} declarations in schemas, and serialized
 * numbers as strings in record data; this class converts both directions.
 */
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage> {

  // Used by downgradeRecord to decide which schema branch a piece of data matches.
  private final JsonSchemaValidator validator;

  public AirbyteMessageMigrationV1() {
    this(new JsonSchemaValidator());
  }

  @VisibleForTesting
  public AirbyteMessageMigrationV1(final JsonSchemaValidator validator) {
    this.validator = validator;
  }

  /**
   * Downgrades a v1 message to v0. CATALOG messages have each stream's schema rewritten from
   * $ref-style back to type-style. RECORD messages have numeric-string fields converted back to
   * numbers, which requires the stream's schema from the configured catalog; records whose stream
   * cannot be found in the catalog are passed through unchanged.
   */
  @Override
  public io.airbyte.protocol.models.v0.AirbyteMessage downgrade(final AirbyteMessage oldMessage,
                                                                final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    // Serialize/deserialize to clone the message into the v0 model, then edit in-place.
    final io.airbyte.protocol.models.v0.AirbyteMessage newMessage = Jsons.object(
        Jsons.jsonNode(oldMessage),
        io.airbyte.protocol.models.v0.AirbyteMessage.class);
    if (oldMessage.getType() == Type.CATALOG && oldMessage.getCatalog() != null) {
      for (final io.airbyte.protocol.models.v0.AirbyteStream stream : newMessage.getCatalog().getStreams()) {
        final JsonNode schema = stream.getJsonSchema();
        SchemaMigrationV1.downgradeSchema(schema);
      }
    } else if (oldMessage.getType() == Type.RECORD && oldMessage.getRecord() != null) {
      if (configuredAirbyteCatalog.isPresent()) {
        final ConfiguredAirbyteCatalog catalog = configuredAirbyteCatalog.get();
        final io.airbyte.protocol.models.v0.AirbyteRecordMessage record = newMessage.getRecord();
        // Locate the configured stream matching this record's (name, namespace) pair.
        final Optional<ConfiguredAirbyteStream> maybeStream = catalog.getStreams().stream()
            .filter(stream -> Objects.equals(stream.getStream().getName(), record.getStream())
                && Objects.equals(stream.getStream().getNamespace(), record.getNamespace()))
            .findFirst();
        // If this record doesn't belong to any configured stream, then there's no point downgrading it
        // So only do the downgrade if we can find its stream
        if (maybeStream.isPresent()) {
          final JsonNode schema = maybeStream.get().getStream().getJsonSchema();
          final JsonNode oldData = record.getData();
          final MigratedNode downgradedNode = downgradeRecord(oldData, schema);
          record.setData(downgradedNode.node());
        }
      }
    }
    return newMessage;
  }

  /**
   * Upgrades a v0 message to v1. CATALOG messages get type -> $ref schema rewrites; RECORD
   * messages have all numeric values converted to strings. No catalog is needed for upgrades
   * because every number is converted unconditionally.
   */
  @Override
  public AirbyteMessage upgrade(final io.airbyte.protocol.models.v0.AirbyteMessage oldMessage,
                                final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
    // We're not introducing any changes to the structure of the record/catalog
    // so just clone a new message object, which we can edit in-place
    final AirbyteMessage newMessage = Jsons.object(
        Jsons.jsonNode(oldMessage),
        AirbyteMessage.class);
    if (oldMessage.getType() == io.airbyte.protocol.models.v0.AirbyteMessage.Type.CATALOG && oldMessage.getCatalog() != null) {
      for (final AirbyteStream stream : newMessage.getCatalog().getStreams()) {
        final JsonNode schema = stream.getJsonSchema();
        SchemaMigrationV1.upgradeSchema(schema);
      }
    } else if (oldMessage.getType() == io.airbyte.protocol.models.v0.AirbyteMessage.Type.RECORD && oldMessage.getRecord() != null) {
      final JsonNode oldData = newMessage.getRecord().getData();
      final JsonNode newData = upgradeRecord(oldData);
      newMessage.getRecord().setData(newData);
    }
    return newMessage;
  }

  /**
   * Returns a copy of oldData, with numeric values converted to strings. String and boolean values
   * are returned as-is for convenience, i.e. this is not a true deep copy.
   */
  private static JsonNode upgradeRecord(final JsonNode oldData) {
    if (oldData.isNumber()) {
      // Base case: convert numbers to strings
      return Jsons.convertValue(oldData.asText(), TextNode.class);
    } else if (oldData.isObject()) {
      // Recurse into each field of the object
      final ObjectNode newData = (ObjectNode) Jsons.emptyObject();
      final Iterator<Entry<String, JsonNode>> fieldsIterator = oldData.fields();
      while (fieldsIterator.hasNext()) {
        final Entry<String, JsonNode> next = fieldsIterator.next();
        final String key = next.getKey();
        final JsonNode value = next.getValue();
        final JsonNode newValue = upgradeRecord(value);
        newData.set(key, newValue);
      }
      return newData;
    } else if (oldData.isArray()) {
      // Recurse into each element of the array
      final ArrayNode newData = Jsons.arrayNode();
      for (final JsonNode element : oldData) {
        newData.add(upgradeRecord(element));
      }
      return newData;
    } else {
      // Base case: this is a string or boolean, so we don't need to modify it
      return oldData;
    }
  }

  /**
   * We need the schema to recognize which fields are integers, since it would be wrong to just assume
   * any numerical string should be parsed out.
   *
   * Works on a best-effort basis. If the schema doesn't match the data, we'll do our best to
   * downgrade anything that we can definitively say is a number. Should _not_ throw an exception if
   * bad things happen (e.g. we try to parse a non-numerical string as a number).
   */
  private MigratedNode downgradeRecord(final JsonNode data, final JsonNode schema) {
    return RecordMigrations.mutateDataNode(
        validator,
        // Match schemas that declare an integer or number reference type.
        s -> {
          if (s.hasNonNull(REF_KEY)) {
            final String type = s.get(REF_KEY).asText();
            return JsonSchemaReferenceTypes.INTEGER_REFERENCE.equals(type)
                || JsonSchemaReferenceTypes.NUMBER_REFERENCE.equals(type);
          } else {
            return false;
          }
        },
        // Transform: parse the string back into a numeric node when it looks like a
        // decimal literal. NOTE(review): the regex does not accept scientific notation
        // (e.g. "1e5"), so such strings are left as-is - presumably intentional.
        (s, d) -> {
          if (d.asText().matches("-?\\d+(\\.\\d+)?")) {
            // If this string is a numeric literal, convert it to a numeric node.
            return new MigratedNode(Jsons.deserialize(d.asText()), true);
          } else {
            // Otherwise, just leave the node unchanged.
            return new MigratedNode(d, false);
          }
        },
        data, schema);
  }

  @Override
  public Version getPreviousVersion() {
    return AirbyteProtocolVersion.V0;
  }

  @Override
  public Version getCurrentVersion() {
    return AirbyteProtocolVersion.V1;
  }

}

View File

@@ -1,219 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.v1;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.protocol.migrations.util.SchemaMigrations;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
/**
 * For the v0 to v1 migration, it appears that we are persisting some protocol objects without
 * version. Until this gets addressed more properly, this class contains the helper functions used
 * to handle this on the fly migration.
 *
 * Once persisted objects are versioned, this code should be deleted.
 */
public class CatalogMigrationV1Helper {

  /**
   * Performs an in-place migration of the schema from v0 to v1 if v0 data types are detected
   *
   * @param configuredAirbyteCatalog to migrate
   */
  public static void upgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (containsV0DataTypes(configuredAirbyteCatalog)) {
      upgradeSchema(configuredAirbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1 if v0 data types are detected
   *
   * @param airbyteCatalog to migrate
   */
  public static void upgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
    if (containsV0DataTypes(airbyteCatalog)) {
      upgradeSchema(airbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1
   *
   * @param configuredAirbyteCatalog to migrate
   */
  private static void upgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    for (final var stream : configuredAirbyteCatalog.getStreams()) {
      SchemaMigrationV1.upgradeSchema(stream.getStream().getJsonSchema());
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1
   *
   * @param airbyteCatalog to migrate
   */
  private static void upgradeSchema(final AirbyteCatalog airbyteCatalog) {
    for (final var stream : airbyteCatalog.getStreams()) {
      SchemaMigrationV1.upgradeSchema(stream.getJsonSchema());
    }
  }

  /**
   * Returns true if catalog contains v0 data types.
   * <p>
   * Note: only the first stream is inspected; the catalog is assumed to be uniformly versioned.
   */
  private static boolean containsV0DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (configuredAirbyteCatalog == null) {
      return false;
    }

    return configuredAirbyteCatalog
        .getStreams()
        .stream().findFirst()
        .map(ConfiguredAirbyteStream::getStream)
        .map(CatalogMigrationV1Helper::streamContainsV0DataTypes)
        .orElse(false);
  }

  /**
   * Returns true if catalog contains v0 data types.
   * <p>
   * Note: only the first stream is inspected; the catalog is assumed to be uniformly versioned.
   */
  private static boolean containsV0DataTypes(final AirbyteCatalog airbyteCatalog) {
    if (airbyteCatalog == null) {
      return false;
    }

    return airbyteCatalog
        .getStreams()
        .stream().findFirst()
        .map(CatalogMigrationV1Helper::streamContainsV0DataTypes)
        .orElse(false);
  }

  // Returns true if the stream's schema contains at least one v0 primitive type declaration.
  private static boolean streamContainsV0DataTypes(final AirbyteStream airbyteStream) {
    if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
      return false;
    }
    return hasV0DataType(airbyteStream.getJsonSchema());
  }

  /**
   * Performs a search for a v0 data type node, returns true at the first node found.
   */
  private static boolean hasV0DataType(final JsonNode schema) {
    if (SchemaMigrationV1.isPrimitiveTypeDeclaration(schema)) {
      return true;
    }

    for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
      if (hasV0DataType(subSchema)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected
   *
   * @param configuredAirbyteCatalog to migrate
   */
  public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (containsV1DataTypes(configuredAirbyteCatalog)) {
      downgradeSchema(configuredAirbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected
   *
   * @param airbyteCatalog to migrate
   */
  public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
    if (containsV1DataTypes(airbyteCatalog)) {
      downgradeSchema(airbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0
   *
   * @param configuredAirbyteCatalog to migrate
   */
  private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    for (final var stream : configuredAirbyteCatalog.getStreams()) {
      SchemaMigrationV1.downgradeSchema(stream.getStream().getJsonSchema());
    }
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0
   *
   * @param airbyteCatalog to migrate
   */
  private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) {
    for (final var stream : airbyteCatalog.getStreams()) {
      SchemaMigrationV1.downgradeSchema(stream.getJsonSchema());
    }
  }

  /**
   * Returns true if catalog contains v1 data types.
   * <p>
   * Note: only the first stream is inspected; the catalog is assumed to be uniformly versioned.
   */
  private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (configuredAirbyteCatalog == null) {
      return false;
    }

    return configuredAirbyteCatalog
        .getStreams()
        .stream().findFirst()
        .map(ConfiguredAirbyteStream::getStream)
        .map(CatalogMigrationV1Helper::streamContainsV1DataTypes)
        .orElse(false);
  }

  /**
   * Returns true if catalog contains v1 data types.
   * <p>
   * Note: only the first stream is inspected; the catalog is assumed to be uniformly versioned.
   */
  private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) {
    if (airbyteCatalog == null) {
      return false;
    }

    return airbyteCatalog
        .getStreams()
        .stream().findFirst()
        .map(CatalogMigrationV1Helper::streamContainsV1DataTypes)
        .orElse(false);
  }

  // Returns true if the stream's schema contains at least one v1 reference type declaration.
  private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) {
    if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
      return false;
    }
    return hasV1DataType(airbyteStream.getJsonSchema());
  }

  /**
   * Performs a search for a v1 data type node, returns true at the first node found.
   */
  private static boolean hasV1DataType(final JsonNode schema) {
    if (SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema)) {
      return true;
    }

    for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
      if (hasV1DataType(subSchema)) {
        return true;
      }
    }
    return false;
  }

}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.v1;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.migrations.ConfiguredAirbyteCatalogMigration;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
// Disable V1 Migration, uncomment to re-enable
// @Singleton
/**
 * Migrates ConfiguredAirbyteCatalog objects between protocol v0 and v1 by rewriting each
 * stream's JSON schema between type-style (v0) and $ref-style (v1) declarations.
 */
public class ConfiguredAirbyteCatalogMigrationV1
    implements ConfiguredAirbyteCatalogMigration<io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog, ConfiguredAirbyteCatalog> {

  /**
   * Clones the v1 catalog into the v0 model, then downgrades every stream schema in-place.
   */
  @Override
  public io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog downgrade(final ConfiguredAirbyteCatalog oldMessage) {
    final var downgraded = Jsons.object(
        Jsons.jsonNode(oldMessage),
        io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog.class);
    downgraded.getStreams()
        .forEach(stream -> SchemaMigrationV1.downgradeSchema(stream.getStream().getJsonSchema()));
    return downgraded;
  }

  /**
   * Clones the v0 catalog into the v1 model, then upgrades every stream schema in-place.
   */
  @Override
  public ConfiguredAirbyteCatalog upgrade(final io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog oldMessage) {
    final var upgraded = Jsons.object(
        Jsons.jsonNode(oldMessage),
        ConfiguredAirbyteCatalog.class);
    upgraded.getStreams()
        .forEach(stream -> SchemaMigrationV1.upgradeSchema(stream.getStream().getJsonSchema()));
    return upgraded;
  }

  @Override
  public Version getPreviousVersion() {
    return AirbyteProtocolVersion.V0;
  }

  @Override
  public Version getCurrentVersion() {
    return AirbyteProtocolVersion.V1;
  }

}

View File

@@ -1,306 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.v1;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.ONEOF_KEY;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.REF_KEY;
import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.TYPE_KEY;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.migrations.util.SchemaMigrations;
import io.airbyte.protocol.models.JsonSchemaReferenceTypes;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.StreamSupport;
public class SchemaMigrationV1 {
/**
 * Performs the v0 -> v1 {type: foo} -> {$ref: foo} upgrade on every primitive type declaration
 * in the schema. The schema is modified in-place.
 */
public static void upgradeSchema(final JsonNode schema) {
  SchemaMigrations.mutateSchemas(SchemaMigrationV1::isPrimitiveTypeDeclaration, SchemaMigrationV1::upgradeTypeDeclaration, schema);
}
/**
 * Performs the v1 -> v0 {$ref: foo} -> {type: foo} downgrade on every reference type declaration
 * in the schema. The schema is modified in-place.
 */
public static void downgradeSchema(final JsonNode schema) {
  SchemaMigrations.mutateSchemas(SchemaMigrationV1::isPrimitiveReferenceTypeDeclaration, SchemaMigrationV1::downgradeTypeDeclaration, schema);
}
/**
 * Detects any schema that looks like a primitive type declaration, e.g.: { "type": "string" } or {
 * "type": ["string", "object"] }. Returns true as soon as any declared type is primitive.
 */
static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
  if (!schema.isObject() || !schema.hasNonNull(TYPE_KEY)) {
    return false;
  }
  final JsonNode declaredType = schema.get(TYPE_KEY);
  if (!declaredType.isArray()) {
    // Single type declaration, e.g. {type: "string"}
    return JsonSchemaReferenceTypes.PRIMITIVE_JSON_TYPES.contains(declaredType.asText());
  }
  // Multi-type declaration, e.g. {type: ["string", "object"]}: primitive if any entry is.
  for (final JsonNode candidate : declaredType) {
    if (JsonSchemaReferenceTypes.PRIMITIVE_JSON_TYPES.contains(candidate.asText())) {
      return true;
    }
  }
  return false;
}
/**
 * Detects any schema that looks like a reference type declaration, e.g.: { "$ref":
 * "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] }
 */
static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
  // Non-object schemas (i.e. true/false) never need to be modified.
  if (!schema.isObject()) {
    return false;
  }
  // A $ref into WellKnownTypes must be converted back to type/airbyte_type/format.
  if (schema.hasNonNull(REF_KEY) && schema.get(REF_KEY).asText().startsWith("WellKnownTypes.json")) {
    return true;
  }
  // A oneOf with at least one primitive WellKnownTypes $ref option should also be considered
  // for conversion. (Note: mirrors the original else-if - a non-WellKnownTypes $ref still
  // falls through to this check.)
  if (!schema.hasNonNull(REF_KEY) && schema.hasNonNull(ONEOF_KEY)) {
    for (final JsonNode option : getSubschemas(schema, ONEOF_KEY)) {
      if (option.hasNonNull(REF_KEY) && option.get(REF_KEY).asText().startsWith("WellKnownTypes.json")) {
        return true;
      }
    }
  }
  return false;
}
/**
 * Modifies the schema in-place to upgrade from the old-style type declaration to the new-style $ref
 * declaration. Assumes that the schema is an ObjectNode containing a primitive declaration, i.e.
 * either something like: {"type": "string"} or: {"type": ["string", "object"]}
 * <p>
 * In the latter case, the schema may contain subschemas. This method mutually recurses with
 * {@link SchemaMigrations#mutateSchemas(Function, Consumer, JsonNode)} to upgrade those subschemas.
 *
 * @param schema An ObjectNode representing a primitive type declaration
 */
private static void upgradeTypeDeclaration(final JsonNode schema) {
  final ObjectNode schemaNode = (ObjectNode) schema;

  if (schemaNode.hasNonNull("airbyte_type")) {
    // If airbyte_type is defined, always respect it
    final String referenceType = JsonSchemaReferenceTypes.LEGACY_AIRBYTE_PROPERY_TO_REFERENCE.get(schemaNode.get("airbyte_type").asText());
    // removeAll() wipes every other key so the node becomes a pure {$ref: ...} declaration.
    schemaNode.removeAll();
    schemaNode.put(REF_KEY, referenceType);
  } else {
    // Otherwise, fall back to type/format
    final JsonNode typeNode = schemaNode.get(TYPE_KEY);
    if (typeNode.isTextual()) {
      // If the type is a single string, then replace this node with the appropriate reference type
      final String type = typeNode.asText();
      final String referenceType = getReferenceType(type, schemaNode);
      schemaNode.removeAll();
      schemaNode.put(REF_KEY, referenceType);
    } else {
      // If type is an array of strings, then things are more complicated
      final List<String> types = StreamSupport.stream(typeNode.spliterator(), false)
          .map(JsonNode::asText)
          // Everything is implicitly nullable by just not declaring the `required` field
          // so filter out any explicit null types
          .filter(type -> !"null".equals(type))
          .toList();
      final boolean exactlyOneType = types.size() == 1;
      if (exactlyOneType) {
        // If there's only one type, e.g. {type: [string]}, just treat that as equivalent to {type: string}
        final String type = types.get(0);
        final String referenceType = getReferenceType(type, schemaNode);
        schemaNode.removeAll();
        schemaNode.put(REF_KEY, referenceType);
      } else {
        // If there are multiple types, we'll need to convert this to a oneOf.
        // For arrays and objects, we do a mutual recursion back into mutateSchemas to upgrade their
        // subschemas.
        final ArrayNode oneOfOptions = Jsons.arrayNode();
        for (final String type : types) {
          final ObjectNode option = (ObjectNode) Jsons.emptyObject();
          switch (type) {
            case "array" -> {
              // Carry the array-specific keywords over to the new option node before upgrading it.
              option.put(TYPE_KEY, "array");
              copyKey(schemaNode, option, "items");
              copyKey(schemaNode, option, "additionalItems");
              copyKey(schemaNode, option, "contains");
              upgradeSchema(option);
            }
            case "object" -> {
              // Carry the object-specific keywords over to the new option node before upgrading it.
              option.put(TYPE_KEY, "object");
              copyKey(schemaNode, option, "properties");
              copyKey(schemaNode, option, "patternProperties");
              copyKey(schemaNode, option, "additionalProperties");
              upgradeSchema(option);
            }
            default -> {
              // Primitive type: becomes a plain $ref option.
              final String referenceType = getReferenceType(type, schemaNode);
              option.put(REF_KEY, referenceType);
            }
          }
          oneOfOptions.add(option);
        }
        schemaNode.removeAll();
        schemaNode.set(ONEOF_KEY, oneOfOptions);
      }
    }
  }
}
/**
 * Modifies the schema in-place to downgrade from the new-style $ref declaration to the old-style
 * type declaration. Assumes that the schema is an ObjectNode containing a primitive declaration,
 * i.e. either something like: {"$ref": "WellKnownTypes..."} or: {"oneOf": [{"$ref":
 * "WellKnownTypes..."}, ...]}
 * <p>
 * In the latter case, the schema may contain subschemas. This method mutually recurses with
 * {@link SchemaMigrations#mutateSchemas(Function, Consumer, JsonNode)} to downgrade those
 * subschemas.
 *
 * @param schema An ObjectNode representing a primitive type declaration
 */
private static void downgradeTypeDeclaration(final JsonNode schema) {
  if (schema.hasNonNull(REF_KEY)) {
    // If this is a direct type declaration, then we can just replace it with the old-style declaration
    final String referenceType = schema.get(REF_KEY).asText();
    ((ObjectNode) schema).removeAll();
    ((ObjectNode) schema).setAll(JsonSchemaReferenceTypes.REFERENCE_TYPE_TO_OLD_TYPE.get(referenceType));
  } else if (schema.hasNonNull(ONEOF_KEY)) {
    // If this is a oneOf, then we need to check whether we can recombine it into a single type
    // declaration.
    // This means we must do three things:
    // 1. Downgrade each subschema
    // 2. Build a new `type` array, containing the `type` of each subschema
    // 3. Combine all the fields in each subschema (properties, items, etc)
    // If any two subschemas have the same `type`, or the same field, then we can't combine them, but we
    // should still downgrade them.
    // See V0ToV1MigrationTest.CatalogDowngradeTest#testDowngradeMultiTypeFields for some examples.
    // We'll build up a node containing the combined subschemas.
    final ObjectNode replacement = (ObjectNode) Jsons.emptyObject();
    // As part of this, we need to build up a list of `type` entries. For ease of access, we'll keep it
    // in a List.
    final List<String> types = new ArrayList<>();
    // Tracks whether the oneOf can be collapsed into a single type declaration; once set to
    // false it stays false, but subschemas are still downgraded individually.
    boolean canRecombineSubschemas = true;
    for (final JsonNode subschemaNode : schema.get(ONEOF_KEY)) {
      // No matter what - we always need to downgrade the subschema node.
      downgradeSchema(subschemaNode);
      if (subschemaNode instanceof ObjectNode subschema) {
        // If this subschema is an object, then we can attempt to combine it with the other subschemas.
        // First, update our list of types.
        final JsonNode subschemaType = subschema.get(TYPE_KEY);
        if (subschemaType != null) {
          if (types.contains(subschemaType.asText())) {
            // If another subschema has the same type, then we can't combine them.
            canRecombineSubschemas = false;
          } else {
            types.add(subschemaType.asText());
          }
        }
        // Then, update the combined schema with this subschema's fields.
        if (canRecombineSubschemas) {
          final Iterator<Entry<String, JsonNode>> fields = subschema.fields();
          while (fields.hasNext()) {
            final Entry<String, JsonNode> field = fields.next();
            if (TYPE_KEY.equals(field.getKey())) {
              // We're handling the `type` field outside this loop, so ignore it here.
              continue;
            }
            if (replacement.has(field.getKey())) {
              // A previous subschema is already using this field, so we should stop trying to combine them.
              canRecombineSubschemas = false;
              break;
            } else {
              replacement.set(field.getKey(), field.getValue());
            }
          }
        }
      } else {
        // If this subschema is a boolean, then the oneOf is doing something funky, and we shouldn't
        // attempt to combine it into a single type entry
        canRecombineSubschemas = false;
      }
    }
    if (canRecombineSubschemas) {
      // Update our replacement node with the full list of types
      final ArrayNode typeNode = Jsons.arrayNode();
      types.forEach(typeNode::add);
      replacement.set(TYPE_KEY, typeNode);
      // And commit our changes to the actual schema node.
      // If recombination was not possible, the oneOf structure is left in place (with each
      // option individually downgraded).
      ((ObjectNode) schema).removeAll();
      ((ObjectNode) schema).setAll(replacement);
    }
  }
}
/**
 * Copies the value stored under {@code key} from {@code source} into {@code target}, but only
 * when {@code source} holds a non-null value for that key. Otherwise {@code target} is left
 * untouched.
 */
private static void copyKey(final ObjectNode source, final ObjectNode target, final String key) {
  final JsonNode value = source.get(key);
  // Equivalent to source.hasNonNull(key): the field must exist and must not be a JSON null.
  if (value != null && !value.isNull()) {
    target.set(key, value);
  }
}
/**
* Given a primitive (string/int/num/bool) type declaration _without_ an airbyte_type, get the
* appropriate $ref type. In most cases, this only depends on the "type" key. When type=string, also
* checks the "format" key.
*/
/**
 * Given a primitive (string/int/num/bool) type declaration _without_ an airbyte_type, get the
 * appropriate $ref type. In most cases, this only depends on the "type" key. When type=string,
 * also checks the "format" key (and, failing that, the "contentEncoding" key).
 *
 * @param type the value of the schema's "type" key; must be one of string/integer/number/boolean
 * @param schemaNode the full type declaration, consulted for "format"/"contentEncoding" hints
 * @return the well-known-type $ref string corresponding to this declaration
 * @throws IllegalStateException if {@code type} is not one of the four primitive types
 */
private static String getReferenceType(final String type, final ObjectNode schemaNode) {
  if ("integer".equals(type)) {
    return JsonSchemaReferenceTypes.INTEGER_REFERENCE;
  }
  if ("number".equals(type)) {
    return JsonSchemaReferenceTypes.NUMBER_REFERENCE;
  }
  if ("boolean".equals(type)) {
    return JsonSchemaReferenceTypes.BOOLEAN_REFERENCE;
  }
  if ("string".equals(type)) {
    return getStringReferenceType(schemaNode);
  }
  // This is impossible, because we'll only call this method on string/integer/number/boolean
  throw new IllegalStateException("Somehow got non-primitive type: " + type + " for schema: " + schemaNode);
}

/**
 * Resolves the $ref type for a "string"-typed declaration: "format" takes precedence, then
 * "contentEncoding", then a plain string.
 */
private static String getStringReferenceType(final ObjectNode schemaNode) {
  if (schemaNode.hasNonNull("format")) {
    return switch (schemaNode.get("format").asText()) {
      case "date" -> JsonSchemaReferenceTypes.DATE_REFERENCE;
      // In these two cases, we default to the "with timezone" type, rather than "without timezone".
      // This matches existing behavior in normalization.
      case "date-time" -> JsonSchemaReferenceTypes.TIMESTAMP_WITH_TIMEZONE_REFERENCE;
      case "time" -> JsonSchemaReferenceTypes.TIME_WITH_TIMEZONE_REFERENCE;
      // If we don't recognize the format, just use a plain string
      default -> JsonSchemaReferenceTypes.STRING_REFERENCE;
    };
  }
  if (schemaNode.hasNonNull("contentEncoding")
      && "base64".equals(schemaNode.get("contentEncoding").asText())) {
    return JsonSchemaReferenceTypes.BINARY_DATA_REFERENCE;
  }
  return JsonSchemaReferenceTypes.STRING_REFERENCE;
}
/**
 * Collects every subschema reachable under {@code key} within {@code schema} into a freshly
 * allocated mutable list, delegating the traversal to
 * {@link SchemaMigrations#findSubschemas(List, JsonNode, String)}.
 */
private static List<JsonNode> getSubschemas(final JsonNode schema, final String key) {
  final List<JsonNode> collected = new ArrayList<>();
  SchemaMigrations.findSubschemas(collected, schema, key);
  return collected;
}
}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.Version;
public interface AirbyteMessageDeserializer<T> {
T deserialize(final JsonNode json);
Version getTargetVersion();
}

View File

@@ -1,28 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.Version;
import lombok.Getter;
public class AirbyteMessageGenericDeserializer<T> implements AirbyteMessageDeserializer<T> {
@Getter
final Version targetVersion;
final Class<T> typeClass;
public AirbyteMessageGenericDeserializer(final Version targetVersion, final Class<T> typeClass) {
this.targetVersion = targetVersion;
this.typeClass = typeClass;
}
@Override
public T deserialize(JsonNode json) {
return Jsons.object(json, typeClass);
}
}

View File

@@ -1,23 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.Version;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
public class AirbyteMessageGenericSerializer<T> implements AirbyteMessageSerializer<T> {
@Getter
private final Version targetVersion;
@Override
public String serialize(T message) {
return Jsons.serialize(message);
}
}

View File

@@ -1,15 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import io.airbyte.commons.version.Version;
public interface AirbyteMessageSerializer<T> {
String serialize(final T message);
Version getTargetVersion();
}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.AirbyteMessage;
public class AirbyteMessageV0Deserializer extends AirbyteMessageGenericDeserializer<AirbyteMessage> {
public AirbyteMessageV0Deserializer() {
super(AirbyteProtocolVersion.V0, AirbyteMessage.class);
}
}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.AirbyteMessage;
public class AirbyteMessageV0Serializer extends AirbyteMessageGenericSerializer<AirbyteMessage> {
public AirbyteMessageV0Serializer() {
super(AirbyteProtocolVersion.V0);
}
}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.AirbyteMessage;
public class AirbyteMessageV1Deserializer extends AirbyteMessageGenericDeserializer<AirbyteMessage> {
public AirbyteMessageV1Deserializer() {
super(AirbyteProtocolVersion.V1, AirbyteMessage.class);
}
}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.AirbyteMessage;
public class AirbyteMessageV1Serializer extends AirbyteMessageGenericSerializer<AirbyteMessage> {
public AirbyteMessageV1Serializer() {
super(AirbyteProtocolVersion.V1);
}
}

View File

@@ -1,139 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class AirbyteMessageMigratorTest {
static final Version v0 = new Version("0.0.0");
static final Version v1 = new Version("1.0.0");
static final Version v2 = new Version("2.0.0");
record ObjectV0(String name0) {}
record ObjectV1(String name1) {}
record ObjectV2(String name2) {}
static class Migrate0to1 implements AirbyteMessageMigration<ObjectV0, ObjectV1> {
@Override
public ObjectV0 downgrade(ObjectV1 message, Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return new ObjectV0(message.name1);
}
@Override
public ObjectV1 upgrade(ObjectV0 message, Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return new ObjectV1(message.name0);
}
@Override
public Version getPreviousVersion() {
return v0;
}
@Override
public Version getCurrentVersion() {
return v1;
}
}
static class Migrate1to2 implements AirbyteMessageMigration<ObjectV1, ObjectV2> {
@Override
public ObjectV1 downgrade(ObjectV2 message, Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return new ObjectV1(message.name2);
}
@Override
public ObjectV2 upgrade(ObjectV1 message, Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return new ObjectV2(message.name1);
}
@Override
public Version getPreviousVersion() {
return v1;
}
@Override
public Version getCurrentVersion() {
return v2;
}
}
AirbyteMessageMigrator migrator;
@BeforeEach
void beforeEach() {
migrator = new AirbyteMessageMigrator(
List.of(new Migrate0to1(), new Migrate1to2()));
migrator.initialize();
}
@Test
void testDowngrade() {
final ObjectV2 obj = new ObjectV2("my name");
final ObjectV0 objDowngradedTo0 = migrator.downgrade(obj, v0, Optional.empty());
assertEquals(obj.name2, objDowngradedTo0.name0);
final ObjectV1 objDowngradedTo1 = migrator.downgrade(obj, v1, Optional.empty());
assertEquals(obj.name2, objDowngradedTo1.name1);
final ObjectV2 objDowngradedTo2 = migrator.downgrade(obj, v2, Optional.empty());
assertEquals(obj.name2, objDowngradedTo2.name2);
}
@Test
void testUpgrade() {
final ObjectV0 obj0 = new ObjectV0("my name 0");
final ObjectV2 objUpgradedFrom0 = migrator.upgrade(obj0, v0, Optional.empty());
assertEquals(obj0.name0, objUpgradedFrom0.name2);
final ObjectV1 obj1 = new ObjectV1("my name 1");
final ObjectV2 objUpgradedFrom1 = migrator.upgrade(obj1, v1, Optional.empty());
assertEquals(obj1.name1, objUpgradedFrom1.name2);
final ObjectV2 obj2 = new ObjectV2("my name 2");
final ObjectV2 objUpgradedFrom2 = migrator.upgrade(obj2, v2, Optional.empty());
assertEquals(obj2.name2, objUpgradedFrom2.name2);
}
@Test
void testUnsupportedDowngradeShouldFailExplicitly() {
assertThrows(RuntimeException.class, () -> {
migrator.downgrade(new ObjectV2("woot"), new Version("5.0.0"), Optional.empty());
});
}
@Test
void testUnsupportedUpgradeShouldFailExplicitly() {
assertThrows(RuntimeException.class, () -> {
migrator.upgrade(new ObjectV0("woot"), new Version("4.0.0"), Optional.empty());
});
}
@Test
void testRegisterCollisionsShouldFail() {
assertThrows(RuntimeException.class, () -> {
migrator = new AirbyteMessageMigrator(
List.of(new Migrate0to1(), new Migrate1to2(), new Migrate0to1()));
migrator.initialize();
});
}
}

View File

@@ -1,86 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageSerializer;
import io.airbyte.commons.version.Version;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class AirbyteMessageSerDeProviderTest {
AirbyteMessageSerDeProvider serDeProvider;
AirbyteMessageDeserializer<String> deserV0;
AirbyteMessageDeserializer<String> deserV1;
AirbyteMessageSerializer<String> serV0;
AirbyteMessageSerializer<String> serV1;
@BeforeEach
void beforeEach() {
serDeProvider = new AirbyteMessageSerDeProvider();
deserV0 = buildDeserializer(new Version("0.1.0"));
deserV1 = buildDeserializer(new Version("1.1.0"));
serDeProvider.registerDeserializer(deserV0);
serDeProvider.registerDeserializer(deserV1);
serV0 = buildSerializer(new Version("0.2.0"));
serV1 = buildSerializer(new Version("1.0.0"));
serDeProvider.registerSerializer(serV0);
serDeProvider.registerSerializer(serV1);
}
@Test
void testGetDeserializer() {
assertEquals(Optional.of(deserV0), serDeProvider.getDeserializer(new Version("0.1.0")));
assertEquals(Optional.of(deserV0), serDeProvider.getDeserializer(new Version("0.2.0")));
assertEquals(Optional.of(deserV1), serDeProvider.getDeserializer(new Version("1.1.0")));
assertEquals(Optional.empty(), serDeProvider.getDeserializer(new Version("2.0.0")));
}
@Test
void testGetSerializer() {
assertEquals(Optional.of(serV0), serDeProvider.getSerializer(new Version("0.1.0")));
assertEquals(Optional.of(serV1), serDeProvider.getSerializer(new Version("1.0.0")));
assertEquals(Optional.empty(), serDeProvider.getSerializer(new Version("3.2.0")));
}
@Test
void testRegisterDeserializerShouldFailOnVersionCollision() {
AirbyteMessageDeserializer<?> deser = buildDeserializer(new Version("0.2.0"));
assertThrows(RuntimeException.class, () -> {
serDeProvider.registerDeserializer(deser);
});
}
@Test
void testRegisterSerializerShouldFailOnVersionCollision() {
AirbyteMessageSerializer<?> ser = buildSerializer(new Version("0.5.0"));
assertThrows(RuntimeException.class, () -> {
serDeProvider.registerSerializer(ser);
});
}
private <T> AirbyteMessageDeserializer<T> buildDeserializer(Version version) {
final AirbyteMessageDeserializer<T> deser = mock(AirbyteMessageDeserializer.class);
when(deser.getTargetVersion()).thenReturn(version);
return deser;
}
private <T> AirbyteMessageSerializer<T> buildSerializer(Version version) {
final AirbyteMessageSerializer<T> ser = mock(AirbyteMessageSerializer.class);
when(ser.getTargetVersion()).thenReturn(version);
return ser;
}
}

View File

@@ -1,108 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.migrations.v1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* These depend on the same {@link SchemaMigrationV1} class as
* {@link io.airbyte.commons.protocol.migrations.v1.AirbyteMessageMigrationV1}. So, uh, I didn't
* bother writing a ton of tests for it.
*
* Check out {@link AirbyteMessageMigrationV1} for more comprehensive tests. Theoretically
* SchemaMigrationV1 should have its own set of tests, but for various (development history-related)
* reasons, that would be a lot of work.
*/
class ConfiguredAirbyteCatalogMigrationV1Test {
private ConfiguredAirbyteCatalogMigrationV1 migration;
@BeforeEach
void setup() {
migration = new ConfiguredAirbyteCatalogMigrationV1();
}
@Test
void testVersionMetadata() {
assertEquals("0.3.0", migration.getPreviousVersion().serialize());
assertEquals("1.0.0", migration.getCurrentVersion().serialize());
}
@Test
void testBasicUpgrade() {
// This isn't actually a valid stream schema (since it's not an object)
// but this test case is mostly about preserving the message structure, so it's not super relevant
final io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog downgradedCatalog = new io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog()
.withStreams(List.of(
new io.airbyte.protocol.models.v0.ConfiguredAirbyteStream().withStream(new io.airbyte.protocol.models.v0.AirbyteStream().withJsonSchema(
Jsons.deserialize(
"""
{
"type": "string"
}
""")))));
final ConfiguredAirbyteCatalog upgradedMessage = migration.upgrade(downgradedCatalog);
final ConfiguredAirbyteCatalog expectedMessage = Jsons.deserialize(
"""
{
"streams": [
{
"stream": {
"json_schema": {
"$ref": "WellKnownTypes.json#/definitions/String"
}
}
}
]
}
""",
ConfiguredAirbyteCatalog.class);
assertEquals(expectedMessage, upgradedMessage);
}
@Test
void testBasicDowngrade() {
// This isn't actually a valid stream schema (since it's not an object)
// but this test case is mostly about preserving the message structure, so it's not super relevant
final ConfiguredAirbyteCatalog upgradedCatalog = new ConfiguredAirbyteCatalog()
.withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withJsonSchema(
Jsons.deserialize("""
{
"$ref": "WellKnownTypes.json#/definitions/String"
}
""")))));
final io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog downgradedMessage = migration.downgrade(upgradedCatalog);
final io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog expectedMessage = Jsons.deserialize(
"""
{
"streams": [
{
"stream": {
"json_schema": {
"type": "string"
}
}
}
]
}
""",
io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog.class);
assertEquals(expectedMessage, downgradedMessage);
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.jupiter.api.Test;
class AirbyteMessageV0SerDeTest {
@Test
void v0SerDeRoundTripTest() throws URISyntaxException {
final AirbyteMessageV0Deserializer deser = new AirbyteMessageV0Deserializer();
final AirbyteMessageV0Serializer ser = new AirbyteMessageV0Serializer();
final AirbyteMessage message = new AirbyteMessage()
.withType(Type.SPEC)
.withSpec(
new ConnectorSpecification()
.withProtocolVersion("0.3.0")
.withDocumentationUrl(new URI("file:///tmp/doc")));
final String serializedMessage = ser.serialize(message);
final AirbyteMessage deserializedMessage = deser.deserialize(Jsons.deserialize(serializedMessage));
assertEquals(message, deserializedMessage);
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.commons.protocol.serde;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.jupiter.api.Test;
class AirbyteMessageV1SerDeTest {
@Test
void v1SerDeRoundTripTest() throws URISyntaxException {
final AirbyteMessageV1Deserializer deser = new AirbyteMessageV1Deserializer();
final AirbyteMessageV1Serializer ser = new AirbyteMessageV1Serializer();
final AirbyteMessage message = new AirbyteMessage()
.withType(Type.SPEC)
.withSpec(
new ConnectorSpecification()
.withProtocolVersion("1.0.0")
.withDocumentationUrl(new URI("file:///tmp/doc")));
final String serializedMessage = ser.serialize(message);
final AirbyteMessage deserializedMessage = deser.deserialize(Jsons.deserialize(serializedMessage));
assertEquals(message, deserializedMessage);
}
}

View File

@@ -1,86 +0,0 @@
{
"definitions": {
"String": {
"type": "string",
"description": "Arbitrary text"
},
"BinaryData": {
"type": "string",
"description": "Arbitrary binary data. Represented as base64-encoded strings in the JSON transport. In the future, if we support other transports, may be encoded differently.\n",
"pattern": "^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$"
},
"Date": {
"type": "string",
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "RFC 3339\u00a75.6's full-date format, extended with BC era support and (-)Infinity"
},
"TimestampWithTimezone": {
"type": "string",
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339\u00a75.6's date-time format, requiring a \"T\" separator, and extended with BC era support and (-)Infinity. Note that we do _not_ accept Unix epochs here.\n"
},
"TimestampWithoutTimezone": {
"type": "string",
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "Also known as a localdatetime, or just datetime. Under RFC 3339\u00a75.6, this would be represented as `full-date \"T\" partial-time`, extended with BC era support and (-)Infinity.\n"
},
"TimeWithTimezone": {
"type": "string",
"pattern": "^\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})$",
"description": "An RFC 3339\u00a75.6 full-time"
},
"TimeWithoutTimezone": {
"type": "string",
"pattern": "^\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?$",
"description": "An RFC 3339\u00a75.6 partial-time"
},
"Number": {
"type": "string",
"oneOf": [
{
"pattern": "-?(0|[0-9]\\d*)(\\.\\d+)?"
},
{
"enum": ["Infinity", "-Infinity", "NaN"]
}
],
"description": "Note the mix of regex validation for normal numbers, and enum validation for special values."
},
"Integer": {
"type": "string",
"oneOf": [
{
"pattern": "-?(0|[0-9]\\d*)"
},
{
"enum": ["Infinity", "-Infinity", "NaN"]
}
]
},
"Boolean": {
"type": "boolean",
"description": "Note the direct usage of a primitive boolean rather than string. Unlike Numbers and Integers, we don't expect unusual values here."
}
}
}

View File

@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2020 Airbyte, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,19 +0,0 @@
plugins {
id 'java-library'
}
java {
compileJava {
options.compilerArgs += "-Xlint:-varargs,-try,-deprecation"
}
compileTestJava {
options.compilerArgs += "-Xlint:-try"
}
}
dependencies {
// Dependencies for this module should be specified in the top-level build.gradle. See readme for more explanation.
// this dependency is an exception to the above rule because it is only used INTERNALLY to the commons library.
implementation 'com.jayway.jsonpath:json-path:2.7.0'
}

View File

@@ -1,11 +0,0 @@
# airbyte-commons
Common java helpers.
This submodule is inherited by all other java modules in the monorepo! It is therefore important that we do not add dependencies to it, as those
dependencies will also be added to every java module. The only dependencies that this module uses are the ones declared in the `build.gradle` at the
root of the Airbyte monorepo. In other words it only uses dependencies that are already shared across all modules. The `dependencies` section of
the `build.gradle` of `airbyte-commons` should always be empty.
For other common java code that needs to be shared across modules that requires additional dependencies, we follow this
convention: `airbyte-commons-<name of lib>`. See for example `airbyte-commons-cli`.

View File

@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2020 Airbyte, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,10 +0,0 @@
plugins {
id "java-library"
}
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
implementation 'com.networknt:json-schema-validator:1.0.72'
// needed so that we can follow $ref when parsing json. jackson does not support this natively.
implementation 'me.andrz.jackson:jackson-json-reference-core:0.3.2'
}

View File

@@ -1,7 +0,0 @@
# airbyte-json-validation
This module contains shared Java code for validating JSON objects.
## Key Files
* `JsonSchemaValidator.java` is the main entrypoint into this library, defining convenience methods for validation.
* `ConfigSchemaValidator.java` is additional sugar to make it easy to validate objects whose schemas are defined in `ConfigSchema`.

View File

@@ -0,0 +1,6 @@
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
implementation 'com.azure:azure-storage-blob:12.12.0'
}

View File

@@ -1,20 +0,0 @@
# Config Models
This module uses `jsonschema2pojo` to generate Java config objects from [json schema](https://json-schema.org/) definitions. See [build.gradle](./build.gradle) for details.
## How to use
- Update json schema under:
```
src/main/resources/types/
```
- Run the following command under the project root:
```sh
./gradlew airbyte-cdk:java:airbyte-cdk:config-models-oss:generateJsonSchema2Pojo
```
The generated file is under:
```
build/generated/src/gen/java/io/airbyte/config/
```
## Reference
- [`jsonschema2pojo` plugin](https://github.com/joelittlejohn/jsonschema2pojo/tree/master/jsonschema2pojo-gradle-plugin).

View File

@@ -1,36 +0,0 @@
import org.jsonschema2pojo.SourceType
plugins {
id "java-library"
id "com.github.eirnym.js2p" version "1.0"
}
java {
compileJava {
options.compilerArgs += "-Xlint:-unchecked"
}
}
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
}
jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/types")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
targetPackage = 'io.airbyte.configoss'
useLongIntegers = true
removeOldOutput = true
generateBuilders = true
includeConstructors = false
includeSetters = true
serializable = true
}
tasks.register('generate').configure {
dependsOn tasks.named('generateJsonSchema2Pojo')
}

View File

@@ -1,105 +1,48 @@
java { java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava { compileJava {
options.compilerArgs += "-Xlint:-deprecation,-try,-rawtypes,-overloads,-cast,-unchecked" options.compilerArgs += "-Xlint:-deprecation,-try,-rawtypes,-overloads"
} }
compileTestJava { compileTestJava {
options.compilerArgs += "-Xlint:-try,-divzero,-cast" options.compilerArgs += "-Xlint:-try,-divzero,-cast"
} }
} compileTestFixturesJava {
options.compilerArgs += "-Xlint:-cast,-deprecation"
configurations.all {
resolutionStrategy {
// TODO: Diagnose conflicting dependencies and remove these force overrides:
force 'org.mockito:mockito-core:4.6.1'
} }
} }
dependencies { dependencies {
// Exported dependencies from upstream projects
api libs.airbyte.protocol
api libs.hikaricp
api libs.jooq
api libs.jooq.meta
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-api') api 'com.datadoghq:dd-trace-api:1.28.0'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons') api 'com.datadoghq:dd-trace-ot:1.28.0'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-cli') api 'com.zaxxer:HikariCP:5.1.0'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss') api 'org.jooq:jooq:3.16.23'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation') api 'org.apache.commons:commons-csv:1.10.0'
testCompileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-cli') implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
// SSH dependencies
implementation 'net.i2p.crypto:eddsa:0.3.0'
// First party test dependencies
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:db-sources'))
testFixturesImplementation "org.hamcrest:hamcrest-all:1.3"
testImplementation libs.bundles.junit
testImplementation libs.junit.jupiter.api
testImplementation libs.junit.jupiter.params
testImplementation 'org.junit.platform:junit-platform-launcher:1.7.0'
testImplementation libs.junit.jupiter.engine
implementation libs.jooq
implementation 'net.sourceforge.argparse4j:argparse4j:0.8.1'
implementation "io.aesy:datasize:1.0.0"
implementation libs.apache.commons
implementation libs.apache.commons.lang
testImplementation 'commons-lang:commons-lang:2.6'
implementation 'commons-cli:commons-cli:1.4' implementation 'commons-cli:commons-cli:1.4'
implementation 'org.apache.commons:commons-csv:1.4' implementation 'io.aesy:datasize:1.0.0'
implementation 'net.i2p.crypto:eddsa:0.3.0'
// Optional dependencies implementation 'org.apache.httpcomponents:httpcore:4.4.16'
// TODO: Change these to 'compileOnly' or 'testCompileOnly' implementation 'org.apache.logging.log4j:log4j-layout-template-json:2.17.2'
implementation 'com.azure:azure-storage-blob:12.12.0'
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
implementation libs.postgresql
// testImplementation libs.junit.jupiter.api
implementation libs.hikaricp
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb
api libs.bundles.datadog
implementation 'org.apache.sshd:sshd-mina:2.11.0' implementation 'org.apache.sshd:sshd-mina:2.11.0'
implementation libs.testcontainers
implementation libs.testcontainers.mysql
implementation libs.testcontainers.jdbc
implementation libs.testcontainers.postgresql
testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.postgresql
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
// bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java // bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java
// because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation // because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation
implementation 'org.bouncycastle:bcpkix-jdk15on:1.66' implementation 'org.bouncycastle:bcpkix-jdk15on:1.66'
implementation 'org.bouncycastle:bcprov-jdk15on:1.66' implementation 'org.bouncycastle:bcprov-jdk15on:1.66'
implementation 'org.bouncycastle:bctls-jdk15on:1.66' implementation 'org.bouncycastle:bctls-jdk15on:1.66'
// Lombok testFixturesApi 'org.testcontainers:testcontainers:1.19.0'
implementation 'org.projectlombok:lombok:1.18.20' testFixturesApi 'org.testcontainers:jdbc:1.19.0'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
testImplementation libs.junit.jupiter.system.stubs testImplementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:db-sources'))
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres'))
implementation libs.jackson.annotations testImplementation 'mysql:mysql-connector-java:8.0.33'
implementation group: 'org.apache.logging.log4j', name: 'log4j-layout-template-json', version: '2.17.2' testImplementation 'org.postgresql:postgresql:42.6.0'
testImplementation 'org.testcontainers:mysql:1.19.0'
testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation 'org.testcontainers:postgresql:1.19.0'
testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4' testImplementation 'org.xbib.elasticsearch:joptsimple:6.3.2.1'
testImplementation 'org.mockito:mockito-core:4.6.1'
} }

View File

@@ -28,8 +28,8 @@ import java.time.OffsetDateTime;
import java.time.OffsetTime; import java.time.OffsetTime;
import java.time.chrono.IsoEra; import java.time.chrono.IsoEra;
import java.time.format.DateTimeParseException; import java.time.format.DateTimeParseException;
import java.util.Base64;
import java.util.Collections; import java.util.Collections;
import javax.xml.bind.DatatypeConverter;
/** /**
* Source operation skeleton for JDBC compatible databases. * Source operation skeleton for JDBC compatible databases.
@@ -222,7 +222,7 @@ public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implement
} }
protected void setBinary(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { protected void setBinary(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseBase64Binary(value)); preparedStatement.setBytes(parameterIndex, Base64.getDecoder().decode(value));
} }
protected <ObjectType> ObjectType getObject(final ResultSet resultSet, final int index, final Class<ObjectType> clazz) throws SQLException { protected <ObjectType> ObjectType getObject(final ResultSet resultSet, final int index, final Class<ObjectType> clazz) throws SQLException {

View File

@@ -4,8 +4,6 @@
package io.airbyte.cdk.integrations; package io.airbyte.cdk.integrations;
import static org.postgresql.PGProperty.CONNECT_TIMEOUT;
import io.airbyte.cdk.db.factory.DatabaseDriver; import io.airbyte.cdk.db.factory.DatabaseDriver;
import java.time.Duration; import java.time.Duration;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
@@ -15,6 +13,9 @@ import java.util.Optional;
public abstract class JdbcConnector extends BaseConnector { public abstract class JdbcConnector extends BaseConnector {
public static final String POSTGRES_CONNECT_TIMEOUT_KEY = "connectTimeout";
public static final Duration POSTGRES_CONNECT_TIMEOUT_DEFAULT_DURATION = Duration.ofSeconds(10);
public static final String CONNECT_TIMEOUT_KEY = "connectTimeout"; public static final String CONNECT_TIMEOUT_KEY = "connectTimeout";
public static final Duration CONNECT_TIMEOUT_DEFAULT = Duration.ofSeconds(60); public static final Duration CONNECT_TIMEOUT_DEFAULT = Duration.ofSeconds(60);
@@ -44,8 +45,8 @@ public abstract class JdbcConnector extends BaseConnector {
*/ */
public static Duration getConnectionTimeout(final Map<String, String> connectionProperties, String driverClassName) { public static Duration getConnectionTimeout(final Map<String, String> connectionProperties, String driverClassName) {
final Optional<Duration> parsedConnectionTimeout = switch (DatabaseDriver.findByDriverClassName(driverClassName)) { final Optional<Duration> parsedConnectionTimeout = switch (DatabaseDriver.findByDriverClassName(driverClassName)) {
case POSTGRESQL -> maybeParseDuration(connectionProperties.get(CONNECT_TIMEOUT.getName()), ChronoUnit.SECONDS) case POSTGRESQL -> maybeParseDuration(connectionProperties.get(POSTGRES_CONNECT_TIMEOUT_KEY), ChronoUnit.SECONDS)
.or(() -> maybeParseDuration(CONNECT_TIMEOUT.getDefaultValue(), ChronoUnit.SECONDS)); .or(() -> Optional.of(POSTGRES_CONNECT_TIMEOUT_DEFAULT_DURATION));
case MYSQL -> maybeParseDuration(connectionProperties.get("connectTimeout"), ChronoUnit.MILLIS); case MYSQL -> maybeParseDuration(connectionProperties.get("connectTimeout"), ChronoUnit.MILLIS);
case MSSQLSERVER -> maybeParseDuration(connectionProperties.get("loginTimeout"), ChronoUnit.SECONDS); case MSSQLSERVER -> maybeParseDuration(connectionProperties.get("loginTimeout"), ChronoUnit.SECONDS);
default -> maybeParseDuration(connectionProperties.get(CONNECT_TIMEOUT_KEY), ChronoUnit.SECONDS) default -> maybeParseDuration(connectionProperties.get(CONNECT_TIMEOUT_KEY), ChronoUnit.SECONDS)

View File

@@ -12,8 +12,8 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@@ -10,8 +10,6 @@ import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.v0.ConnectorSpecification; import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.testcontainers.containers.Container;
public class SshHelpers { public class SshHelpers {
@@ -40,30 +38,4 @@ public class SshHelpers {
return originalSpec; return originalSpec;
} }
/**
* Returns the inner docker network ip address and port of a container. This can be used to reach a
* container from another container running on the same network
*
* @param container container
* @return a pair of host and port
*/
public static ImmutablePair<String, Integer> getInnerContainerAddress(final Container container) {
return ImmutablePair.of(
container.getContainerInfo().getNetworkSettings().getNetworks().entrySet().stream().findFirst().get().getValue().getIpAddress(),
(Integer) container.getExposedPorts().stream().findFirst().get());
}
/**
* Returns the outer docker network ip address and port of a container. This can be used to reach a
* container from the host machine
*
* @param container container
* @return a pair of host and port
*/
public static ImmutablePair<String, Integer> getOuterContainerAddress(final Container container) {
return ImmutablePair.of(
container.getHost(),
container.getFirstMappedPort());
}
} }

View File

@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import javax.validation.constraints.NotNull;
import org.apache.sshd.client.SshClient; import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier; import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.client.session.ClientSession;
@@ -33,7 +34,6 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.common.util.security.SecurityUtils; import org.apache.sshd.common.util.security.SecurityUtils;
import org.apache.sshd.core.CoreModuleProperties; import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.forward.AcceptAllForwardingFilter; import org.apache.sshd.server.forward.AcceptAllForwardingFilter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
*/ */
public interface SqlOperations { public interface SqlOperations {
Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class); Logger LOGGER = LoggerFactory.getLogger(SqlOperations.class);
/** /**
* Create a schema with provided name if it does not already exist. * Create a schema with provided name if it does not already exist.

View File

@@ -7,9 +7,9 @@ package io.airbyte.cdk.integrations.destination.staging;
import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations;
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer; import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer;
import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.joda.time.DateTime;
/** /**
* Staging operations focuses on the SQL queries that are needed to success move data into a staging * Staging operations focuses on the SQL queries that are needed to success move data into a staging
@@ -27,7 +27,7 @@ public interface StagingOperations extends SqlOperations {
* raw table). Not all destinations use the table name in the staging path (e.g. Snowflake * raw table). Not all destinations use the table name in the staging path (e.g. Snowflake
* simply uses a timestamp + UUID), but e.g. Redshift does rely on this to ensure uniqueness. * simply uses a timestamp + UUID), but e.g. Redshift does rely on this to ensure uniqueness.
*/ */
String getStagingPath(UUID connectionId, String namespace, String streamName, String outputTableName, DateTime writeDatetime); String getStagingPath(UUID connectionId, String namespace, String streamName, String outputTableName, Instant writeDatetime);
/** /**
* Returns the staging environment's name * Returns the staging environment's name

View File

@@ -1 +1 @@
version=0.18.0 version=0.19.0

View File

@@ -39,7 +39,6 @@ class CommonDatabaseCheckTest {
@AfterEach @AfterEach
void cleanup() throws Exception { void cleanup() throws Exception {
DataSourceFactory.close(dataSource); DataSourceFactory.close(dataSource);
dslContext.close();
container.stop(); container.stop();
} }

View File

@@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View File

@@ -51,7 +51,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View File

@@ -1,23 +0,0 @@
create table "public"."airbyte_toy_migrations"(
"installed_rank" int4 not null,
"version" varchar(50) null,
"description" varchar(200) not null,
"type" varchar(20) not null,
"script" varchar(1000) not null,
"checksum" int4 null,
"installed_by" varchar(100) not null,
"installed_on" timestamp(29) not null default null,
"execution_time" int4 not null,
"success" bool not null,
constraint "airbyte_toy_migrations_pk"
primary key ("installed_rank")
);
create table "public"."toy_cars"(
"id" int8 generated by default as identity not null,
"value" varchar(50) null,
constraint "toy_cars_pkey"
primary key ("id")
);
create unique index "airbyte_toy_migrations_pk" on "public"."airbyte_toy_migrations"("installed_rank" asc);
create index "airbyte_toy_migrations_s_idx" on "public"."airbyte_toy_migrations"("success" asc);
create unique index "toy_cars_pkey" on "public"."toy_cars"("id" asc);

View File

@@ -1,6 +0,0 @@
CREATE
TABLE
IF NOT EXISTS TOY_CARS(
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
value VARCHAR(50)
);

View File

@@ -1,24 +0,0 @@
create table "public"."airbyte_toy_migrations"(
"installed_rank" int4 not null,
"version" varchar(50) null,
"description" varchar(200) not null,
"type" varchar(20) not null,
"script" varchar(1000) not null,
"checksum" int4 null,
"installed_by" varchar(100) not null,
"installed_on" timestamp(29) not null default null,
"execution_time" int4 not null,
"success" bool not null,
constraint "airbyte_toy_migrations_pk"
primary key ("installed_rank")
);
create table "public"."toy_cars"(
"id" int8 generated by default as identity not null,
"value" varchar(50) null,
"created_at" timestamp(29) not null default null,
constraint "toy_cars_pkey"
primary key ("id")
);
create unique index "airbyte_toy_migrations_pk" on "public"."airbyte_toy_migrations"("installed_rank" asc);
create index "airbyte_toy_migrations_s_idx" on "public"."airbyte_toy_migrations"("success" asc);
create unique index "toy_cars_pkey" on "public"."toy_cars"("id" asc);

View File

@@ -4,8 +4,6 @@
package io.airbyte.cdk.integrations.base.ssh; package io.airbyte.cdk.integrations.base.ssh;
import static io.airbyte.cdk.integrations.base.ssh.SshHelpers.getInnerContainerAddress;
import static io.airbyte.cdk.integrations.base.ssh.SshHelpers.getOuterContainerAddress;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH; import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;
@@ -16,6 +14,8 @@ import io.airbyte.commons.json.Jsons;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.Network; import org.testcontainers.containers.Network;
@@ -98,4 +98,30 @@ public class SshBastionContainer implements AutoCloseable {
return bastion; return bastion;
} }
/**
* Returns the inner docker network ip address and port of a container. This can be used to reach a
* container from another container running on the same network
*
* @param container container
* @return a pair of host and port
*/
public static ImmutablePair<String, Integer> getInnerContainerAddress(final Container container) {
return ImmutablePair.of(
container.getContainerInfo().getNetworkSettings().getNetworks().entrySet().stream().findFirst().get().getValue().getIpAddress(),
(Integer) container.getExposedPorts().stream().findFirst().get());
}
/**
* Returns the outer docker network ip address and port of a container. This can be used to reach a
* container from the host machine
*
* @param container container
* @return a pair of host and port
*/
public static ImmutablePair<String, Integer> getOuterContainerAddress(final Container container) {
return ImmutablePair.of(
container.getHost(),
container.getFirstMappedPort());
}
} }

View File

@@ -4,8 +4,6 @@
package io.airbyte.cdk.integrations.util; package io.airbyte.cdk.integrations.util;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects; import java.util.Objects;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
@@ -23,13 +21,6 @@ public class HostPortResolver {
return getIpAddress(container); return getIpAddress(container);
} }
public static String encodeValue(final String value) {
if (value != null) {
return URLEncoder.encode(value, StandardCharsets.UTF_8);
}
return null;
}
private static String getIpAddress(GenericContainer container) { private static String getIpAddress(GenericContainer container) {
return Objects.requireNonNull(container.getContainerInfo() return Objects.requireNonNull(container.getContainerInfo()
.getNetworkSettings() .getNetworkSettings()

View File

@@ -233,7 +233,6 @@ abstract public class TestDatabase<C extends JdbcDatabaseContainer<?>, T extends
@Override @Override
public void close() { public void close() {
execSQL(this.cleanupSQL.stream()); execSQL(this.cleanupSQL.stream());
dslContext.close();
execInContainer(inContainerUndoBootstrapCmd()); execInContainer(inContainerUndoBootstrapCmd());
} }

View File

@@ -0,0 +1,7 @@
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
api 'com.google.cloud:google-cloud-bigquery:2.37.0'
}

View File

@@ -0,0 +1,15 @@
java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava {
options.compilerArgs += "-Xlint:-try,-unchecked"
}
}
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
api 'org.mongodb:mongodb-driver-sync:4.10.2'
testFixturesApi 'org.testcontainers:mongodb:1.19.0'
}

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.integrations.destination.mongodb; package io.airbyte.cdk.db.mongodb;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.integrations.destination.mongodb.exception; package io.airbyte.cdk.db.mongodb;
public class MongodbDatabaseException extends RuntimeException { public class MongoDatabaseException extends RuntimeException {
public static final String MONGO_DATA_BASE_NOT_FOUND = "Data Base with given name - %s not found."; public static final String MONGO_DATA_BASE_NOT_FOUND = "Data Base with given name - %s not found.";
public MongodbDatabaseException(final String databaseName) { public MongoDatabaseException(final String databaseName) {
super(String.format(MONGO_DATA_BASE_NOT_FOUND, databaseName)); super(String.format(MONGO_DATA_BASE_NOT_FOUND, databaseName));
} }

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.integrations.destination.mongodb; package io.airbyte.cdk.db.mongodb;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.bson.BsonType.ARRAY; import static org.bson.BsonType.ARRAY;
@@ -19,7 +19,6 @@ import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.client.util.DateTime;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.mongodb.DBRefCodecProvider; import com.mongodb.DBRefCodecProvider;
@@ -31,6 +30,7 @@ import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators; import io.airbyte.commons.util.MoreIterators;
import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.JsonSchemaType;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@@ -121,8 +121,8 @@ public class MongoUtils {
case INT64 -> new BsonInt64(Long.parseLong(value)); case INT64 -> new BsonInt64(Long.parseLong(value));
case DOUBLE -> new BsonDouble(Double.parseDouble(value)); case DOUBLE -> new BsonDouble(Double.parseDouble(value));
case DECIMAL128 -> Decimal128.parse(value); case DECIMAL128 -> Decimal128.parse(value);
case TIMESTAMP -> new BsonTimestamp(new DateTime(value).getValue()); case TIMESTAMP -> new BsonTimestamp((int) Instant.parse(value).getEpochSecond(), 0);
case DATE_TIME -> new BsonDateTime(new DateTime(value).getValue()); case DATE_TIME -> new BsonDateTime(Instant.parse(value).toEpochMilli());
case OBJECT_ID -> new ObjectId(value); case OBJECT_ID -> new ObjectId(value);
case SYMBOL -> new Symbol(value); case SYMBOL -> new Symbol(value);
case STRING -> new BsonString(value); case STRING -> new BsonString(value);

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.integrations.destination.mongodb; package io.airbyte.cdk.db.mongodb;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.integrations.destination.mongodb; package io.airbyte.cdk.db.mongodb;
import static io.airbyte.integrations.destination.mongodb.MongoUtils.AIRBYTE_SUFFIX; import static io.airbyte.cdk.db.mongodb.MongoUtils.AIRBYTE_SUFFIX;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;

View File

@@ -0,0 +1,8 @@
dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
api 'org.postgresql:postgresql:42.6.0'
testFixturesApi 'org.testcontainers:postgresql:1.19.0'
}

View File

@@ -1,93 +1,28 @@
java { java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava { compileJava {
options.compilerArgs += "-Xlint:-deprecation" options.compilerArgs += "-Xlint:-deprecation,-removal"
}
compileTestFixturesJava {
options.compilerArgs += "-Xlint:-try"
} }
} }
dependencies { dependencies {
// Depends on core CDK classes (OK 👍) api 'org.apache.commons:commons-csv:1.10.0'
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core') implementation project(':airbyte-cdk:java:airbyte-cdk:core')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
implementation 'io.aesy:datasize:1.0.0'
testImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:dependencies'))
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:core')
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:core'))
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping') testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')) testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping'))
compileOnly project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:acceptance-test-harness')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-api')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-cli')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
testFixturesImplementation "org.hamcrest:hamcrest-all:1.3"
implementation libs.bundles.junit
// implementation libs.junit.jupiter.api
implementation libs.junit.jupiter.params
implementation 'org.junit.platform:junit-platform-launcher:1.7.0'
implementation libs.jooq
testImplementation libs.junit.jupiter.engine
implementation 'net.sourceforge.argparse4j:argparse4j:0.8.1'
implementation "io.aesy:datasize:1.0.0"
implementation libs.apache.commons
implementation libs.apache.commons.lang
testImplementation 'commons-lang:commons-lang:2.6'
implementation 'commons-cli:commons-cli:1.4'
implementation 'org.apache.commons:commons-csv:1.4'
implementation libs.google.cloud.storage
// Optional dependencies
// TODO: Change these to 'compileOnly' or 'testCompileOnly'
implementation 'com.azure:azure-storage-blob:12.12.0'
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'
implementation libs.postgresql
implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
// testImplementation libs.junit.jupiter.api
implementation libs.hikaricp
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.sqlserver
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb
implementation libs.bundles.datadog
// implementation 'com.datadoghq:dd-trace-api'
implementation 'org.apache.sshd:sshd-mina:2.8.0'
implementation libs.testcontainers
implementation libs.testcontainers.mysql
implementation libs.testcontainers.jdbc
implementation libs.testcontainers.postgresql
testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.postgresql
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
implementation 'org.bouncycastle:bcprov-jdk15on:1.66'
// Lombok
implementation 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation ('org.apache.hadoop:hadoop-common:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
implementation ('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.3') {exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.slf4j', module: 'slf4j-reload4j'}
testImplementation libs.junit.jupiter.system.stubs
} }

View File

@@ -5,8 +5,7 @@
package io.airbyte.cdk.integrations.destination.jdbc; package io.airbyte.cdk.integrations.destination.jdbc;
import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.DestinationSyncMode;
import org.joda.time.DateTime; import java.time.Instant;
import org.joda.time.DateTimeZone;
/** /**
* Write configuration POJO (plain old java object) for all destinations extending * Write configuration POJO (plain old java object) for all destinations extending
@@ -20,7 +19,7 @@ public class WriteConfig {
private final String tmpTableName; private final String tmpTableName;
private final String outputTableName; private final String outputTableName;
private final DestinationSyncMode syncMode; private final DestinationSyncMode syncMode;
private final DateTime writeDatetime; private final Instant writeDatetime;
public WriteConfig(final String streamName, public WriteConfig(final String streamName,
final String namespace, final String namespace,
@@ -28,7 +27,7 @@ public class WriteConfig {
final String tmpTableName, final String tmpTableName,
final String outputTableName, final String outputTableName,
final DestinationSyncMode syncMode) { final DestinationSyncMode syncMode) {
this(streamName, namespace, outputSchemaName, tmpTableName, outputTableName, syncMode, DateTime.now(DateTimeZone.UTC)); this(streamName, namespace, outputSchemaName, tmpTableName, outputTableName, syncMode, Instant.now());
} }
public WriteConfig(final String streamName, public WriteConfig(final String streamName,
@@ -37,7 +36,7 @@ public class WriteConfig {
final String tmpTableName, final String tmpTableName,
final String outputTableName, final String outputTableName,
final DestinationSyncMode syncMode, final DestinationSyncMode syncMode,
final DateTime writeDatetime) { final Instant writeDatetime) {
this.streamName = streamName; this.streamName = streamName;
this.namespace = namespace; this.namespace = namespace;
this.outputSchemaName = outputSchemaName; this.outputSchemaName = outputSchemaName;
@@ -77,7 +76,7 @@ public class WriteConfig {
return syncMode; return syncMode;
} }
public DateTime getWriteDatetime() { public Instant getWriteDatetime() {
return writeDatetime; return writeDatetime;
} }

View File

@@ -24,12 +24,11 @@ import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -49,7 +48,7 @@ public class SerialStagingConsumerFactory {
// in a previous attempt but failed to load to the warehouse for some reason (interrupted?) instead. // in a previous attempt but failed to load to the warehouse for some reason (interrupted?) instead.
// This would also allow other programs/scripts // This would also allow other programs/scripts
// to load (or reload backups?) in the connection's staging area to be loaded at the next sync. // to load (or reload backups?) in the connection's staging area to be loaded at the next sync.
private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC); private static final Instant SYNC_DATETIME = Instant.now();
public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID(); public static final UUID RANDOM_CONNECTION_ID = UUID.randomUUID();
public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector, public AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,

View File

@@ -82,7 +82,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
@@ -1549,7 +1548,7 @@ public abstract class DestinationAcceptanceTest {
while (true) { while (true) {
System.out.println( System.out.println(
"currentStreamNumber=" + currentStreamNumber + ", currentRecordNumberForStream=" "currentStreamNumber=" + currentStreamNumber + ", currentRecordNumberForStream="
+ currentRecordNumberForStream + ", " + DateTime.now()); + currentRecordNumberForStream + ", " + Instant.now());
try { try {
Thread.sleep(10000); Thread.sleep(10000);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {

View File

@@ -1,37 +1,16 @@
import org.jsonschema2pojo.SourceType import org.jsonschema2pojo.SourceType
import org.jsoup.Jsoup
buildscript {
dependencies {
// from standard-source-test:
classpath 'org.jsoup:jsoup:1.13.1' // for generateSourceTestDocs
}
}
plugins { plugins {
id "com.github.eirnym.js2p" version "1.0" id "com.github.eirnym.js2p" version "1.0"
id 'application'
id 'airbyte-integration-test-java'
id "java-library"
id "java-test-fixtures" // https://docs.gradle.org/current/userguide/java_testing.html#sec:java_test_fixtures
} }
java { java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava { compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes,-unchecked,-removal" options.compilerArgs += "-Xlint:-try,-rawtypes,-unchecked,-removal"
} }
} }
project.configurations {
// From `base-debezium`:
testFixturesImplementation.extendsFrom implementation
// From source-jdbc
testFixturesImplementation.extendsFrom implementation
testFixturesRuntimeOnly.extendsFrom runtimeOnly
}
// Convert yaml to java: relationaldb.models // Convert yaml to java: relationaldb.models
jsonSchema2Pojo { jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA sourceType = SourceType.YAMLSCHEMA
@@ -48,154 +27,28 @@ jsonSchema2Pojo {
} }
dependencies { dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core') implementation project(':airbyte-cdk:java:airbyte-cdk:core')
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:acceptance-test-harness')
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons') implementation 'io.debezium:debezium-api:2.4.0.Final'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-cli') implementation 'io.debezium:debezium-embedded:2.4.0.Final'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss') implementation 'org.codehaus.plexus:plexus-utils:4.0.0'
compileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons') testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:config-models-oss') testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:dependencies'))
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:core')
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:core'))
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons') testFixturesImplementation 'net.sourceforge.argparse4j:argparse4j:0.9.0'
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:airbyte-api') testFixturesImplementation 'io.swagger:swagger-annotations:1.6.13'
testFixturesCompileOnly project(':airbyte-cdk:java:airbyte-cdk:config-models-oss') testFixturesImplementation 'org.hamcrest:hamcrest-all:1.3'
testFixturesImplementation 'org.junit.platform:junit-platform-launcher:1.10.1'
testFixturesImplementation "org.hamcrest:hamcrest-all:1.3"
implementation libs.bundles.junit testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:core'))
// implementation libs.junit.jupiter.api testImplementation project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres')
implementation libs.junit.jupiter.params testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres'))
implementation 'org.junit.platform:junit-platform-launcher:1.7.0'
implementation libs.jooq
testImplementation libs.junit.jupiter.engine
implementation 'net.sourceforge.argparse4j:argparse4j:0.8.1'
implementation "io.aesy:datasize:1.0.0"
implementation libs.apache.commons
implementation libs.apache.commons.lang
testImplementation 'commons-lang:commons-lang:2.6'
implementation 'commons-cli:commons-cli:1.4'
implementation 'org.apache.commons:commons-csv:1.4'
// Optional dependencies testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
// TODO: Change these to 'compileOnly' or 'testCompileOnly'
implementation libs.hikaricp
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.bundles.datadog
// implementation 'com.datadoghq:dd-trace-api'
implementation 'org.apache.sshd:sshd-mina:2.8.0'
testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.mssqlserver
testImplementation libs.testcontainers.postgresql
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
implementation 'org.bouncycastle:bcprov-jdk15on:1.66'
// Lombok
implementation 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
testFixturesImplementation 'org.projectlombok:lombok:1.18.20'
testFixturesAnnotationProcessor 'org.projectlombok:lombok:1.18.20'
testImplementation libs.junit.jupiter.system.stubs
// From `base-debezium`:
// implementation project(':airbyte-db:db-lib')
// testFixturesImplementation project(':airbyte-db:db-lib')
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.4.2'
// From source-jdbc
implementation 'org.apache.commons:commons-lang3:3.11'
testImplementation libs.postgresql
integrationTestJavaImplementation libs.testcontainers.postgresql
testFixturesImplementation libs.airbyte.protocol
// todo (cgardens) - the java-test-fixtures plugin doesn't by default extend from test.
// we cannot make it depend on the dependencies of source-jdbc:test, because source-jdbc:test
// is going to depend on these fixtures. need to find a way to get fixtures to inherit the
// common test classes without duplicating them. this should be part of whatever solution we
// decide on for a "test-java-lib". the current implementation is leveraging the existing
// plugin, but we can something different if we don't like this tool.
testFixturesRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
testFixturesImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '4.0.0'
// From `standard-source-test`:
testFixturesImplementation 'org.mockito:mockito-core:4.6.1'
testFixturesRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.8.1'
// From `airbyte-test-utils`:
// api project(':airbyte-db:db-lib')
testFixturesImplementation 'io.fabric8:kubernetes-client:5.12.2'
testFixturesImplementation libs.temporal.sdk
testFixturesApi libs.junit.jupiter.api
// Mark as compile only to avoid leaking transitively to connectors
testFixturesCompileOnly libs.testcontainers.jdbc
testFixturesCompileOnly libs.testcontainers.postgresql
testFixturesCompileOnly libs.testcontainers.mssqlserver
testFixturesCompileOnly libs.testcontainers.cockroachdb
testFixturesImplementation libs.testcontainers.cockroachdb
}
def getFullPath(String className) {
def matchingFiles = project.fileTree("src/testFixtures/java")
.filter { file -> file.getName().equals("${className}.java".toString()) }.asCollection()
if (matchingFiles.size() == 0) {
throw new IllegalArgumentException("Ambiguous class name ${className}: no file found.")
}
if (matchingFiles.size() > 1) {
throw new IllegalArgumentException("Ambiguous class name ${className}: more than one matching file was found. Files found: ${matchingFiles}")
}
def absoluteFilePath = matchingFiles[0].toString()
def pathInPackage = project.relativePath(absoluteFilePath.toString()).replaceAll("src/testFixtures/java/", "").replaceAll("\\.java", "")
return pathInPackage
}
def generateSourceTestDocs = tasks.register('generateSourceTestDocs', Javadoc) {
def javadocOutputDir = project.file("${project.buildDir}/docs/testFixturesJavadoc")
options.addStringOption('Xdoclint:none', '-quiet')
classpath = sourceSets.testFixtures.compileClasspath
source = sourceSets.testFixtures.allJava
destinationDir = javadocOutputDir
doLast {
def className = "SourceAcceptanceTest"
// this can be made into a list once we have multiple standard tests, and can also be used for destinations
def pathInPackage = getFullPath(className)
def stdSrcTest = project.file("${javadocOutputDir}/${pathInPackage}.html").readLines().join("\n")
def methodList = Jsoup.parse(stdSrcTest).body().select("section.methodDetails>ul>li>section")
def md = ""
for (methodInfo in methodList) {
def annotations = methodInfo.select(".memberSignature>.annotations").text()
if (!annotations.contains("@Test")) {
continue
}
def methodName = methodInfo.selectFirst("div>span.memberName").text()
def methodDocstring = methodInfo.selectFirst("div.block")
md += "## ${methodName}\n\n"
md += "${methodDocstring != null ? methodDocstring.text().replaceAll(/([()])/, '\\\\$1') : 'No method description was provided'}\n\n"
}
def outputDoc = new File("${rootDir}/docs/connector-development/testing-connectors/standard-source-tests.md")
outputDoc.write "# Standard Source Test Suite\n\n"
outputDoc.append "Test methods start with `test`. Other methods are internal helpers in the java class implementing the test suite.\n\n"
outputDoc.append md
}
outputs.upToDateWhen { false }
}
tasks.register('generate').configure {
dependsOn generateSourceTestDocs
} }

View File

@@ -1,4 +0,0 @@
{
"username": "default",
"jdbc_url": "default"
}

Some files were not shown because too many files have changed in this diff Show More