diff --git a/airbyte-integrations/bases/debezium-v1-4-2/build.gradle b/airbyte-integrations/bases/debezium-v1-4-2/build.gradle deleted file mode 100644 index 6b0c22a5c1f..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/build.gradle +++ /dev/null @@ -1,25 +0,0 @@ -plugins { - id "java-test-fixtures" -} - -project.configurations { - testFixturesImplementation.extendsFrom implementation -} -dependencies { - implementation project(':airbyte-protocol:protocol-models') - implementation project(':airbyte-db:db-lib') - - implementation 'io.debezium:debezium-api:1.4.2.Final' - implementation ('io.debezium:debezium-embedded:1.4.2.Final') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' } - implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final' - implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final' - implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final' - - testFixturesImplementation project(':airbyte-db:db-lib') - testFixturesImplementation project(':airbyte-integrations:bases:base-java') - - testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2' - testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2' - testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.4.2' - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java deleted file mode 100644 index afb6a5555fa..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandler.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; -import io.airbyte.commons.util.CompositeIterator; -import io.airbyte.commons.util.MoreIterators; -import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore; -import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage; -import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; -import io.airbyte.integrations.debezium.internals.DebeziumRecordIterator; -import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher; -import io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.SyncMode; -import io.debezium.engine.ChangeEvent; -import java.time.Instant; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants - * to use debezium for CDC, it should use this class - */ -public class AirbyteDebeziumHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class); - /** - * We use 10000 as capacity cause the default queue size and batch size of debezium is : - * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE}is 2048 - * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192 - */ - private static final int QUEUE_CAPACITY = 10000; - - private final Properties connectorProperties; - private final JsonNode config; - private final CdcTargetPosition targetPosition; - private final ConfiguredAirbyteCatalog catalog; - private final boolean trackSchemaHistory; - - private final LinkedBlockingQueue> queue; - - public AirbyteDebeziumHandler(final JsonNode config, - final CdcTargetPosition targetPosition, - final Properties connectorProperties, - final ConfiguredAirbyteCatalog catalog, - final boolean trackSchemaHistory) { - this.config = config; - this.targetPosition = targetPosition; - this.connectorProperties = connectorProperties; - this.catalog = catalog; - this.trackSchemaHistory = trackSchemaHistory; - this.queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); - } - - public List> getIncrementalIterators(final CdcSavedInfoFetcher cdcSavedInfoFetcher, - final CdcStateHandler cdcStateHandler, - final CdcMetadataInjector cdcMetadataInjector, - final Instant emittedAt) { - LOGGER.info("using CDC: {}", true); - final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset()); - final Optional schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher); - final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager, - schemaHistoryManager); - publisher.start(queue); - - // handle state machine around pub/sub logic. - final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator( - queue, - targetPosition, - publisher::hasClosed, - publisher::close); - - // convert to airbyte message. - final AutoCloseableIterator messageIterator = AutoCloseableIterators - .transform( - eventIterator, - (event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt)); - - // our goal is to get the state at the time this supplier is called (i.e. after all message records - // have been produced) - final Supplier stateMessageSupplier = () -> { - final Map offset = offsetManager.read(); - final String dbHistory = trackSchemaHistory ? schemaHistoryManager - .orElseThrow(() -> new RuntimeException("Schema History Tracking is true but manager is not initialised")).read() : null; - - return cdcStateHandler.saveState(offset, dbHistory); - }; - - // wrap the supplier in an iterator so that we can concat it to the message iterator. - final Iterator stateMessageIterator = MoreIterators.singletonIteratorFromSupplier(stateMessageSupplier); - - // this structure guarantees that the debezium engine will be closed, before we attempt to emit the - // state file. we want this so that we have a guarantee that the debezium offset file (which we use - // to produce the state file) is up-to-date. - final CompositeIterator messageIteratorWithStateDecorator = - AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator)); - - return Collections.singletonList(messageIteratorWithStateDecorator); - } - - private Optional schemaHistoryManager(final CdcSavedInfoFetcher cdcSavedInfoFetcher) { - if (trackSchemaHistory) { - FilteredFileDatabaseHistory.setDatabaseName(config.get(JdbcUtils.DATABASE_KEY).asText()); - return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(cdcSavedInfoFetcher.getSavedSchemaHistory())); - } - - return Optional.empty(); - } - - public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode) - .anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java deleted file mode 100644 index f5612fe0450..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcMetadataInjector.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * This interface is used to add metadata to the records fetched from the database. For instance, in - * Postgres we add the lsn to the records. In MySql we add the file name and position to the - * records. - */ -public interface CdcMetadataInjector { - - /** - * A debezium record contains multiple pieces. Ref : - * https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-create-events - * - * @param event is the actual record which contains data and would be written to the destination - * @param source contains the metadata about the record and we need to extract that metadata and add - * it to the event before writing it to destination - */ - void addMetaData(ObjectNode event, JsonNode source); - - /** - * As part of Airbyte record we need to add the namespace (schema name) - * - * @param source part of debezium record and contains the metadata about the record. We need to - * extract namespace out of this metadata and return Ref : - * https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-create-events - */ - String namespace(JsonNode source); - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java deleted file mode 100644 index a0efa36f05a..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcSavedInfoFetcher.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import com.fasterxml.jackson.databind.JsonNode; -import java.util.Optional; - -/** - * This interface is used to fetch the saved info required for debezium to run incrementally. Each - * connector saves offset and schema history in different manner - */ -public interface CdcSavedInfoFetcher { - - JsonNode getSavedOffset(); - - Optional getSavedSchemaHistory(); - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java deleted file mode 100644 index 7b76186fc9c..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcStateHandler.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import io.airbyte.protocol.models.AirbyteMessage; -import java.util.Map; - -/** - * This interface is used to allow connectors to save the offset and schema history in the manner - * which suits them - */ -@FunctionalInterface -public interface CdcStateHandler { - - AirbyteMessage saveState(Map offset, String dbHistory); - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java deleted file mode 100644 index 47209ada28f..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import com.fasterxml.jackson.databind.JsonNode; - -/** - * This interface is used to define the target position at the beginning of the sync so that once we - * reach the desired target, we can shutdown the sync. This is needed because it might happen that - * while we are syncing the data, new changes are being made in the source database and as a result - * we might end up syncing forever. In order to tackle that, we need to define a point to end at the - * beginning of the sync - */ -public interface CdcTargetPosition { - - boolean reachedTargetPosition(JsonNode valueAsJson); - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java deleted file mode 100644 index 89dbd3d1f47..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteFileOffsetBackingStore.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Preconditions; -import io.airbyte.commons.json.Jsons; -import java.io.EOFException; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.commons.io.FileUtils; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.util.SafeObjectInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class handles reading and writing a debezium offset file. In many cases it is duplicating - * logic in debezium because that logic is not exposed in the public API. We mostly treat the - * contents of this state file like a black box. We know it is a Map<ByteBuffer, Bytebuffer>. - * We deserialize it to a Map<String, String> so that the state file can be human readable. If - * we ever discover that any of the contents of these offset files is not string serializable we - * will likely have to drop the human readability support and just base64 encode it. - */ -public class AirbyteFileOffsetBackingStore { - - private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class); - - private final Path offsetFilePath; - - public AirbyteFileOffsetBackingStore(final Path offsetFilePath) { - this.offsetFilePath = offsetFilePath; - } - - public Path getOffsetFilePath() { - return offsetFilePath; - } - - public Map read() { - final Map raw = load(); - - return raw.entrySet().stream().collect(Collectors.toMap( - e -> byteBufferToString(e.getKey()), - e -> byteBufferToString(e.getValue()))); - } - - @SuppressWarnings("unchecked") - public void persist(final JsonNode cdcState) { - final Map mapAsString = - cdcState != null ? Jsons.object(cdcState, Map.class) : Collections.emptyMap(); - final Map mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap( - e -> stringToByteBuffer(e.getKey()), - e -> stringToByteBuffer(e.getValue()))); - - FileUtils.deleteQuietly(offsetFilePath.toFile()); - save(mappedAsStrings); - } - - private static String byteBufferToString(final ByteBuffer byteBuffer) { - Preconditions.checkNotNull(byteBuffer); - return new String(byteBuffer.array(), StandardCharsets.UTF_8); - } - - private static ByteBuffer stringToByteBuffer(final String s) { - Preconditions.checkNotNull(s); - return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); - } - - /** - * See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this - * method is not public. - */ - @SuppressWarnings("unchecked") - private Map load() { - try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) { - // todo (cgardens) - we currently suppress a security warning for this line. use of readObject from - // untrusted sources is considered unsafe. Since the source is controlled by us in this case it - // should be safe. That said, changing this implementation to not use readObject would remove some - // headache. - final Object obj = is.readObject(); - if (!(obj instanceof HashMap)) - throw new ConnectException("Expected HashMap but found " + obj.getClass()); - final Map raw = (Map) obj; - final Map data = new HashMap<>(); - for (final Map.Entry mapEntry : raw.entrySet()) { - final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; - final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null; - data.put(key, value); - } - - return data; - } catch (final NoSuchFileException | EOFException e) { - // NoSuchFileException: Ignore, may be new. - // EOFException: Ignore, this means the file was missing or corrupt - return Collections.emptyMap(); - } catch (final IOException | ClassNotFoundException e) { - throw new ConnectException(e); - } - } - - /** - * See FileOffsetBackingStore#save - logic is mostly borrowed from here. duplicated because this - * method is not public. - */ - private void save(final Map data) { - try (final ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) { - final Map raw = new HashMap<>(); - for (final Map.Entry mapEntry : data.entrySet()) { - final byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; - final byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null; - raw.put(key, value); - } - os.writeObject(raw); - } catch (final IOException e) { - throw new ConnectException(e); - } - } - - public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcState) { - final Path cdcWorkingDir; - try { - cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-state-offset"); - } catch (final IOException e) { - throw new RuntimeException(e); - } - final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat"); - - final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(cdcOffsetFilePath); - offsetManager.persist(cdcState); - return offsetManager; - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java deleted file mode 100644 index c36bd3cc48c..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/AirbyteSchemaHistoryStorage.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.json.Jsons; -import io.debezium.document.Document; -import io.debezium.document.DocumentReader; -import io.debezium.document.DocumentWriter; -import io.debezium.relational.history.HistoryRecord; -import java.io.BufferedWriter; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.Optional; -import java.util.function.Consumer; -import org.apache.commons.io.FileUtils; - -/** - * The purpose of this class is : to , 1. Read the contents of the file {@link #path} which contains - * the schema history at the end of the sync so that it can be saved in state for future syncs. - * Check {@link #read()} 2. Write the saved content back to the file {@link #path} at the beginning - * of the sync so that debezium can function smoothly. Check persist(Optional<JsonNode>). To - * understand more about file, please refer {@link FilteredFileDatabaseHistory} - */ -public class AirbyteSchemaHistoryStorage { - - private final Path path; - private static final Charset UTF8 = StandardCharsets.UTF_8; - private final DocumentReader reader = DocumentReader.defaultReader(); - private final DocumentWriter writer = DocumentWriter.defaultWriter(); - - public AirbyteSchemaHistoryStorage(final Path path) { - this.path = path; - } - - public Path getPath() { - return path; - } - - /** - * This implementation is kind of similar to - * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} - */ - public String read() { - final StringBuilder fileAsString = new StringBuilder(); - try { - for (final String line : Files.readAllLines(path, UTF8)) { - if (line != null && !line.isEmpty()) { - final Document record = reader.read(line); - final String recordAsString = writer.write(record); - fileAsString.append(recordAsString); - fileAsString.append(System.lineSeparator()); - } - } - return fileAsString.toString(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - /** - * This implementation is kind of similar to - * {@link io.debezium.relational.history.FileDatabaseHistory#start()} - */ - private void makeSureFileExists() { - try { - // Make sure the file exists ... - if (!Files.exists(path)) { - // Create parent directories if we have them ... - if (path.getParent() != null) { - Files.createDirectories(path.getParent()); - } - try { - Files.createFile(path); - } catch (final FileAlreadyExistsException e) { - // do nothing - } - } - } catch (final IOException e) { - throw new IllegalStateException( - "Unable to create history file at " + path + ": " + e.getMessage(), e); - } - } - - public void persist(final Optional schemaHistory) { - if (schemaHistory.isEmpty()) { - return; - } - final String fileAsString = Jsons.object(schemaHistory.get(), String.class); - - if (fileAsString == null || fileAsString.isEmpty()) { - return; - } - - FileUtils.deleteQuietly(path.toFile()); - makeSureFileExists(); - writeToFile(fileAsString); - } - - /** - * This implementation is kind of similar to - * {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} - * - * @param fileAsString Represents the contents of the file saved in state from previous syncs - */ - private void writeToFile(final String fileAsString) { - try { - final String[] split = fileAsString.split(System.lineSeparator()); - for (final String element : split) { - final Document read = reader.read(element); - final String line = writer.write(read); - - try (final BufferedWriter historyWriter = Files - .newBufferedWriter(path, StandardOpenOption.APPEND)) { - try { - historyWriter.append(line); - historyWriter.newLine(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - } - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - public static AirbyteSchemaHistoryStorage initializeDBHistory(final Optional schemaHistory) { - final Path dbHistoryWorkingDir; - try { - dbHistoryWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-db-history"); - } catch (final IOException e) { - throw new RuntimeException(e); - } - final Path dbHistoryFilePath = dbHistoryWorkingDir.resolve("dbhistory.dat"); - - final AirbyteSchemaHistoryStorage schemaHistoryManager = new AirbyteSchemaHistoryStorage(dbHistoryFilePath); - schemaHistoryManager.persist(schemaHistory); - return schemaHistoryManager; - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java deleted file mode 100644 index ab0a9e6cde1..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import io.airbyte.db.DataTypeUtils; -import io.debezium.spi.converter.RelationalColumn; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.Duration; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class DebeziumConverterUtils { - - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumConverterUtils.class); - - private DebeziumConverterUtils() { - throw new UnsupportedOperationException(); - } - - public static String convertDate(final Object input) { - /** - * While building this custom converter we were not sure what type debezium could return cause there - * is no mention of it in the documentation. Secondly if you take a look at - * {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(io.debezium.spi.converter.RelationalColumn, io.debezium.spi.converter.CustomConverter.ConverterRegistration)} - * method, even it is handling multiple data types but its not clear under what circumstances which - * data type would be returned. I just went ahead and handled the data types that made sense. - * Secondly, we use LocalDateTime to handle this cause it represents DATETIME datatype in JAVA - */ - if (input instanceof LocalDateTime) { - return DataTypeUtils.toISO8601String((LocalDateTime) input); - } else if (input instanceof LocalDate) { - return DataTypeUtils.toISO8601String((LocalDate) input); - } else if (input instanceof Duration) { - return DataTypeUtils.toISO8601String((Duration) input); - } else if (input instanceof Timestamp) { - return DataTypeUtils.toISO8601StringWithMicroseconds((((Timestamp) input).toInstant())); - } else if (input instanceof Number) { - return DataTypeUtils.toISO8601String( - new Timestamp(((Number) input).longValue()).toLocalDateTime()); - } else if (input instanceof Date) { - return DataTypeUtils.toISO8601String((Date) input); - } else if (input instanceof String) { - try { - return LocalDateTime.parse((String) input).toString(); - } catch (final DateTimeParseException e) { - LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", input); - return input.toString(); - } - } - LOGGER.warn("Uncovered date class type '{}'. Use default converter", input.getClass().getName()); - return input.toString(); - } - - public static Object convertDefaultValue(RelationalColumn field) { - if (field.isOptional()) { - return null; - } else if (field.hasDefaultValue()) { - return field.defaultValue(); - } - return null; - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java deleted file mode 100644 index da31b614321..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumEventUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.debezium.CdcMetadataInjector; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.debezium.engine.ChangeEvent; -import java.sql.Timestamp; -import java.time.Instant; - -public class DebeziumEventUtils { - - public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at"; - public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at"; - - public static AirbyteMessage toAirbyteMessage(final ChangeEvent event, - final CdcMetadataInjector cdcMetadataInjector, - final Instant emittedAt) { - final JsonNode debeziumRecord = Jsons.deserialize(event.value()); - final JsonNode before = debeziumRecord.get("before"); - final JsonNode after = debeziumRecord.get("after"); - final JsonNode source = debeziumRecord.get("source"); - - final JsonNode data = formatDebeziumData(before, after, source, cdcMetadataInjector); - final String schemaName = cdcMetadataInjector.namespace(source); - final String streamName = source.get("table").asText(); - - final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage() - .withStream(streamName) - .withNamespace(schemaName) - .withEmittedAt(emittedAt.toEpochMilli()) - .withData(data); - - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(airbyteRecordMessage); - } - - // warning mutates input args. - private static JsonNode formatDebeziumData(final JsonNode before, - final JsonNode after, - final JsonNode source, - final CdcMetadataInjector cdcMetadataInjector) { - final ObjectNode base = (ObjectNode) (after.isNull() ? before : after); - - final long transactionMillis = source.get("ts_ms").asLong(); - final String transactionTimestamp = new Timestamp(transactionMillis).toInstant().toString(); - - base.put(CDC_UPDATED_AT, transactionTimestamp); - cdcMetadataInjector.addMetaData(base, source); - - if (after.isNull()) { - base.put(CDC_DELETED_AT, transactionTimestamp); - } else { - base.put(CDC_DELETED_AT, (String) null); - } - - return base; - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java deleted file mode 100644 index 76305dabf25..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordIterator.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.AbstractIterator; -import io.airbyte.commons.concurrency.VoidCallable; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.MoreBooleans; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.integrations.debezium.CdcTargetPosition; -import io.debezium.engine.ChangeEvent; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The record iterator is the consumer (in the producer / consumer relationship with debezium) - * responsible for 1. making sure every record produced by the record publisher is processed 2. - * signalling to the record publisher when it is time for it to stop producing records. It emits - * this signal either when the publisher had not produced a new record for a long time or when it - * has processed at least all of the records that were present in the database when the source was - * started. Because the publisher might publish more records between the consumer sending this - * signal and the publisher actually shutting down, the consumer must stay alive as long as the - * publisher is not closed. Even after the publisher is closed, the consumer will finish processing - * any produced records before closing. - */ -public class DebeziumRecordIterator extends AbstractIterator> - implements AutoCloseableIterator> { - - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class); - - private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES); - private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(1, TimeUnit.MINUTES); - - private final LinkedBlockingQueue> queue; - private final CdcTargetPosition targetPosition; - private final Supplier publisherStatusSupplier; - private final VoidCallable requestClose; - private boolean receivedFirstRecord; - private boolean hasSnapshotFinished; - private boolean signalledClose; - - public DebeziumRecordIterator(final LinkedBlockingQueue> queue, - final CdcTargetPosition targetPosition, - final Supplier publisherStatusSupplier, - final VoidCallable requestClose) { - this.queue = queue; - this.targetPosition = targetPosition; - this.publisherStatusSupplier = publisherStatusSupplier; - this.requestClose = requestClose; - this.receivedFirstRecord = false; - this.hasSnapshotFinished = true; - this.signalledClose = false; - } - - @Override - protected ChangeEvent computeNext() { - // keep trying until the publisher is closed or until the queue is empty. the latter case is - // possible when the publisher has shutdown but the consumer has not yet processed all messages it - // emitted. - while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { - final ChangeEvent next; - try { - final WaitTime waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME_SECONDS : FIRST_RECORD_WAIT_TIME_MINUTES; - next = queue.poll(waitTime.period, waitTime.timeUnit); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - - // if within the timeout, the consumer could not get a record, it is time to tell the producer to - // shutdown. - if (next == null) { - LOGGER.info("Closing cause next is returned as null"); - requestClose(); - LOGGER.info("no record found. polling again."); - continue; - } - - final JsonNode eventAsJson = Jsons.deserialize(next.value()); - hasSnapshotFinished = hasSnapshotFinished(eventAsJson); - - // if the last record matches the target file position, it is time to tell the producer to shutdown. - if (!signalledClose && shouldSignalClose(eventAsJson)) { - requestClose(); - } - receivedFirstRecord = true; - return next; - } - return endOfData(); - } - - private boolean hasSnapshotFinished(final JsonNode eventAsJson) { - final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase()); - return SnapshotMetadata.TRUE != snapshot; - } - - /** - * Debezium was built as an ever running process which keeps on listening for new changes on DB and - * immediately processing them. Airbyte needs debezium to work as a start stop mechanism. In order - * to determine when to stop debezium engine we rely on few factors 1. TargetPosition logic. At the - * beginning of the sync we define a target position in the logs of the DB. This can be an LSN or - * anything specific to the DB which can help us identify that we have reached a specific position - * in the log based replication When we start processing records from debezium, we extract the the - * log position from the metadata of the record and compare it with our target that we defined at - * the beginning of the sync. If we have reached the target position, we shutdown the debezium - * engine 2. The TargetPosition logic might not always work and in order to tackle that we have - * another logic where if we do not receive records from debezium for a given duration, we ask - * debezium engine to shutdown 3. We also take the Snapshot into consideration, when a connector is - * running for the first time, we let it complete the snapshot and only after the completion of - * snapshot we should shutdown the engine. If we are closing the engine before completion of - * snapshot, we throw an exception - */ - @Override - public void close() throws Exception { - requestClose(); - } - - private boolean shouldSignalClose(final JsonNode eventAsJson) { - return targetPosition.reachedTargetPosition(eventAsJson); - } - - private void requestClose() { - try { - requestClose.call(); - signalledClose = true; - } catch (final Exception e) { - throw new RuntimeException(e); - } - throwExceptionIfSnapshotNotFinished(); - } - - private void throwExceptionIfSnapshotNotFinished() { - if (!hasSnapshotFinished) { - throw new RuntimeException("Closing down debezium engine but snapshot has not finished"); - } - } - - private static class WaitTime { - - public final int period; - public final TimeUnit timeUnit; - - public WaitTime(final int period, final TimeUnit timeUnit) { - this.period = period; - this.timeUnit = timeUnit; - } - - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java deleted file mode 100644 index 91a244b8f4b..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumRecordPublisher.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.SyncMode; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.engine.format.Json; -import io.debezium.engine.spi.OffsetCommitPolicy; -import java.util.Optional; -import java.util.Properties; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import org.codehaus.plexus.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The purpose of this class is to intiliaze and spawn the debezium engine with the right properties - * to fetch records - */ -public class DebeziumRecordPublisher implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class); - private final ExecutorService executor; - private DebeziumEngine> engine; - - private final JsonNode config; - private final AirbyteFileOffsetBackingStore offsetManager; - private final Optional schemaHistoryManager; - - private final AtomicBoolean hasClosed; - private final AtomicBoolean isClosing; - private final AtomicReference thrownError; - private final CountDownLatch engineLatch; - private final Properties properties; - private final ConfiguredAirbyteCatalog catalog; - - public DebeziumRecordPublisher(final Properties properties, - final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final AirbyteFileOffsetBackingStore offsetManager, - final Optional schemaHistoryManager) { - this.properties = properties; - this.config = config; - this.catalog = catalog; - this.offsetManager = offsetManager; - this.schemaHistoryManager = schemaHistoryManager; - this.hasClosed = new AtomicBoolean(false); - this.isClosing = new AtomicBoolean(false); - this.thrownError = new AtomicReference<>(); - this.executor = Executors.newSingleThreadExecutor(); - this.engineLatch = new CountDownLatch(1); - } - - public void start(final Queue> queue) { - engine = DebeziumEngine.create(Json.class) - .using(getDebeziumProperties()) - .using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy()) - .notifying(e -> { - // debezium outputs a tombstone event that has a value of null. this is an artifact of how it - // interacts with kafka. we want to ignore it. - // more on the tombstone: - // https://debezium.io/documentation/reference/configuration/event-flattening.html - if (e.value() != null) { - boolean inserted = false; - while (!inserted) { - inserted = queue.offer(e); - } - } - }) - .using((success, message, error) -> { - LOGGER.info("Debezium engine shutdown."); - thrownError.set(error); - engineLatch.countDown(); - }) - .build(); - - // Run the engine asynchronously ... - executor.execute(engine); - } - - public boolean hasClosed() { - return hasClosed.get(); - } - - public void close() throws Exception { - if (isClosing.compareAndSet(false, true)) { - // consumers should assume records can be produced until engine has closed. - if (engine != null) { - engine.close(); - } - - // wait for closure before shutting down executor service - engineLatch.await(5, TimeUnit.MINUTES); - - // shut down and await for thread to actually go down - executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); - - // after the engine is completely off, we can mark this as closed - hasClosed.set(true); - - if (thrownError.get() != null) { - throw new RuntimeException(thrownError.get()); - } - } - } - - protected Properties getDebeziumProperties() { - final Properties props = new Properties(); - props.putAll(properties); - - // debezium engine configuration - props.setProperty("name", "engine"); - props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); - props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); - props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer - // default values from debezium CommonConnectorConfig - props.setProperty("max.batch.size", "2048"); - props.setProperty("max.queue.size", "8192"); - - if (schemaHistoryManager.isPresent()) { - // https://debezium.io/documentation/reference/1.4/operations/debezium-server.html#debezium-source-database-history-file-filename - // https://debezium.io/documentation/reference/development/engine.html#_in_the_code - // As mentioned in the documents above, debezium connector for MySQL needs to track the schema - // changes. If we don't do this, we can't fetch records for the table - // We have implemented our own implementation to filter out the schema information from other - // databases that the connector is not syncing - props.setProperty("database.history", "io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory"); - props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString()); - } - - // https://debezium.io/documentation/reference/configuration/avro.html - props.setProperty("key.converter.schemas.enable", "false"); - props.setProperty("value.converter.schemas.enable", "false"); - - // debezium names - props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText()); - props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText()); - - // db connection configuration - props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText()); - props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText()); - props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText()); - props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText()); - - if (config.has(JdbcUtils.PASSWORD_KEY)) { - props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText()); - } - - // By default "decimal.handing.mode=precise" which's caused returning this value as a binary. - // The "double" type may cause a loss of precision, so set Debezium's config to store it as a String - // explicitly in its Kafka messages for more details see: - // https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-decimal-types - // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation - props.setProperty("decimal.handling.mode", "string"); - - // table selection - final String tableWhitelist = getTableWhitelist(catalog); - props.setProperty("table.include.list", tableWhitelist); - props.setProperty("database.include.list", config.get(JdbcUtils.DATABASE_KEY).asText()); - - return props; - } - - @VisibleForTesting - public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams().stream() - .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL) - .map(ConfiguredAirbyteStream::getStream) - .map(stream -> stream.getNamespace() + "." + stream.getName()) - // debezium needs commas escaped to split properly - .map(x -> StringUtils.escape(x, new char[] {','}, "\\,")) - .collect(Collectors.joining(",")); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java deleted file mode 100644 index f0469076753..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/FilteredFileDatabaseHistory.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import io.debezium.config.Configuration; -import io.debezium.relational.history.AbstractDatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryException; -import io.debezium.relational.history.DatabaseHistoryListener; -import io.debezium.relational.history.FileDatabaseHistory; -import io.debezium.relational.history.HistoryRecord; -import io.debezium.relational.history.HistoryRecord.Fields; -import io.debezium.relational.history.HistoryRecordComparator; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.function.Consumer; - -/** - * MySQL Debezium connector monitors the database schema evolution over the time and stores the data - * in a database history file. Without this file we can't fetch the records from binlog. We need to - * save the contents of the file. Debezium by default uses - * {@link io.debezium.relational.history.FileDatabaseHistory} class to write the schema information - * in the file. The problem is that the Debezium tracks the schema evolution of all the tables in - * all the databases, because of that the file content can grow. In order to make sure that debezium - * tracks only the schema of the tables that are present in the database that Airbyte is syncing, we - * created this class. In the method {@link #storeRecord(HistoryRecord)}, we introduced a check to - * make sure only those records are being saved whose database name matches the database Airbyte is - * syncing. We tell debezium to use this class by passing it as property in debezium engine. Look - * for "database.history" property in {@link DebeziumRecordPublisher#getDebeziumProperties()} - * Ideally {@link FilteredFileDatabaseHistory} should have extended - * {@link io.debezium.relational.history.FileDatabaseHistory} and overridden the - * {@link #storeRecord(HistoryRecord)} method but {@link FilteredFileDatabaseHistory} is a final - * class and can not be inherited - */ -public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory { - - private final FileDatabaseHistory fileDatabaseHistory; - private static String databaseName; - - public FilteredFileDatabaseHistory() { - this.fileDatabaseHistory = new FileDatabaseHistory(); - } - - /** - * Ideally the databaseName should have been initialized in the constructor of the class. But since - * we supply the class name to debezium and it uses reflection to construct the object of the class, - * we can't pass in the databaseName as a parameter to the constructor. That's why we had to take - * the static approach. - * - * @param databaseName Name of the database that the connector is syncing - */ - public static void setDatabaseName(final String databaseName) { - if (FilteredFileDatabaseHistory.databaseName == null) { - FilteredFileDatabaseHistory.databaseName = databaseName; - } else if (!FilteredFileDatabaseHistory.databaseName.equals(databaseName)) { - throw new RuntimeException( - "Database name has already been set : " + FilteredFileDatabaseHistory.databaseName - + " can't set to : " + databaseName); - } - } - - @Override - public void configure(final Configuration config, - final HistoryRecordComparator comparator, - final DatabaseHistoryListener listener, - final boolean useCatalogBeforeSchema) { - fileDatabaseHistory.configure(config, comparator, listener, useCatalogBeforeSchema); - } - - @Override - public void start() { - fileDatabaseHistory.start(); - } - - @Override - public void storeRecord(final HistoryRecord record) throws DatabaseHistoryException { - if (record == null) { - return; - } - try { - final String dbNameInRecord = record.document().getString(Fields.DATABASE_NAME); - if (databaseName != null && dbNameInRecord != null && !dbNameInRecord.equals(databaseName)) { - return; - } - - /** - * We are using reflection because the method - * {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} is - * protected and can not be accessed from here - */ - final Method storeRecordMethod = fileDatabaseHistory.getClass() - .getDeclaredMethod("storeRecord", record.getClass()); - storeRecordMethod.setAccessible(true); - storeRecordMethod.invoke(fileDatabaseHistory, record); - } catch (final NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - @Override - public void stop() { - fileDatabaseHistory.stop(); - // this is just for tests - resetDbName(); - } - - public static void resetDbName() { - databaseName = null; - } - - @Override - protected void recoverRecords(final Consumer records) { - try { - /** - * We are using reflection because the method - * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} is protected - * and can not be accessed from here - */ - final Method recoverRecords = fileDatabaseHistory.getClass() - .getDeclaredMethod("recoverRecords", Consumer.class); - recoverRecords.setAccessible(true); - recoverRecords.invoke(fileDatabaseHistory, records); - } catch (final NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean storageExists() { - return fileDatabaseHistory.storageExists(); - } - - @Override - public void initializeStorage() { - fileDatabaseHistory.initializeStorage(); - } - - @Override - public boolean exists() { - return fileDatabaseHistory.exists(); - } - - @Override - public String toString() { - return fileDatabaseHistory.toString(); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java deleted file mode 100644 index 50a5c77057f..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MSSQLConverter.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import com.microsoft.sqlserver.jdbc.Geography; -import com.microsoft.sqlserver.jdbc.Geometry; -import com.microsoft.sqlserver.jdbc.SQLServerException; -import io.airbyte.db.DataTypeUtils; -import io.debezium.spi.converter.CustomConverter; -import io.debezium.spi.converter.RelationalColumn; -import java.math.BigDecimal; -import java.nio.charset.Charset; -import java.sql.Timestamp; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; -import microsoft.sql.DateTimeOffset; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MSSQLConverter implements CustomConverter { - - private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class); - - private final Set DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "SMALLDATETIME"); - private final Set BINARY = Set.of("VARBINARY", "BINARY"); - private static final String DATETIMEOFFSET = "DATETIMEOFFSET"; - private static final String TIME_TYPE = "TIME"; - private static final String SMALLMONEY_TYPE = "SMALLMONEY"; - private static final String GEOMETRY = "GEOMETRY"; - private static final String GEOGRAPHY = "GEOGRAPHY"; - private static final String DEBEZIUM_DATETIMEOFFSET_FORMAT = "yyyy-MM-dd HH:mm:ss XXX"; - - @Override - public void configure(Properties props) {} - - @Override - public void converterFor(final RelationalColumn field, - final ConverterRegistration registration) { - if (DATE_TYPES.contains(field.typeName().toUpperCase())) { - registerDate(field, registration); - } else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) { - registerMoney(field, registration); - } else if (BINARY.contains(field.typeName().toUpperCase())) { - registerBinary(field, registration); - } else if (GEOMETRY.equalsIgnoreCase(field.typeName())) { - registerGeometry(field, registration); - } else if (GEOGRAPHY.equalsIgnoreCase(field.typeName())) { - registerGeography(field, registration); - } else if (TIME_TYPE.equalsIgnoreCase(field.typeName())) { - registerTime(field, registration); - } else if (DATETIMEOFFSET.equalsIgnoreCase(field.typeName())) { - registerDateTimeOffSet(field, registration); - } - } - - private void registerGeometry(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (input instanceof byte[]) { - try { - return Geometry.deserialize((byte[]) input).toString(); - } catch (SQLServerException e) { - LOGGER.error(e.getMessage()); - } - } - - LOGGER.warn("Uncovered Geometry class type '{}'. Use default converter", - input.getClass().getName()); - return input.toString(); - }); - } - - private void registerGeography(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (input instanceof byte[]) { - try { - return Geography.deserialize((byte[]) input).toString(); - } catch (SQLServerException e) { - LOGGER.error(e.getMessage()); - } - } - - LOGGER.warn("Uncovered Geography class type '{}'. Use default converter", - input.getClass().getName()); - return input.toString(); - }); - } - - private void registerDate(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - return DebeziumConverterUtils.convertDate(input); - }); - } - - private void registerDateTimeOffSet(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (input instanceof DateTimeOffset) { - return DataTypeUtils.toISO8601String( - OffsetDateTime.parse(input.toString(), - DateTimeFormatter.ofPattern(DEBEZIUM_DATETIMEOFFSET_FORMAT))); - } - - LOGGER.warn("Uncovered DateTimeOffSet class type '{}'. Use default converter", - input.getClass().getName()); - return input.toString(); - }); - } - - private void registerTime(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (input instanceof Timestamp) { - return DataTypeUtils.toISOTimeString(((Timestamp) input).toLocalDateTime()); - } - - LOGGER.warn("Uncovered time class type '{}'. Use default converter", - input.getClass().getName()); - return input.toString(); - }); - } - - private void registerMoney(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.float64(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (input instanceof BigDecimal) { - return ((BigDecimal) input).doubleValue(); - } - - LOGGER.warn("Uncovered money class type '{}'. Use default converter", - input.getClass().getName()); - return input.toString(); - }); - } - - private void registerBinary(final RelationalColumn field, - final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), input -> { - if (Objects.isNull(input)) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (input instanceof byte[]) { - return new String((byte[]) input, Charset.defaultCharset()); - } - - LOGGER.warn("Uncovered binary class type '{}'. Use default converter", - input.getClass().getName()); - return input.toString(); - }); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java deleted file mode 100644 index ac099bc15cd..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLConverter.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import io.airbyte.db.DataTypeUtils; -import io.debezium.spi.converter.CustomConverter; -import io.debezium.spi.converter.RelationalColumn; -import java.nio.charset.StandardCharsets; -import java.time.LocalDate; -import java.util.Arrays; -import java.util.Properties; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a custom debezium converter used in MySQL to handle the DATETIME data type. We need a - * custom converter cause by default debezium returns the DATETIME values as numbers. We need to - * convert it to proper format. Ref : - * https://debezium.io/documentation/reference/1.4/development/converters.html This is built from - * reference with {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} If you - * rename this class then remember to rename the datetime.type property value in - * io.airbyte-integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties() (If you don't - * rename, a test would still fail but it might be tricky to figure out where to change the property - * name) - */ -public class MySQLConverter implements CustomConverter { - - private static final Logger LOGGER = LoggerFactory.getLogger(MySQLConverter.class); - - private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"}; - private final String[] TEXT_TYPES = {"CHAR", "VARCHAR", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT"}; - - @Override - public void configure(final Properties props) {} - - @Override - public void converterFor(final RelationalColumn field, final ConverterRegistration registration) { - if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { - registerDate(field, registration); - } else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { - registerText(field, registration); - } - } - - private void registerText(final RelationalColumn field, final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), x -> { - if (x == null) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - if (x instanceof byte[]) { - return new String((byte[]) x, StandardCharsets.UTF_8); - } else { - return x.toString(); - } - }); - } - - /** - * The debezium driver replaces Zero-value by Null even when this column is mandatory. According to - * the doc, it should be done by driver, but it fails. - */ - private Object convertDefaultValueNullDate(final RelationalColumn field) { - final var defaultValue = DebeziumConverterUtils.convertDefaultValue(field); - return (defaultValue == null && !field.isOptional() ? DataTypeUtils.toISO8601String(LocalDate.EPOCH) : defaultValue); - } - - private void registerDate(final RelationalColumn field, final ConverterRegistration registration) { - registration.register(SchemaBuilder.string(), - x -> x == null ? convertDefaultValueNullDate(field) : DebeziumConverterUtils.convertDate(x)); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java deleted file mode 100644 index aee741b6aac..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import io.debezium.spi.converter.CustomConverter; -import io.debezium.spi.converter.RelationalColumn; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Properties; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.postgresql.util.PGInterval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PostgresConverter implements CustomConverter { - - private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class); - - private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"}; - private final String[] BIT_TYPES = {"BIT", "VARBIT"}; - private final String[] MONEY_ITEM_TYPE = {"MONEY"}; - private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"}; - private final String[] TEXT_TYPES = - {"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"}; - - @Override - public void configure(final Properties props) {} - - @Override - public void converterFor(final RelationalColumn field, final ConverterRegistration registration) { - if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { - registerDate(field, registration); - } else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName())) - || Arrays.stream(GEOMETRICS_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName())) - || Arrays.stream(BIT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { - registerText(field, registration); - } else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { - registerMoney(field, registration); - } - } - - private void registerText(final RelationalColumn field, final ConverterRegistration registration) { - registration.register(SchemaBuilder.string().optional(), x -> { - if (x == null) { - return DebeziumConverterUtils.convertDefaultValue(field); - } - - if (x instanceof byte[]) { - return new String((byte[]) x, StandardCharsets.UTF_8); - } else { - return x.toString(); - } - }); - } - - private void registerDate(final RelationalColumn field, final ConverterRegistration registration) { - registration.register(SchemaBuilder.string().optional(), x -> { - if (x == null) { - return DebeziumConverterUtils.convertDefaultValue(field); - } else if (x instanceof PGInterval) { - return convertInterval((PGInterval) x); - } else { - return DebeziumConverterUtils.convertDate(x); - } - }); - } - - private String convertInterval(final PGInterval pgInterval) { - final StringBuilder resultInterval = new StringBuilder(); - formatDateUnit(resultInterval, pgInterval.getYears(), " year "); - formatDateUnit(resultInterval, pgInterval.getMonths(), " mons "); - formatDateUnit(resultInterval, pgInterval.getDays(), " days "); - - formatTimeValues(resultInterval, pgInterval); - return resultInterval.toString(); - } - - private void registerMoney(final RelationalColumn field, final ConverterRegistration registration) { - registration.register(SchemaBuilder.string().optional(), x -> { - if (x == null) { - return DebeziumConverterUtils.convertDefaultValue(field); - } else if (x instanceof Double) { - final BigDecimal result = BigDecimal.valueOf((Double) x); - if (result.compareTo(new BigDecimal("999999999999999")) == 1 - || result.compareTo(new BigDecimal("-999999999999999")) == -1) { - return null; - } - return result.toString(); - } else { - return x.toString(); - } - }); - } - - private void formatDateUnit(final StringBuilder resultInterval, final int dateUnit, final String s) { - if (dateUnit != 0) { - resultInterval - .append(dateUnit) - .append(s); - } - } - - private void formatTimeValues(final StringBuilder resultInterval, final PGInterval pgInterval) { - if (isNegativeTime(pgInterval)) { - resultInterval.append("-"); - } - // TODO check if value more or less than Integer.MIN_VALUE Integer.MAX_VALUE, - final int hours = Math.abs(pgInterval.getHours()); - final int minutes = Math.abs(pgInterval.getMinutes()); - final int seconds = Math.abs(pgInterval.getWholeSeconds()); - resultInterval.append(addFirstDigit(hours)); - resultInterval.append(hours); - resultInterval.append(":"); - resultInterval.append(addFirstDigit(minutes)); - resultInterval.append(minutes); - resultInterval.append(":"); - resultInterval.append(addFirstDigit(seconds)); - resultInterval.append(seconds); - } - - private String addFirstDigit(final int hours) { - return hours <= 9 ? "0" : ""; - } - - private boolean isNegativeTime(final PGInterval pgInterval) { - return pgInterval.getHours() < 0 - || pgInterval.getMinutes() < 0 - || pgInterval.getWholeSeconds() < 0; - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java b/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java deleted file mode 100644 index b24cdf71fbe..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/main/java/io/airbyte/integrations/debezium/internals/SnapshotMetadata.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -public enum SnapshotMetadata { - TRUE, - FALSE, - LAST -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java deleted file mode 100644 index 45d50612f79..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteDebeziumHandlerTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import com.google.common.collect.Lists; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; -import java.util.List; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class AirbyteDebeziumHandlerTest { - - @Test - public void shouldUseCdcTestShouldReturnTrue() { - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - "MODELS_STREAM_NAME", - "MODELS_SCHEMA", - Field.of("COL_ID", JsonSchemaType.NUMBER), - Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER), - Field.of("COL_MODEL", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("COL_ID"))))); - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers - .toDefaultConfiguredCatalog(catalog); - // set all streams to incremental. - configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); - - Assertions.assertTrue(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog)); - } - - @Test - public void shouldUseCdcTestShouldReturnFalse() { - final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - "MODELS_STREAM_NAME", - "MODELS_SCHEMA", - Field.of("COL_ID", JsonSchemaType.NUMBER), - Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER), - Field.of("COL_MODEL", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of("COL_ID"))))); - final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers - .toDefaultConfiguredCatalog(catalog); - - Assertions.assertFalse(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog)); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java deleted file mode 100644 index 9f1e6d0ea05..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/AirbyteFileOffsetBackingStoreTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Map; -import org.junit.jupiter.api.Test; - -class AirbyteFileOffsetBackingStoreTest { - - @SuppressWarnings("UnstableApiUsage") - @Test - void test() throws IOException { - final Path testRoot = Files.createTempDirectory(Path.of("/tmp"), "offset-store-test"); - - final byte[] bytes = MoreResources.readBytes("test_debezium_offset.dat"); - final Path templateFilePath = testRoot.resolve("template_offset.dat"); - IOs.writeFile(templateFilePath, bytes); - - final Path writeFilePath = testRoot.resolve("offset.dat"); - - final AirbyteFileOffsetBackingStore offsetStore = new AirbyteFileOffsetBackingStore(templateFilePath); - final Map offset = offsetStore.read(); - - final JsonNode asJson = Jsons.jsonNode(offset); - - final AirbyteFileOffsetBackingStore offsetStore2 = new AirbyteFileOffsetBackingStore(writeFilePath); - offsetStore2.persist(asJson); - - final Map stateFromOffsetStoreRoundTrip = offsetStore2.read(); - - // verify that, after a round trip through the offset store, we get back the same data. - assertEquals(offset, stateFromOffsetStoreRoundTrip); - // verify that the file written by the offset store is identical to the template file. - assertTrue(com.google.common.io.Files.equal(templateFilePath.toFile(), writeFilePath.toFile())); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java deleted file mode 100644 index 4de1b36524d..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumEventUtilsTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.debezium.engine.ChangeEvent; -import java.io.IOException; -import java.time.Instant; -import org.junit.jupiter.api.Test; - -class DebeziumEventUtilsTest { - - @Test - public void testConvertChangeEvent() throws IOException { - final String stream = "names"; - final Instant emittedAt = Instant.now(); - final CdcMetadataInjector cdcMetadataInjector = new DummyMetadataInjector(); - final ChangeEvent insertChangeEvent = mockChangeEvent("insert_change_event.json"); - final ChangeEvent updateChangeEvent = mockChangeEvent("update_change_event.json"); - final ChangeEvent deleteChangeEvent = mockChangeEvent("delete_change_event.json"); - - final AirbyteMessage actualInsert = DebeziumEventUtils.toAirbyteMessage(insertChangeEvent, cdcMetadataInjector, emittedAt); - final AirbyteMessage actualUpdate = DebeziumEventUtils.toAirbyteMessage(updateChangeEvent, cdcMetadataInjector, emittedAt); - final AirbyteMessage actualDelete = DebeziumEventUtils.toAirbyteMessage(deleteChangeEvent, cdcMetadataInjector, emittedAt); - - final AirbyteMessage expectedInsert = createAirbyteMessage(stream, emittedAt, "insert_message.json"); - final AirbyteMessage expectedUpdate = createAirbyteMessage(stream, emittedAt, "update_message.json"); - final AirbyteMessage expectedDelete = createAirbyteMessage(stream, emittedAt, "delete_message.json"); - - deepCompare(expectedInsert, actualInsert); - deepCompare(expectedUpdate, actualUpdate); - deepCompare(expectedDelete, actualDelete); - } - - private static ChangeEvent mockChangeEvent(final String resourceName) throws IOException { - final ChangeEvent mocked = mock(ChangeEvent.class); - final String resource = MoreResources.readResource(resourceName); - when(mocked.value()).thenReturn(resource); - - return mocked; - } - - private static AirbyteMessage createAirbyteMessage(final String stream, final Instant emittedAt, final String resourceName) throws IOException { - final String data = MoreResources.readResource(resourceName); - - final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage() - .withStream(stream) - .withNamespace("public") - .withData(Jsons.deserialize(data)) - .withEmittedAt(emittedAt.toEpochMilli()); - - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(recordMessage); - } - - private static void deepCompare(final Object expected, final Object actual) { - assertEquals(Jsons.deserialize(Jsons.serialize(expected)), Jsons.deserialize(Jsons.serialize(actual))); - } - - public static class DummyMetadataInjector implements CdcMetadataInjector { - - @Override - public void addMetaData(final ObjectNode event, final JsonNode source) { - final long lsn = source.get("lsn").asLong(); - event.put("_ab_cdc_lsn", lsn); - } - - @Override - public String namespace(final JsonNode source) { - return source.get("schema").asText(); - } - - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java deleted file mode 100644 index 31dacbc563a..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import static org.junit.jupiter.api.Assertions.*; - -import com.google.common.collect.ImmutableList; -import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.SyncMode; -import org.junit.jupiter.api.Test; - -class DebeziumRecordPublisherTest { - - @Test - public void testWhitelistCreation() { - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( - CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL), - CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public").withSyncMode(SyncMode.INCREMENTAL), - CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public").withSyncMode(SyncMode.INCREMENTAL))); - - final String expectedWhitelist = "public.id_and_name,public.id_\\,something,public.n\"aMéS"; - final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog); - - assertEquals(expectedWhitelist, actualWhitelist); - } - - @Test - public void testWhitelistFiltersFullRefresh() { - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( - CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL), - CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public").withSyncMode(SyncMode.FULL_REFRESH))); - - final String expectedWhitelist = "public.id_and_name"; - final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog); - - assertEquals(expectedWhitelist, actualWhitelist); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java deleted file mode 100644 index facb86d0bc5..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtilsTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import io.debezium.spi.converter.RelationalColumn; -import java.sql.Timestamp; -import java.time.Duration; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -class DebeziumConverterUtilsTest { - - @Test - public void convertDefaultValueTest() { - - final RelationalColumn relationalColumn = mock(RelationalColumn.class); - - when(relationalColumn.isOptional()).thenReturn(true); - Object actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn); - Assertions.assertNull(actualColumnDefaultValue, "Default value for optional relational column should be null"); - - when(relationalColumn.isOptional()).thenReturn(false); - when(relationalColumn.hasDefaultValue()).thenReturn(false); - actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn); - Assertions.assertNull(actualColumnDefaultValue); - - when(relationalColumn.isOptional()).thenReturn(false); - when(relationalColumn.hasDefaultValue()).thenReturn(true); - final String expectedColumnDefaultValue = "default value"; - when(relationalColumn.defaultValue()).thenReturn(expectedColumnDefaultValue); - actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn); - Assertions.assertEquals(actualColumnDefaultValue, expectedColumnDefaultValue); - } - - @Test - public void convertLocalDate() { - final LocalDate localDate = LocalDate.of(2021, 1, 1); - - final String actual = DebeziumConverterUtils.convertDate(localDate); - Assertions.assertEquals("2021-01-01T00:00:00Z", actual); - } - - @Test - public void convertTLocalTime() { - final LocalTime localTime = LocalTime.of(8, 1, 1); - final String actual = DebeziumConverterUtils.convertDate(localTime); - Assertions.assertEquals("08:01:01", actual); - } - - @Test - public void convertLocalDateTime() { - final LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1); - - final String actual = DebeziumConverterUtils.convertDate(localDateTime); - Assertions.assertEquals("2021-01-01T08:01:01Z", actual); - } - - @Test - @Disabled - public void convertDuration() { - final Duration duration = Duration.ofHours(100_000); - - final String actual = DebeziumConverterUtils.convertDate(duration); - Assertions.assertEquals("1981-05-29T20:00:00Z", actual); - } - - @Test - public void convertTimestamp() { - final LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1); - final Timestamp timestamp = Timestamp.valueOf(localDateTime); - - final String actual = DebeziumConverterUtils.convertDate(timestamp); - Assertions.assertEquals("2021-01-01T08:01:01.000000Z", actual); - } - - @Test - @Disabled - public void convertNumber() { - final Number number = 100_000; - - final String actual = DebeziumConverterUtils.convertDate(number); - Assertions.assertEquals("1970-01-01T03:01:40Z", actual); - } - - @Test - public void convertStringDateFormat() { - final String stringValue = "2021-01-01T00:00:00Z"; - - final String actual = DebeziumConverterUtils.convertDate(stringValue); - Assertions.assertEquals("2021-01-01T00:00:00Z", actual); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_change_event.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_change_event.json deleted file mode 100644 index 07b575bf7e2..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_change_event.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "before": { - "first_name": "san", - "last_name": "goku", - "power": null - }, - "after": null, - "source": { - "version": "1.4.2.Final", - "connector": "postgresql", - "name": "orders", - "ts_ms": 1616775646886, - "snapshot": false, - "db": "db_lwfoyffqvx", - "schema": "public", - "table": "names", - "txId": 498, - "lsn": 23012360, - "xmin": null - }, - "op": "d", - "ts_ms": 1616775646931, - "transaction": null, - "destination": "orders.public.names" -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_message.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_message.json deleted file mode 100644 index 676ee5b74ff..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/delete_message.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "first_name": "san", - "last_name": "goku", - "power": null, - "_ab_cdc_updated_at": "2021-03-26T16:20:46.886Z", - "_ab_cdc_lsn": 23012360, - "_ab_cdc_deleted_at": "2021-03-26T16:20:46.886Z" -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_change_event.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_change_event.json deleted file mode 100644 index 4b2c2fb6e2c..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_change_event.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "before": null, - "after": { - "first_name": "san", - "last_name": "goku", - "power": "Infinity" - }, - "source": { - "version": "1.4.2.Final", - "connector": "postgresql", - "name": "orders", - "ts_ms": 1616775642623, - "snapshot": true, - "db": "db_lwfoyffqvx", - "schema": "public", - "table": "names", - "txId": 495, - "lsn": 23011544, - "xmin": null - }, - "op": "r", - "ts_ms": 1616775642624, - "transaction": null, - "destination": "orders.public.names" -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_message.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_message.json deleted file mode 100644 index d971d32c176..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/insert_message.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "first_name": "san", - "last_name": "goku", - "power": "Infinity", - "_ab_cdc_updated_at": "2021-03-26T16:20:42.623Z", - "_ab_cdc_lsn": 23011544, - "_ab_cdc_deleted_at": null -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/test_debezium_offset.dat b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/test_debezium_offset.dat deleted file mode 100644 index c7e7054916e..00000000000 Binary files a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/test_debezium_offset.dat and /dev/null differ diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_change_event.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_change_event.json deleted file mode 100644 index da5dcd9c2b0..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_change_event.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "before": null, - "after": { - "first_name": "san", - "last_name": "goku", - "power": 10000.2 - }, - "source": { - "version": "1.4.2.Final", - "connector": "postgresql", - "name": "orders", - "ts_ms": 1616775646881, - "snapshot": false, - "db": "db_lwfoyffqvx", - "schema": "public", - "table": "names", - "txId": 497, - "lsn": 23012216, - "xmin": null - }, - "op": "u", - "ts_ms": 1616775646929, - "transaction": null, - "destination": "orders.public.names" -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_message.json b/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_message.json deleted file mode 100644 index 89b9a08038a..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/test/resources/update_message.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "first_name": "san", - "last_name": "goku", - "power": 10000.2, - "_ab_cdc_updated_at": "2021-03-26T16:20:46.881Z", - "_ab_cdc_lsn": 23012216, - "_ab_cdc_deleted_at": null -} diff --git a/airbyte-integrations/bases/debezium-v1-4-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium-v1-4-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java deleted file mode 100644 index 8738ffda28b..00000000000 --- a/airbyte-integrations/bases/debezium-v1-4-2/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ /dev/null @@ -1,629 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.collect.Streams; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; -import io.airbyte.db.Database; -import io.airbyte.integrations.base.Source; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteMessage.Type; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class CdcSourceTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(CdcSourceTest.class); - - protected static final String MODELS_SCHEMA = "models_schema"; - protected static final String MODELS_STREAM_NAME = "models"; - private static final Set STREAM_NAMES = Sets - .newHashSet(MODELS_STREAM_NAME); - protected static final String COL_ID = "id"; - protected static final String COL_MAKE_ID = "make_id"; - protected static final String COL_MODEL = "model"; - - protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( - CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME, - MODELS_SCHEMA, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER), - Field.of(COL_MODEL, JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))))); - protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers - .toDefaultConfiguredCatalog(CATALOG); - - // set all streams to incremental. - static { - CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); - } - - private static final List MODEL_RECORDS = ImmutableList.of( - Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 13, COL_MAKE_ID, 1, COL_MODEL, "Ranger")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 14, COL_MAKE_ID, 2, COL_MODEL, "GLA")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350"))); - - protected void setup() throws SQLException { - createAndPopulateTables(); - } - - private void createAndPopulateTables() { - createAndPopulateActualTable(); - createAndPopulateRandomTable(); - } - - protected void executeQuery(final String query) { - try { - getDatabase().query( - ctx -> ctx - .execute(query)); - } catch (final SQLException e) { - throw new RuntimeException(e); - } - } - - public String columnClause(final Map columnsWithDataType, final Optional primaryKey) { - final StringBuilder columnClause = new StringBuilder(); - int i = 0; - for (final Map.Entry column : columnsWithDataType.entrySet()) { - columnClause.append(column.getKey()); - columnClause.append(" "); - columnClause.append(column.getValue()); - if (i < (columnsWithDataType.size() - 1)) { - columnClause.append(","); - columnClause.append(" "); - } - i++; - } - primaryKey.ifPresent(s -> columnClause.append(", PRIMARY KEY (").append(s).append(")")); - - return columnClause.toString(); - } - - public void createTable(final String schemaName, final String tableName, final String columnClause) { - executeQuery(createTableQuery(schemaName, tableName, columnClause)); - } - - public String createTableQuery(final String schemaName, final String tableName, final String columnClause) { - return String.format("CREATE TABLE %s.%s(%s);", schemaName, tableName, columnClause); - } - - public void createSchema(final String schemaName) { - executeQuery(createSchemaQuery(schemaName)); - } - - public String createSchemaQuery(final String schemaName) { - return "CREATE DATABASE " + schemaName + ";"; - } - - private void createAndPopulateActualTable() { - createSchema(MODELS_SCHEMA); - createTable(MODELS_SCHEMA, MODELS_STREAM_NAME, - columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID))); - for (final JsonNode recordJson : MODEL_RECORDS) { - writeModelRecord(recordJson); - } - } - - /** - * This database and table is not part of Airbyte sync. It is being created just to make sure the - * databases not being synced by Airbyte are not causing issues with our debezium logic - */ - private void createAndPopulateRandomTable() { - createSchema(MODELS_SCHEMA + "_random"); - createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", - columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"), - Optional.of(COL_ID + "_random"))); - final List MODEL_RECORDS_RANDOM = ImmutableList.of( - Jsons - .jsonNode(ImmutableMap - .of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", - "Fiesta-random")), - Jsons.jsonNode(ImmutableMap - .of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", - "Focus-random")), - Jsons - .jsonNode(ImmutableMap - .of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", - "Ranger-random")), - Jsons.jsonNode(ImmutableMap - .of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", - "GLA-random")), - Jsons.jsonNode(ImmutableMap - .of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", - "A 220-random")), - Jsons - .jsonNode(ImmutableMap - .of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", - "E 350-random"))); - for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) { - writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", - COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random"); - } - } - - private void writeModelRecord(final JsonNode recordJson) { - writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL); - } - - private void writeRecords( - final JsonNode recordJson, - final String dbName, - final String streamName, - final String idCol, - final String makeIdCol, - final String modelCol) { - executeQuery( - String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName, - idCol, makeIdCol, modelCol, - recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(), - recordJson.get(modelCol).asText())); - } - - private static Set removeDuplicates(final Set messages) { - final Set existingDataRecordsWithoutUpdated = new HashSet<>(); - final Set output = new HashSet<>(); - - for (final AirbyteRecordMessage message : messages) { - final ObjectNode node = message.getData().deepCopy(); - node.remove("_ab_cdc_updated_at"); - - if (existingDataRecordsWithoutUpdated.contains(node)) { - LOGGER.info("Removing duplicate node: " + node); - } else { - output.add(message); - existingDataRecordsWithoutUpdated.add(node); - } - } - - return output; - } - - protected Set extractRecordMessages(final List messages) { - final List recordMessageList = messages - .stream() - .filter(r -> r.getType() == Type.RECORD).map(AirbyteMessage::getRecord) - .collect(Collectors.toList()); - final Set recordMessageSet = new HashSet<>(recordMessageList); - - assertEquals(recordMessageList.size(), recordMessageSet.size(), - "Expected no duplicates in airbyte record message output for a single sync."); - - return recordMessageSet; - } - - private List extractStateMessages(final List messages) { - return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState) - .collect(Collectors.toList()); - } - - private void assertExpectedRecords(final Set expectedRecords, final Set actualRecords) { - // assume all streams are cdc. - assertExpectedRecords(expectedRecords, actualRecords, actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet())); - } - - private void assertExpectedRecords(final Set expectedRecords, - final Set actualRecords, - final Set cdcStreams) { - assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES); - } - - private void assertExpectedRecords(final Set expectedRecords, - final Set actualRecords, - final Set cdcStreams, - final Set streamNames) { - final Set actualData = actualRecords - .stream() - .map(recordMessage -> { - assertTrue(streamNames.contains(recordMessage.getStream())); - assertNotNull(recordMessage.getEmittedAt()); - - assertEquals(MODELS_SCHEMA, recordMessage.getNamespace()); - - final JsonNode data = recordMessage.getData(); - - if (cdcStreams.contains(recordMessage.getStream())) { - assertCdcMetaData(data, true); - } else { - assertNullCdcMetaData(data); - } - - removeCDCColumns((ObjectNode) data); - - return data; - }) - .collect(Collectors.toSet()); - - assertEquals(expectedRecords, actualData); - } - - @Test - @DisplayName("On the first sync, produce returns records that exist in the database.") - void testExistingData() throws Exception { - final CdcTargetPosition targetPosition = cdcLatestTargetPosition(); - final AutoCloseableIterator read = getSource().read(getConfig(), CONFIGURED_CATALOG, null); - final List actualRecords = AutoCloseableIterators.toListAndClose(read); - - final Set recordMessages = extractRecordMessages(actualRecords); - final List stateMessages = extractStateMessages(actualRecords); - - assertNotNull(targetPosition); - recordMessages.forEach(record -> { - assertEquals(extractPosition(record.getData()), targetPosition); - }); - - assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages); - assertEquals(1, stateMessages.size()); - assertNotNull(stateMessages.get(0).getData()); - assertExpectedStateMessages(stateMessages); - } - - @Test - @DisplayName("When a record is deleted, produces a deletion record.") - void testDelete() throws Exception { - final AutoCloseableIterator read1 = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - final List stateMessages1 = extractStateMessages(actualRecords1); - assertEquals(1, stateMessages1.size()); - assertNotNull(stateMessages1.get(0).getData()); - assertExpectedStateMessages(stateMessages1); - - executeQuery(String - .format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, - 11)); - - final JsonNode state = Jsons.jsonNode(stateMessages1); - final AutoCloseableIterator read2 = getSource() - .read(getConfig(), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - final List recordMessages2 = new ArrayList<>( - extractRecordMessages(actualRecords2)); - final List stateMessages2 = extractStateMessages(actualRecords2); - assertEquals(1, stateMessages2.size()); - assertNotNull(stateMessages2.get(0).getData()); - assertExpectedStateMessages(stateMessages2); - assertEquals(1, recordMessages2.size()); - assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); - assertCdcMetaData(recordMessages2.get(0).getData(), false); - } - - @Test - @DisplayName("When a record is updated, produces an update record.") - void testUpdate() throws Exception { - final String updatedModel = "Explorer"; - final AutoCloseableIterator read1 = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - final List stateMessages1 = extractStateMessages(actualRecords1); - assertEquals(1, stateMessages1.size()); - assertNotNull(stateMessages1.get(0).getData()); - assertExpectedStateMessages(stateMessages1); - - executeQuery(String - .format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, - COL_MODEL, updatedModel, COL_ID, 11)); - - final JsonNode state = Jsons.jsonNode(stateMessages1); - final AutoCloseableIterator read2 = getSource() - .read(getConfig(), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - final List recordMessages2 = new ArrayList<>( - extractRecordMessages(actualRecords2)); - final List stateMessages2 = extractStateMessages(actualRecords2); - assertEquals(1, stateMessages2.size()); - assertNotNull(stateMessages2.get(0).getData()); - assertExpectedStateMessages(stateMessages2); - assertEquals(1, recordMessages2.size()); - assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); - assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText()); - assertCdcMetaData(recordMessages2.get(0).getData(), true); - } - - @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) - @Test - @DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.") - void testRecordsProducedDuringAndAfterSync() throws Exception { - - final int recordsToCreate = 20; - final int[] recordsCreated = {0}; - // first batch of records. 20 created here and 6 created in setup method. - while (recordsCreated[0] < recordsToCreate) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 100 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated[0])); - writeModelRecord(record); - recordsCreated[0]++; - } - - final AutoCloseableIterator firstBatchIterator = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List dataFromFirstBatch = AutoCloseableIterators - .toListAndClose(firstBatchIterator); - final List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); - assertEquals(1, stateAfterFirstBatch.size()); - assertNotNull(stateAfterFirstBatch.get(0).getData()); - assertExpectedStateMessages(stateAfterFirstBatch); - final Set recordsFromFirstBatch = extractRecordMessages( - dataFromFirstBatch); - assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size()); - - // second batch of records again 20 being created - recordsCreated[0] = 0; - while (recordsCreated[0] < recordsToCreate) { - final JsonNode record = - Jsons.jsonNode(ImmutableMap - .of(COL_ID, 200 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL, - "F-" + recordsCreated[0])); - writeModelRecord(record); - recordsCreated[0]++; - } - - final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch); - final AutoCloseableIterator secondBatchIterator = getSource() - .read(getConfig(), CONFIGURED_CATALOG, state); - final List dataFromSecondBatch = AutoCloseableIterators - .toListAndClose(secondBatchIterator); - - final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); - assertEquals(1, stateAfterSecondBatch.size()); - assertNotNull(stateAfterSecondBatch.get(0).getData()); - assertExpectedStateMessages(stateAfterSecondBatch); - - final Set recordsFromSecondBatch = extractRecordMessages( - dataFromSecondBatch); - assertEquals(recordsToCreate, recordsFromSecondBatch.size(), - "Expected 20 records to be replicated in the second sync."); - - // sometimes there can be more than one of these at the end of the snapshot and just before the - // first incremental. - final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( - recordsFromFirstBatch); - final Set recordsFromSecondBatchWithoutDuplicates = removeDuplicates( - recordsFromSecondBatch); - - final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); - assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), - "Expected first sync to include records created while the test was running."); - assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, - recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates - .size()); - } - - @Test - @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") - void testCdcAndFullRefreshInSameSync() throws Exception { - final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG); - - final List MODEL_RECORDS_2 = ImmutableList.of( - Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")), - Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2"))); - - createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", - columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID))); - - for (final JsonNode recordJson : MODEL_RECORDS_2) { - writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, - COL_MAKE_ID, COL_MODEL); - } - - final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_2", - MODELS_SCHEMA, - Field.of(COL_ID, JsonSchemaType.NUMBER), - Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER), - Field.of(COL_MODEL, JsonSchemaType.STRING)) - .withSupportedSyncModes( - Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))); - airbyteStream.setSyncMode(SyncMode.FULL_REFRESH); - - final List streams = configuredCatalog.getStreams(); - streams.add(airbyteStream); - configuredCatalog.withStreams(streams); - - final AutoCloseableIterator read1 = getSource() - .read(getConfig(), configuredCatalog, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - - final Set recordMessages1 = extractRecordMessages(actualRecords1); - final List stateMessages1 = extractStateMessages(actualRecords1); - final HashSet names = new HashSet<>(STREAM_NAMES); - names.add(MODELS_STREAM_NAME + "_2"); - assertEquals(1, stateMessages1.size()); - assertNotNull(stateMessages1.get(0).getData()); - assertExpectedStateMessages(stateMessages1); - assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream()) - .collect(Collectors.toSet()), - recordMessages1, - Collections.singleton(MODELS_STREAM_NAME), - names); - - final JsonNode puntoRecord = Jsons - .jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto")); - writeModelRecord(puntoRecord); - - final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1)); - final AutoCloseableIterator read2 = getSource() - .read(getConfig(), configuredCatalog, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - - final Set recordMessages2 = extractRecordMessages(actualRecords2); - final List stateMessages2 = extractStateMessages(actualRecords2); - assertEquals(1, stateMessages2.size()); - assertNotNull(stateMessages2.get(0).getData()); - assertExpectedStateMessages(stateMessages2); - assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord)) - .collect(Collectors.toSet()), - recordMessages2, - Collections.singleton(MODELS_STREAM_NAME), - names); - } - - @Test - @DisplayName("When no records exist, no records are returned.") - void testNoData() throws Exception { - - executeQuery(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME)); - - final AutoCloseableIterator read = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List actualRecords = AutoCloseableIterators.toListAndClose(read); - - final Set recordMessages = extractRecordMessages(actualRecords); - final List stateMessages = extractStateMessages(actualRecords); - - assertExpectedRecords(Collections.emptySet(), recordMessages); - assertEquals(1, stateMessages.size()); - assertNotNull(stateMessages.get(0).getData()); - assertExpectedStateMessages(stateMessages); - } - - @Test - @DisplayName("When no changes have been made to the database since the previous sync, no records are returned.") - void testNoDataOnSecondSync() throws Exception { - final AutoCloseableIterator read1 = getSource() - .read(getConfig(), CONFIGURED_CATALOG, null); - final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); - final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1)); - - final AutoCloseableIterator read2 = getSource() - .read(getConfig(), CONFIGURED_CATALOG, state); - final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); - - final Set recordMessages2 = extractRecordMessages(actualRecords2); - final List stateMessages2 = extractStateMessages(actualRecords2); - - assertExpectedRecords(Collections.emptySet(), recordMessages2); - assertEquals(1, stateMessages2.size()); - assertNotNull(stateMessages2.get(0).getData()); - assertExpectedStateMessages(stateMessages2); - } - - @Test - void testCheck() throws Exception { - final AirbyteConnectionStatus status = getSource().check(getConfig()); - assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.SUCCEEDED); - } - - @Test - void testDiscover() throws Exception { - final AirbyteCatalog expectedCatalog = expectedCatalogForDiscover(); - final AirbyteCatalog actualCatalog = getSource().discover(getConfig()); - - assertEquals( - expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) - .collect(Collectors.toList()), - actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) - .collect(Collectors.toList())); - } - - protected AirbyteCatalog expectedCatalogForDiscover() { - final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); - - createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", - columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty())); - - final List streams = expectedCatalog.getStreams(); - // stream with PK - streams.get(0).setSourceDefinedCursor(true); - addCdcMetadataColumns(streams.get(0)); - - final AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_2", - MODELS_SCHEMA, - Field.of(COL_ID, JsonSchemaType.INTEGER), - Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER), - Field.of(COL_MODEL, JsonSchemaType.STRING)); - streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList()); - streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); - addCdcMetadataColumns(streamWithoutPK); - - final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_random", - MODELS_SCHEMA + "_random", - Field.of(COL_ID + "_random", JsonSchemaType.INTEGER), - Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER), - Field.of(COL_MODEL + "_random", JsonSchemaType.STRING)) - .withSourceDefinedCursor(true) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))); - addCdcMetadataColumns(randomStream); - - streams.add(streamWithoutPK); - streams.add(randomStream); - expectedCatalog.withStreams(streams); - return expectedCatalog; - } - - protected abstract CdcTargetPosition cdcLatestTargetPosition(); - - protected abstract CdcTargetPosition extractPosition(JsonNode record); - - protected abstract void assertNullCdcMetaData(JsonNode data); - - protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull); - - protected abstract void removeCDCColumns(ObjectNode data); - - protected abstract void addCdcMetadataColumns(AirbyteStream stream); - - protected abstract Source getSource(); - - protected abstract JsonNode getConfig(); - - protected abstract Database getDatabase(); - - protected abstract void assertExpectedStateMessages(List stateMessages); - -} diff --git a/settings.gradle b/settings.gradle index 2dc801ee055..7baacf60c54 100644 --- a/settings.gradle +++ b/settings.gradle @@ -113,7 +113,6 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-integrations:bases:s3-destination-base-integration-test' include ':airbyte-integrations:bases:standard-source-test' include ':airbyte-integrations:connector-templates:generator' - include ':airbyte-integrations:bases:debezium-v1-4-2' include ':airbyte-integrations:bases:debezium-v1-9-6' // Needed by normalization integration tests