
cleanup: delete debezium 1-4-2 module (#18733)

* mssql-source: upgrade debezium version to 1.9.6

* delete debezium 1-4-2

* more improvements
Author: Subodh Kant Chaturvedi
Date: 2022-11-05 00:24:26 +05:30
Committed by: GitHub
Parent: e5c3f4bd0d
Commit: 3468516a2b
31 changed files with 0 additions and 2648 deletions

View File

@@ -1,25 +0,0 @@
plugins {
id "java-test-fixtures"
}
project.configurations {
testFixturesImplementation.extendsFrom implementation
}
dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-db:db-lib')
implementation 'io.debezium:debezium-api:1.4.2.Final'
implementation ('io.debezium:debezium-embedded:1.4.2.Final') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' }
implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final'
implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final'
implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
testFixturesImplementation project(':airbyte-db:db-lib')
testFixturesImplementation project(':airbyte-integrations:bases:base-java')
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.4.2'
}

View File

@@ -1,131 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.CompositeIterator;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class acts as the bridge between Airbyte DB connectors and Debezium. If a DB connector wants
* to use Debezium for CDC, it should use this class.
*/
public class AirbyteDebeziumHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class);
/**
* We use 10000 as the capacity because Debezium's default batch and queue sizes are:
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE} is 2048
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
*/
private static final int QUEUE_CAPACITY = 10000;
private final Properties connectorProperties;
private final JsonNode config;
private final CdcTargetPosition targetPosition;
private final ConfiguredAirbyteCatalog catalog;
private final boolean trackSchemaHistory;
private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition targetPosition,
final Properties connectorProperties,
final ConfiguredAirbyteCatalog catalog,
final boolean trackSchemaHistory) {
this.config = config;
this.targetPosition = targetPosition;
this.connectorProperties = connectorProperties;
this.catalog = catalog;
this.trackSchemaHistory = trackSchemaHistory;
this.queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
}
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt) {
LOGGER.info("using CDC: {}", true);
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcSavedInfoFetcher.getSavedOffset());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager = schemaHistoryManager(cdcSavedInfoFetcher);
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(connectorProperties, config, catalog, offsetManager,
schemaHistoryManager);
publisher.start(queue);
// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetPosition,
publisher::hasClosed,
publisher::close);
// convert to airbyte message.
final AutoCloseableIterator<AirbyteMessage> messageIterator = AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, emittedAt));
// our goal is to get the state at the time this supplier is called (i.e. after all message records
// have been produced)
final Supplier<AirbyteMessage> stateMessageSupplier = () -> {
final Map<String, String> offset = offsetManager.read();
final String dbHistory = trackSchemaHistory ? schemaHistoryManager
.orElseThrow(() -> new RuntimeException("Schema History Tracking is true but manager is not initialised")).read() : null;
return cdcStateHandler.saveState(offset, dbHistory);
};
// wrap the supplier in an iterator so that we can concat it to the message iterator.
final Iterator<AirbyteMessage> stateMessageIterator = MoreIterators.singletonIteratorFromSupplier(stateMessageSupplier);
// this structure guarantees that the debezium engine will be closed, before we attempt to emit the
// state file. we want this so that we have a guarantee that the debezium offset file (which we use
// to produce the state file) is up-to-date.
final CompositeIterator<AirbyteMessage> messageIteratorWithStateDecorator =
AutoCloseableIterators.concatWithEagerClose(messageIterator, AutoCloseableIterators.fromIterator(stateMessageIterator));
return Collections.singletonList(messageIteratorWithStateDecorator);
}
private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final CdcSavedInfoFetcher cdcSavedInfoFetcher) {
if (trackSchemaHistory) {
FilteredFileDatabaseHistory.setDatabaseName(config.get(JdbcUtils.DATABASE_KEY).asText());
return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(cdcSavedInfoFetcher.getSavedSchemaHistory()));
}
return Optional.empty();
}
public static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
}
}
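
For orientation, here is a minimal sketch of how a connector would wire this handler together, assuming the four collaborator interfaces shown further down are already implemented. The Postgres connector class and the entry-point name are assumptions for illustration, not part of the deleted module:

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.time.Instant;
import java.util.Properties;

public class CdcReadSketch {

  // hypothetical entry point; a real connector would call this from its read() implementation
  public static void readIncrementally(final JsonNode config,
                                       final ConfiguredAirbyteCatalog catalog,
                                       final CdcTargetPosition targetPosition,
                                       final CdcSavedInfoFetcher savedInfo,
                                       final CdcStateHandler stateHandler,
                                       final CdcMetadataInjector metadataInjector) throws Exception {
    final Properties props = new Properties();
    // assumption: each source supplies its own Debezium connector class here
    props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
    final AirbyteDebeziumHandler handler =
        new AirbyteDebeziumHandler(config, targetPosition, props, catalog, false);
    for (final AutoCloseableIterator<AirbyteMessage> it :
        handler.getIncrementalIterators(savedInfo, stateHandler, metadataInjector, Instant.now())) {
      try (it) {
        // the final message emitted is the state message produced by the supplier above
        it.forEachRemaining(System.out::println);
      }
    }
  }
}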

View File

@@ -1,36 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* This interface is used to add metadata to the records fetched from the database. For instance, in
* Postgres we add the lsn to the records. In MySql we add the file name and position to the
* records.
*/
public interface CdcMetadataInjector {
/**
* A Debezium record contains multiple pieces. Ref:
* https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-create-events
*
* @param event the actual record, which contains the data and is written to the destination
* @param source the metadata about the record; we need to extract that metadata and add it to the
* event before writing it to the destination
*/
void addMetaData(ObjectNode event, JsonNode source);
/**
* As part of the Airbyte record we need to add the namespace (schema name).
*
* @param source part of the Debezium record containing the metadata about the record. We need to
* extract the namespace out of this metadata and return it. Ref:
* https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-create-events
*/
String namespace(JsonNode source);
}
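
To make the contract concrete, here is a hedged sketch of an injector in the MySQL style described above. The _ab_cdc_log_file/_ab_cdc_log_pos column names are illustrative assumptions, while "file", "pos" and "db" do appear in Debezium's MySQL source block:

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class MySqlStyleMetadataInjector implements CdcMetadataInjector {

  @Override
  public void addMetaData(final ObjectNode event, final JsonNode source) {
    // Debezium's MySQL "source" block carries the binlog coordinates of the change
    event.put("_ab_cdc_log_file", source.get("file").asText()); // assumed column name
    event.put("_ab_cdc_log_pos", source.get("pos").asLong());   // assumed column name
  }

  @Override
  public String namespace(final JsonNode source) {
    // for MySQL the namespace is the database name carried in the source metadata
    return source.get("db").asText();
  }
}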

View File

@@ -1,20 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;
/**
* This interface is used to fetch the saved info required for Debezium to run incrementally. Each
* connector saves the offset and schema history in a different manner.
*/
public interface CdcSavedInfoFetcher {
JsonNode getSavedOffset();
Optional<JsonNode> getSavedSchemaHistory();
}
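
A minimal sketch of an implementation, assuming the connector keeps its prior CDC state as a JSON blob with "offset" and "schema_history" keys (both key names are assumptions for illustration):

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;

public class JsonStateSavedInfoFetcher implements CdcSavedInfoFetcher {

  private final JsonNode savedState;

  public JsonStateSavedInfoFetcher(final JsonNode savedState) {
    this.savedState = savedState;
  }

  @Override
  public JsonNode getSavedOffset() {
    // null on the first sync, when no offset has been saved yet
    return savedState == null ? null : savedState.get("offset");
  }

  @Override
  public Optional<JsonNode> getSavedSchemaHistory() {
    // only connectors that track schema history (e.g. MySQL-style) populate this
    return savedState == null ? Optional.empty() : Optional.ofNullable(savedState.get("schema_history"));
  }
}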

View File

@@ -1,19 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.Map;
/**
* This interface allows connectors to save the offset and schema history in the manner that suits
* them.
*/
@FunctionalInterface
public interface CdcStateHandler {
AirbyteMessage saveState(Map<String, String> offset, String dbHistory);
}
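
A hedged sketch of a handler that packs the offset map and schema history into a single Airbyte state message; the "cdc_offset"/"cdc_history" keys are assumptions for illustration:

package io.airbyte.integrations.debezium;

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import java.util.HashMap;
import java.util.Map;

public class MapBackedCdcStateHandler implements CdcStateHandler {

  @Override
  public AirbyteMessage saveState(final Map<String, String> offset, final String dbHistory) {
    final Map<String, Object> state = new HashMap<>();
    state.put("cdc_offset", offset); // assumed key name
    if (dbHistory != null) {
      state.put("cdc_history", dbHistory); // assumed key name
    }
    return new AirbyteMessage()
        .withType(AirbyteMessage.Type.STATE)
        .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(state)));
  }
}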

View File

@@ -1,20 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import com.fasterxml.jackson.databind.JsonNode;
/**
* This interface is used to define the target position at the beginning of the sync, so that once
* we reach the desired target we can shut down the sync. This is needed because new changes may be
* made to the source database while we are syncing, and without a cutoff we might end up syncing
* forever. To tackle that, we define an end point at the beginning of the sync.
*/
public interface CdcTargetPosition {
boolean reachedTargetPosition(JsonNode valueAsJson);
}
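
A minimal sketch in the LSN style the comment mentions, assuming the record's "source" metadata exposes a numeric "lsn" field and that the target LSN was captured once when the sync started:

package io.airbyte.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;

public class LsnTargetPosition implements CdcTargetPosition {

  private final long targetLsn;

  public LsnTargetPosition(final long targetLsn) {
    // captured at sync start, e.g. from the database's current log position
    this.targetLsn = targetLsn;
  }

  @Override
  public boolean reachedTargetPosition(final JsonNode valueAsJson) {
    final JsonNode source = valueAsJson.get("source");
    final JsonNode lsn = source == null ? null : source.get("lsn");
    // stop once the replicated position has caught up to where the sync started
    return lsn != null && lsn.asLong() >= targetLsn;
  }
}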

View File

@@ -1,144 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class handles reading and writing a debezium offset file. In many cases it is duplicating
* logic in debezium because that logic is not exposed in the public API. We mostly treat the
* contents of this state file like a black box. We know it is a Map&lt;ByteBuffer, ByteBuffer&gt;.
* We deserialize it to a Map&lt;String, String&gt; so that the state file can be human readable. If
* we ever discover that any of the contents of these offset files is not string serializable we
* will likely have to drop the human readability support and just base64 encode it.
*/
public class AirbyteFileOffsetBackingStore {
private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class);
private final Path offsetFilePath;
public AirbyteFileOffsetBackingStore(final Path offsetFilePath) {
this.offsetFilePath = offsetFilePath;
}
public Path getOffsetFilePath() {
return offsetFilePath;
}
public Map<String, String> read() {
final Map<ByteBuffer, ByteBuffer> raw = load();
return raw.entrySet().stream().collect(Collectors.toMap(
e -> byteBufferToString(e.getKey()),
e -> byteBufferToString(e.getValue())));
}
@SuppressWarnings("unchecked")
public void persist(final JsonNode cdcState) {
final Map<String, String> mapAsString =
cdcState != null ? Jsons.object(cdcState, Map.class) : Collections.emptyMap();
final Map<ByteBuffer, ByteBuffer> mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap(
e -> stringToByteBuffer(e.getKey()),
e -> stringToByteBuffer(e.getValue())));
FileUtils.deleteQuietly(offsetFilePath.toFile());
save(mappedAsStrings);
}
private static String byteBufferToString(final ByteBuffer byteBuffer) {
Preconditions.checkNotNull(byteBuffer);
return new String(byteBuffer.array(), StandardCharsets.UTF_8);
}
private static ByteBuffer stringToByteBuffer(final String s) {
Preconditions.checkNotNull(s);
return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
}
/**
* See FileOffsetBackingStore#load - the logic is mostly borrowed from there, duplicated because
* that method is not public.
*/
@SuppressWarnings("unchecked")
private Map<ByteBuffer, ByteBuffer> load() {
try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) {
// todo (cgardens) - we currently suppress a security warning for this line. use of readObject from
// untrusted sources is considered unsafe. Since the source is controlled by us in this case it
// should be safe. That said, changing this implementation to not use readObject would remove some
// headache.
final Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new ConnectException("Expected HashMap but found " + obj.getClass());
final Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
for (final Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
data.put(key, value);
}
return data;
} catch (final NoSuchFileException | EOFException e) {
// NoSuchFileException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
return Collections.emptyMap();
} catch (final IOException | ClassNotFoundException e) {
throw new ConnectException(e);
}
}
/**
* See FileOffsetBackingStore#save - the logic is mostly borrowed from there, duplicated because
* that method is not public.
*/
private void save(final Map<ByteBuffer, ByteBuffer> data) {
try (final ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) {
final Map<byte[], byte[]> raw = new HashMap<>();
for (final Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
final byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
final byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
raw.put(key, value);
}
os.writeObject(raw);
} catch (final IOException e) {
throw new ConnectException(e);
}
}
public static AirbyteFileOffsetBackingStore initializeState(final JsonNode cdcState) {
final Path cdcWorkingDir;
try {
cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-state-offset");
} catch (final IOException e) {
throw new RuntimeException(e);
}
final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat");
final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(cdcOffsetFilePath);
offsetManager.persist(cdcState);
return offsetManager;
}
}
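
A small round-trip sketch of the store above: persist a saved state and read it back as the human-readable map described in the class comment. The key/value strings only mimic the shape of a Debezium offset entry and are not a real offset:

package io.airbyte.integrations.debezium.internals;

import io.airbyte.commons.json.Jsons;
import java.util.Map;

public class OffsetStoreRoundTripSketch {

  public static void main(final String[] args) {
    // illustrative key/value shapes, not a real Debezium offset
    final AirbyteFileOffsetBackingStore store = AirbyteFileOffsetBackingStore.initializeState(
        Jsons.jsonNode(Map.of("[\"engine\",{\"server\":\"mydb\"}]", "{\"lsn\":12345}")));
    final Map<String, String> offsets = store.read();
    offsets.forEach((key, value) -> System.out.println(key + " -> " + value));
  }
}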

View File

@@ -1,149 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.HistoryRecord;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
/**
* The purpose of this class is to: 1. Read the contents of the file {@link #path}, which contains
* the schema history at the end of the sync, so that it can be saved in state for future syncs.
* Check {@link #read()}. 2. Write the saved content back to the file {@link #path} at the beginning
* of the sync so that Debezium can function smoothly. Check persist(Optional&lt;JsonNode&gt;). To
* understand more about the file, please refer to {@link FilteredFileDatabaseHistory}.
*/
public class AirbyteSchemaHistoryStorage {
private final Path path;
private static final Charset UTF8 = StandardCharsets.UTF_8;
private final DocumentReader reader = DocumentReader.defaultReader();
private final DocumentWriter writer = DocumentWriter.defaultWriter();
public AirbyteSchemaHistoryStorage(final Path path) {
this.path = path;
}
public Path getPath() {
return path;
}
/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)}
*/
public String read() {
final StringBuilder fileAsString = new StringBuilder();
try {
for (final String line : Files.readAllLines(path, UTF8)) {
if (line != null && !line.isEmpty()) {
final Document record = reader.read(line);
final String recordAsString = writer.write(record);
fileAsString.append(recordAsString);
fileAsString.append(System.lineSeparator());
}
}
return fileAsString.toString();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#start()}
*/
private void makeSureFileExists() {
try {
// Make sure the file exists ...
if (!Files.exists(path)) {
// Create parent directories if we have them ...
if (path.getParent() != null) {
Files.createDirectories(path.getParent());
}
try {
Files.createFile(path);
} catch (final FileAlreadyExistsException e) {
// do nothing
}
}
} catch (final IOException e) {
throw new IllegalStateException(
"Unable to create history file at " + path + ": " + e.getMessage(), e);
}
}
public void persist(final Optional<JsonNode> schemaHistory) {
if (schemaHistory.isEmpty()) {
return;
}
final String fileAsString = Jsons.object(schemaHistory.get(), String.class);
if (fileAsString == null || fileAsString.isEmpty()) {
return;
}
FileUtils.deleteQuietly(path.toFile());
makeSureFileExists();
writeToFile(fileAsString);
}
/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)}
*
* @param fileAsString Represents the contents of the file saved in state from previous syncs
*/
private void writeToFile(final String fileAsString) {
try {
final String[] split = fileAsString.split(System.lineSeparator());
for (final String element : split) {
final Document read = reader.read(element);
final String line = writer.write(read);
try (final BufferedWriter historyWriter = Files
.newBufferedWriter(path, StandardOpenOption.APPEND)) {
try {
historyWriter.append(line);
historyWriter.newLine();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
public static AirbyteSchemaHistoryStorage initializeDBHistory(final Optional<JsonNode> schemaHistory) {
final Path dbHistoryWorkingDir;
try {
dbHistoryWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-db-history");
} catch (final IOException e) {
throw new RuntimeException(e);
}
final Path dbHistoryFilePath = dbHistoryWorkingDir.resolve("dbhistory.dat");
final AirbyteSchemaHistoryStorage schemaHistoryManager = new AirbyteSchemaHistoryStorage(dbHistoryFilePath);
schemaHistoryManager.persist(schemaHistory);
return schemaHistoryManager;
}
}
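
A minimal sketch of the persist/read cycle described above. The single history line is a stand-in for a real Debezium history record, and the state node holds the whole file as one JSON string:

package io.airbyte.integrations.debezium.internals;

import io.airbyte.commons.json.Jsons;
import java.util.Optional;

public class SchemaHistoryRoundTripSketch {

  public static void main(final String[] args) {
    // one JSON document per line, in the style FilteredFileDatabaseHistory writes them
    final String savedHistory = "{\"position\":{\"file\":\"binlog.000001\",\"pos\":4}}";
    final AirbyteSchemaHistoryStorage storage =
        AirbyteSchemaHistoryStorage.initializeDBHistory(Optional.of(Jsons.jsonNode(savedHistory)));
    System.out.println(storage.read()); // prints the normalized history contents back
  }
}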

View File

@@ -1,69 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.RelationalColumn;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class DebeziumConverterUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumConverterUtils.class);
private DebeziumConverterUtils() {
throw new UnsupportedOperationException();
}
public static String convertDate(final Object input) {
/**
* While building this custom converter we were not sure what type Debezium could return, because
* there is no mention of it in the documentation. If you take a look at
* {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter#converterFor(io.debezium.spi.converter.RelationalColumn, io.debezium.spi.converter.CustomConverter.ConverterRegistration)},
* it handles multiple data types, but it is not clear under what circumstances each data type is
* returned. We went ahead and handled the data types that made sense. We use LocalDateTime here
* because it represents the DATETIME data type in Java.
*/
if (input instanceof LocalDateTime) {
return DataTypeUtils.toISO8601String((LocalDateTime) input);
} else if (input instanceof LocalDate) {
return DataTypeUtils.toISO8601String((LocalDate) input);
} else if (input instanceof Duration) {
return DataTypeUtils.toISO8601String((Duration) input);
} else if (input instanceof Timestamp) {
return DataTypeUtils.toISO8601StringWithMicroseconds((((Timestamp) input).toInstant()));
} else if (input instanceof Number) {
return DataTypeUtils.toISO8601String(
new Timestamp(((Number) input).longValue()).toLocalDateTime());
} else if (input instanceof Date) {
return DataTypeUtils.toISO8601String((Date) input);
} else if (input instanceof String) {
try {
return LocalDateTime.parse((String) input).toString();
} catch (final DateTimeParseException e) {
LOGGER.warn("Cannot convert value '{}' to LocalDateTime type", input);
return input.toString();
}
}
LOGGER.warn("Uncovered date class type '{}'. Use default converter", input.getClass().getName());
return input.toString();
}
public static Object convertDefaultValue(RelationalColumn field) {
if (field.isOptional()) {
return null;
} else if (field.hasDefaultValue()) {
return field.defaultValue();
}
return null;
}
}
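
A short sketch exercising the branches above (exact output strings depend on DataTypeUtils' ISO-8601 formatting, so they are not shown):

package io.airbyte.integrations.debezium.internals;

import java.time.LocalDate;
import java.time.LocalDateTime;

public class ConvertDateSketch {

  public static void main(final String[] args) {
    System.out.println(DebeziumConverterUtils.convertDate(LocalDateTime.of(2022, 11, 5, 0, 24)));
    System.out.println(DebeziumConverterUtils.convertDate(LocalDate.of(2022, 11, 5)));
    // a parseable string goes through LocalDateTime.parse
    System.out.println(DebeziumConverterUtils.convertDate("2022-11-05T00:24:00"));
    // anything else falls through with a warning and is returned as-is
    System.out.println(DebeziumConverterUtils.convertDate("not-a-date"));
  }
}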

View File

@@ -1,67 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.debezium.engine.ChangeEvent;
import java.sql.Timestamp;
import java.time.Instant;
public class DebeziumEventUtils {
public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at";
public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";
public static AirbyteMessage toAirbyteMessage(final ChangeEvent<String, String> event,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt) {
final JsonNode debeziumRecord = Jsons.deserialize(event.value());
final JsonNode before = debeziumRecord.get("before");
final JsonNode after = debeziumRecord.get("after");
final JsonNode source = debeziumRecord.get("source");
final JsonNode data = formatDebeziumData(before, after, source, cdcMetadataInjector);
final String schemaName = cdcMetadataInjector.namespace(source);
final String streamName = source.get("table").asText();
final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(schemaName)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(data);
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(airbyteRecordMessage);
}
// warning: this method mutates its input args.
private static JsonNode formatDebeziumData(final JsonNode before,
final JsonNode after,
final JsonNode source,
final CdcMetadataInjector cdcMetadataInjector) {
final ObjectNode base = (ObjectNode) (after.isNull() ? before : after);
final long transactionMillis = source.get("ts_ms").asLong();
final String transactionTimestamp = new Timestamp(transactionMillis).toInstant().toString();
base.put(CDC_UPDATED_AT, transactionTimestamp);
cdcMetadataInjector.addMetaData(base, source);
if (after.isNull()) {
base.put(CDC_DELETED_AT, transactionTimestamp);
} else {
base.put(CDC_DELETED_AT, (String) null);
}
return base;
}
}
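
For concreteness, a hedged sketch that feeds a hand-built insert event through toAirbyteMessage. The field values are invented, but the top-level "before"/"after"/"source" keys match what the method reads, and the near-no-op injector stands in for a real CdcMetadataInjector:

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
import io.airbyte.protocol.models.AirbyteMessage;
import io.debezium.engine.ChangeEvent;
import java.time.Instant;

public class EventConversionSketch {

  public static void main(final String[] args) {
    // an insert event: "before" is null, "after" holds the new row
    final String json = "{\"before\": null,"
        + " \"after\": {\"id\": 1, \"name\": \"test\"},"
        + " \"source\": {\"table\": \"users\", \"db\": \"mydb\", \"ts_ms\": 1667587466000}}";
    final ChangeEvent<String, String> event = new ChangeEvent<>() {

      @Override
      public String key() {
        return null;
      }

      @Override
      public String value() {
        return json;
      }

      @Override
      public String destination() {
        return null;
      }

    };
    final CdcMetadataInjector injector = new CdcMetadataInjector() {

      @Override
      public void addMetaData(final ObjectNode e, final JsonNode source) {}

      @Override
      public String namespace(final JsonNode source) {
        return source.get("db").asText();
      }

    };
    final AirbyteMessage message = DebeziumEventUtils.toAirbyteMessage(event, injector, Instant.now());
    // the record data now carries _ab_cdc_updated_at (from source.ts_ms) and a null _ab_cdc_deleted_at
    System.out.println(message.getRecord().getData());
  }
}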

View File

@@ -1,155 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.debezium.engine.ChangeEvent;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The record iterator is the consumer (in the producer / consumer relationship with debezium)
* responsible for 1. making sure every record produced by the record publisher is processed 2.
* signalling to the record publisher when it is time for it to stop producing records. It emits
* this signal either when the publisher has not produced a new record for a long time or when it
* has processed at least all of the records that were present in the database when the source was
* started. Because the publisher might publish more records between the consumer sending this
* signal and the publisher actually shutting down, the consumer must stay alive as long as the
* publisher is not closed. Even after the publisher is closed, the consumer will finish processing
* any produced records before closing.
*/
public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String, String>>
implements AutoCloseableIterator<ChangeEvent<String, String>> {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);
private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(1, TimeUnit.MINUTES);
private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final CdcTargetPosition targetPosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;
private boolean hasSnapshotFinished;
private boolean signalledClose;
public DebeziumRecordIterator(final LinkedBlockingQueue<ChangeEvent<String, String>> queue,
final CdcTargetPosition targetPosition,
final Supplier<Boolean> publisherStatusSupplier,
final VoidCallable requestClose) {
this.queue = queue;
this.targetPosition = targetPosition;
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
this.hasSnapshotFinished = true;
this.signalledClose = false;
}
@Override
protected ChangeEvent<String, String> computeNext() {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all messages it
// emitted.
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;
try {
final WaitTime waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME_SECONDS : FIRST_RECORD_WAIT_TIME_MINUTES;
next = queue.poll(waitTime.period, waitTime.timeUnit);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
// if the consumer could not get a record within the timeout, it is time to tell the producer to
// shut down.
if (next == null) {
LOGGER.info("Closing cause next is returned as null");
requestClose();
LOGGER.info("no record found. polling again.");
continue;
}
final JsonNode eventAsJson = Jsons.deserialize(next.value());
hasSnapshotFinished = hasSnapshotFinished(eventAsJson);
// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (!signalledClose && shouldSignalClose(eventAsJson)) {
requestClose();
}
receivedFirstRecord = true;
return next;
}
return endOfData();
}
private boolean hasSnapshotFinished(final JsonNode eventAsJson) {
final SnapshotMetadata snapshot = SnapshotMetadata.valueOf(eventAsJson.get("source").get("snapshot").asText().toUpperCase());
return SnapshotMetadata.TRUE != snapshot;
}
/**
* Debezium was built as an ever-running process which keeps listening for new changes on the DB
* and processing them immediately. Airbyte needs Debezium to work as a start/stop mechanism. In
* order to determine when to stop the Debezium engine we rely on a few factors: 1. TargetPosition
* logic. At the beginning of the sync we define a target position in the logs of the DB. This can
* be an LSN or anything specific to the DB which can help us identify that we have reached a
* specific position in the log-based replication. When we start processing records from Debezium,
* we extract the log position from the metadata of the record and compare it with the target that
* we defined at the beginning of the sync. If we have reached the target position, we shut down
* the Debezium engine. 2. The TargetPosition logic might not always work, and to tackle that we
* have another rule: if we do not receive records from Debezium for a given duration, we ask the
* Debezium engine to shut down. 3. We also take the snapshot into consideration: when a connector
* runs for the first time, we let it complete the snapshot, and only after the snapshot completes
* should we shut down the engine. If we are closing the engine before the snapshot has finished,
* we throw an exception.
*/
@Override
public void close() throws Exception {
requestClose();
}
private boolean shouldSignalClose(final JsonNode eventAsJson) {
return targetPosition.reachedTargetPosition(eventAsJson);
}
private void requestClose() {
try {
requestClose.call();
signalledClose = true;
} catch (final Exception e) {
throw new RuntimeException(e);
}
throwExceptionIfSnapshotNotFinished();
}
private void throwExceptionIfSnapshotNotFinished() {
if (!hasSnapshotFinished) {
throw new RuntimeException("Closing down debezium engine but snapshot has not finished");
}
}
private static class WaitTime {
public final int period;
public final TimeUnit timeUnit;
public WaitTime(final int period, final TimeUnit timeUnit) {
this.period = period;
this.timeUnit = timeUnit;
}
}
}

View File

@@ -1,191 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The purpose of this class is to initialize and spawn the Debezium engine with the right
* properties to fetch records.
*/
public class DebeziumRecordPublisher implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class);
private final ExecutorService executor;
private DebeziumEngine<ChangeEvent<String, String>> engine;
private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;
private final AtomicBoolean hasClosed;
private final AtomicBoolean isClosing;
private final AtomicReference<Throwable> thrownError;
private final CountDownLatch engineLatch;
private final Properties properties;
private final ConfiguredAirbyteCatalog catalog;
public DebeziumRecordPublisher(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
this.properties = properties;
this.config = config;
this.catalog = catalog;
this.offsetManager = offsetManager;
this.schemaHistoryManager = schemaHistoryManager;
this.hasClosed = new AtomicBoolean(false);
this.isClosing = new AtomicBoolean(false);
this.thrownError = new AtomicReference<>();
this.executor = Executors.newSingleThreadExecutor();
this.engineLatch = new CountDownLatch(1);
}
public void start(final Queue<ChangeEvent<String, String>> queue) {
engine = DebeziumEngine.create(Json.class)
.using(getDebeziumProperties())
.using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
.notifying(e -> {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
// interacts with kafka. we want to ignore it.
// more on the tombstone:
// https://debezium.io/documentation/reference/configuration/event-flattening.html
if (e.value() != null) {
boolean inserted = false;
while (!inserted) {
inserted = queue.offer(e);
}
}
})
.using((success, message, error) -> {
LOGGER.info("Debezium engine shutdown.");
thrownError.set(error);
engineLatch.countDown();
})
.build();
// Run the engine asynchronously ...
executor.execute(engine);
}
public boolean hasClosed() {
return hasClosed.get();
}
public void close() throws Exception {
if (isClosing.compareAndSet(false, true)) {
// consumers should assume records can be produced until engine has closed.
if (engine != null) {
engine.close();
}
// wait for closure before shutting down executor service
engineLatch.await(5, TimeUnit.MINUTES);
// shut down and await for thread to actually go down
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
// after the engine is completely off, we can mark this as closed
hasClosed.set(true);
if (thrownError.get() != null) {
throw new RuntimeException(thrownError.get());
}
}
}
protected Properties getDebeziumProperties() {
final Properties props = new Properties();
props.putAll(properties);
// debezium engine configuration
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
// default values from debezium CommonConnectorConfig
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");
if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/1.4/operations/debezium-server.html#debezium-source-database-history-file-filename
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, the Debezium connector for MySQL needs to track schema
// changes. If we don't do this, we can't fetch records for the table.
// We have our own implementation to filter out the schema information of the databases that the
// connector is not syncing.
props.setProperty("database.history", "io.airbyte.integrations.debezium.internals.FilteredFileDatabaseHistory");
props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
}
// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");
// debezium names
props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());
// db connection configuration
props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText());
if (config.has(JdbcUtils.PASSWORD_KEY)) {
props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
}
// By default "decimal.handing.mode=precise" which's caused returning this value as a binary.
// The "double" type may cause a loss of precision, so set Debezium's config to store it as a String
// explicitly in its Kafka messages for more details see:
// https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");
// table selection
final String tableWhitelist = getTableWhitelist(catalog);
props.setProperty("table.include.list", tableWhitelist);
props.setProperty("database.include.list", config.get(JdbcUtils.DATABASE_KEY).asText());
return props;
}
@VisibleForTesting
public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(stream -> stream.getNamespace() + "." + stream.getName())
// debezium needs commas escaped to split properly
.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));
}
}
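
A small sketch of the whitelist format, reusing the same protocol helpers the tests later in this diff use; only INCREMENTAL streams make it into the list:

package io.airbyte.integrations.debezium.internals;

import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;

public class TableWhitelistSketch {

  public static void main(final String[] args) {
    final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
        CatalogHelpers.createAirbyteStream("users", "public", Field.of("id", JsonSchemaType.NUMBER)),
        CatalogHelpers.createAirbyteStream("orders", "public", Field.of("id", JsonSchemaType.NUMBER))));
    final ConfiguredAirbyteCatalog configured = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
    configured.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
    System.out.println(DebeziumRecordPublisher.getTableWhitelist(configured)); // public.users,public.orders
  }
}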

View File

@@ -1,149 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.history.HistoryRecordComparator;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.function.Consumer;
/**
* The MySQL Debezium connector monitors the database schema evolution over time and stores the
* data in a database history file. Without this file we can't fetch the records from the binlog,
* so we need to save the contents of the file. Debezium by default uses
* {@link io.debezium.relational.history.FileDatabaseHistory} to write the schema information to
* the file. The problem is that Debezium tracks the schema evolution of all the tables in all the
* databases, so the file content can grow large. To make sure that Debezium tracks only the schema
* of the tables that are present in the database that Airbyte is syncing, we created this class.
* In the method {@link #storeRecord(HistoryRecord)} we introduced a check to make sure only those
* records are saved whose database name matches the database Airbyte is syncing. We tell Debezium
* to use this class by passing it as a property to the Debezium engine. Look for the
* "database.history" property in {@link DebeziumRecordPublisher#getDebeziumProperties()}.
* Ideally {@link FilteredFileDatabaseHistory} would have extended
* {@link io.debezium.relational.history.FileDatabaseHistory} and overridden the
* {@link #storeRecord(HistoryRecord)} method, but {@link FileDatabaseHistory} is a final class and
* cannot be inherited.
*/
public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory {
private final FileDatabaseHistory fileDatabaseHistory;
private static String databaseName;
public FilteredFileDatabaseHistory() {
this.fileDatabaseHistory = new FileDatabaseHistory();
}
/**
* Ideally the databaseName should have been initialized in the constructor of the class. But since
* we supply the class name to debezium and it uses reflection to construct the object of the class,
* we can't pass in the databaseName as a parameter to the constructor. That's why we had to take
* the static approach.
*
* @param databaseName Name of the database that the connector is syncing
*/
public static void setDatabaseName(final String databaseName) {
if (FilteredFileDatabaseHistory.databaseName == null) {
FilteredFileDatabaseHistory.databaseName = databaseName;
} else if (!FilteredFileDatabaseHistory.databaseName.equals(databaseName)) {
throw new RuntimeException(
"Database name has already been set : " + FilteredFileDatabaseHistory.databaseName
+ " can't set to : " + databaseName);
}
}
@Override
public void configure(final Configuration config,
final HistoryRecordComparator comparator,
final DatabaseHistoryListener listener,
final boolean useCatalogBeforeSchema) {
fileDatabaseHistory.configure(config, comparator, listener, useCatalogBeforeSchema);
}
@Override
public void start() {
fileDatabaseHistory.start();
}
@Override
public void storeRecord(final HistoryRecord record) throws DatabaseHistoryException {
if (record == null) {
return;
}
try {
final String dbNameInRecord = record.document().getString(Fields.DATABASE_NAME);
if (databaseName != null && dbNameInRecord != null && !dbNameInRecord.equals(databaseName)) {
return;
}
/**
* We are using reflection because the method
* {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} is
* protected and cannot be accessed from here
*/
final Method storeRecordMethod = fileDatabaseHistory.getClass()
.getDeclaredMethod("storeRecord", record.getClass());
storeRecordMethod.setAccessible(true);
storeRecordMethod.invoke(fileDatabaseHistory, record);
} catch (final NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public void stop() {
fileDatabaseHistory.stop();
// this is just for tests
resetDbName();
}
public static void resetDbName() {
databaseName = null;
}
@Override
protected void recoverRecords(final Consumer<HistoryRecord> records) {
try {
/**
* We are using reflection because the method
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} is protected
* and cannot be accessed from here
*/
final Method recoverRecords = fileDatabaseHistory.getClass()
.getDeclaredMethod("recoverRecords", Consumer.class);
recoverRecords.setAccessible(true);
recoverRecords.invoke(fileDatabaseHistory, records);
} catch (final NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean storageExists() {
return fileDatabaseHistory.storageExists();
}
@Override
public void initializeStorage() {
fileDatabaseHistory.initializeStorage();
}
@Override
public boolean exists() {
return fileDatabaseHistory.exists();
}
@Override
public String toString() {
return fileDatabaseHistory.toString();
}
}

View File

@@ -1,185 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import com.microsoft.sqlserver.jdbc.Geography;
import com.microsoft.sqlserver.jdbc.Geometry;
import com.microsoft.sqlserver.jdbc.SQLServerException;
import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import microsoft.sql.DateTimeOffset;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MSSQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class);
private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "SMALLDATETIME");
private final Set<String> BINARY = Set.of("VARBINARY", "BINARY");
private static final String DATETIMEOFFSET = "DATETIMEOFFSET";
private static final String TIME_TYPE = "TIME";
private static final String SMALLMONEY_TYPE = "SMALLMONEY";
private static final String GEOMETRY = "GEOMETRY";
private static final String GEOGRAPHY = "GEOGRAPHY";
private static final String DEBEZIUM_DATETIMEOFFSET_FORMAT = "yyyy-MM-dd HH:mm:ss XXX";
@Override
public void configure(Properties props) {}
@Override
public void converterFor(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
if (DATE_TYPES.contains(field.typeName().toUpperCase())) {
registerDate(field, registration);
} else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) {
registerMoney(field, registration);
} else if (BINARY.contains(field.typeName().toUpperCase())) {
registerBinary(field, registration);
} else if (GEOMETRY.equalsIgnoreCase(field.typeName())) {
registerGeometry(field, registration);
} else if (GEOGRAPHY.equalsIgnoreCase(field.typeName())) {
registerGeography(field, registration);
} else if (TIME_TYPE.equalsIgnoreCase(field.typeName())) {
registerTime(field, registration);
} else if (DATETIMEOFFSET.equalsIgnoreCase(field.typeName())) {
registerDateTimeOffSet(field, registration);
}
}
private void registerGeometry(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof byte[]) {
try {
return Geometry.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
LOGGER.error(e.getMessage());
}
}
LOGGER.warn("Uncovered Geometry class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}
private void registerGeography(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof byte[]) {
try {
return Geography.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
LOGGER.error(e.getMessage());
}
}
LOGGER.warn("Uncovered Geography class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}
private void registerDate(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
return DebeziumConverterUtils.convertDate(input);
});
}
private void registerDateTimeOffSet(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof DateTimeOffset) {
return DataTypeUtils.toISO8601String(
OffsetDateTime.parse(input.toString(),
DateTimeFormatter.ofPattern(DEBEZIUM_DATETIMEOFFSET_FORMAT)));
}
LOGGER.warn("Uncovered DateTimeOffSet class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}
private void registerTime(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof Timestamp) {
return DataTypeUtils.toISOTimeString(((Timestamp) input).toLocalDateTime());
}
LOGGER.warn("Uncovered time class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}
private void registerMoney(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.float64(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof BigDecimal) {
return ((BigDecimal) input).doubleValue();
}
LOGGER.warn("Uncovered money class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}
private void registerBinary(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof byte[]) {
return new String((byte[]) input, Charset.defaultCharset());
}
LOGGER.warn("Uncovered binary class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}
}
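
A short sketch of the DATETIMEOFFSET parsing path above: the registered lambda stringifies the driver's DateTimeOffset value and parses it with DEBEZIUM_DATETIMEOFFSET_FORMAT. The sample value is invented, and a real DateTimeOffset's string form may carry fractional seconds this pattern does not cover:

package io.airbyte.integrations.debezium.internals;

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class DateTimeOffsetParseSketch {

  public static void main(final String[] args) {
    final String raw = "2022-11-05 00:24:26 +05:30"; // invented sample value
    final OffsetDateTime parsed =
        OffsetDateTime.parse(raw, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss XXX"));
    System.out.println(parsed); // 2022-11-05T00:24:26+05:30
  }
}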

View File

@@ -1,75 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a custom Debezium converter used in MySQL to handle the DATETIME data type. We need a
* custom converter because by default Debezium returns DATETIME values as numbers, and we need to
* convert them to a proper format. Ref:
* https://debezium.io/documentation/reference/1.4/development/converters.html This is built with
* reference to {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter}. If you
* rename this class, remember to rename the datetime.type property value in
* io.airbyte-integrations.source.mysql.MySqlCdcProperties#getDebeziumProperties() (if you don't
* rename it, a test will still fail, but it might be tricky to figure out where to change the
* property name).
*/
public class MySQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLConverter.class);
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"};
private final String[] TEXT_TYPES = {"CHAR", "VARCHAR", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT"};
@Override
public void configure(final Properties props) {}
@Override
public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerDate(field, registration);
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerText(field, registration);
}
}
private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (x instanceof byte[]) {
return new String((byte[]) x, StandardCharsets.UTF_8);
} else {
return x.toString();
}
});
}
/**
* The Debezium driver replaces a zero-value date with null even when the column is mandatory.
* According to the docs this should be handled by the driver, but it fails to do so, so for
* mandatory columns we fall back to the epoch date here.
*/
private Object convertDefaultValueNullDate(final RelationalColumn field) {
final var defaultValue = DebeziumConverterUtils.convertDefaultValue(field);
return (defaultValue == null && !field.isOptional() ? DataTypeUtils.toISO8601String(LocalDate.EPOCH) : defaultValue);
}
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(),
x -> x == null ? convertDefaultValueNullDate(field) : DebeziumConverterUtils.convertDate(x));
}
}

View File

@@ -1,134 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
@Override
public void configure(final Properties props) {}
@Override
public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerDate(field, registration);
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))
|| Arrays.stream(GEOMETRICS_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))
|| Arrays.stream(BIT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerText(field, registration);
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerMoney(field, registration);
}
}
private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (x instanceof byte[]) {
return new String((byte[]) x, StandardCharsets.UTF_8);
} else {
return x.toString();
}
});
}
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof PGInterval) {
return convertInterval((PGInterval) x);
} else {
return DebeziumConverterUtils.convertDate(x);
}
});
}
private String convertInterval(final PGInterval pgInterval) {
final StringBuilder resultInterval = new StringBuilder();
formatDateUnit(resultInterval, pgInterval.getYears(), " year ");
formatDateUnit(resultInterval, pgInterval.getMonths(), " mons ");
formatDateUnit(resultInterval, pgInterval.getDays(), " days ");
formatTimeValues(resultInterval, pgInterval);
return resultInterval.toString();
}
private void registerMoney(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
} else if (x instanceof Double) {
final BigDecimal result = BigDecimal.valueOf((Double) x);
if (result.compareTo(new BigDecimal("999999999999999")) == 1
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
return null;
}
return result.toString();
} else {
return x.toString();
}
});
}
private void formatDateUnit(final StringBuilder resultInterval, final int dateUnit, final String s) {
if (dateUnit != 0) {
resultInterval
.append(dateUnit)
.append(s);
}
}
private void formatTimeValues(final StringBuilder resultInterval, final PGInterval pgInterval) {
if (isNegativeTime(pgInterval)) {
resultInterval.append("-");
}
// TODO: check whether the value is outside the Integer.MIN_VALUE..Integer.MAX_VALUE range
final int hours = Math.abs(pgInterval.getHours());
final int minutes = Math.abs(pgInterval.getMinutes());
final int seconds = Math.abs(pgInterval.getWholeSeconds());
resultInterval.append(addFirstDigit(hours));
resultInterval.append(hours);
resultInterval.append(":");
resultInterval.append(addFirstDigit(minutes));
resultInterval.append(minutes);
resultInterval.append(":");
resultInterval.append(addFirstDigit(seconds));
resultInterval.append(seconds);
}
private String addFirstDigit(final int value) {
return value <= 9 ? "0" : "";
}
private boolean isNegativeTime(final PGInterval pgInterval) {
return pgInterval.getHours() < 0
|| pgInterval.getMinutes() < 0
|| pgInterval.getWholeSeconds() < 0;
}
}
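// Usage sketch: Debezium discovers custom converters through its `converters` engine
// property plus a matching `<name>.type` entry pointing at the converter class. The
// converter name and fully-qualified class name below are illustrative assumptions,
// not configuration taken from this repository.
//
//   final Properties props = new Properties();
//   props.setProperty("converters", "postgres_converter");
//   props.setProperty("postgres_converter.type",
//       "io.airbyte.integrations.debezium.internals.PostgresConverter");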

View File

@@ -1,11 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
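/**
* Mirrors the values Debezium emits in a change event's source.snapshot field:
* "true" while a snapshot is in progress, "false" for streamed changes, and "last"
* for the final record of a snapshot.
*/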
public enum SnapshotMetadata {
TRUE,
FALSE,
LAST
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import com.google.common.collect.Lists;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class AirbyteDebeziumHandlerTest {
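// shouldUseCDC is expected to flag a catalog as CDC only when at least one configured
// stream syncs incrementally; the two tests below cover both branches.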
@Test
public void shouldUseCdcTestShouldReturnTrue() {
final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
"MODELS_STREAM_NAME",
"MODELS_SCHEMA",
Field.of("COL_ID", JsonSchemaType.NUMBER),
Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER),
Field.of("COL_MODEL", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("COL_ID")))));
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
.toDefaultConfiguredCatalog(catalog);
// set all streams to incremental.
configuredCatalog.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
Assertions.assertTrue(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog));
}
@Test
public void shouldUseCdcTestShouldReturnFalse() {
final AirbyteCatalog catalog = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
"MODELS_STREAM_NAME",
"MODELS_SCHEMA",
Field.of("COL_ID", JsonSchemaType.NUMBER),
Field.of("COL_MAKE_ID", JsonSchemaType.NUMBER),
Field.of("COL_MODEL", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("COL_ID")))));
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
.toDefaultConfiguredCatalog(catalog);
Assertions.assertFalse(AirbyteDebeziumHandler.shouldUseCDC(configuredCatalog));
}
}

View File

@@ -1,50 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import org.junit.jupiter.api.Test;
class AirbyteFileOffsetBackingStoreTest {
@SuppressWarnings("UnstableApiUsage")
@Test
void test() throws IOException {
final Path testRoot = Files.createTempDirectory(Path.of("/tmp"), "offset-store-test");
final byte[] bytes = MoreResources.readBytes("test_debezium_offset.dat");
final Path templateFilePath = testRoot.resolve("template_offset.dat");
IOs.writeFile(templateFilePath, bytes);
final Path writeFilePath = testRoot.resolve("offset.dat");
final AirbyteFileOffsetBackingStore offsetStore = new AirbyteFileOffsetBackingStore(templateFilePath);
final Map<String, String> offset = offsetStore.read();
final JsonNode asJson = Jsons.jsonNode(offset);
final AirbyteFileOffsetBackingStore offsetStore2 = new AirbyteFileOffsetBackingStore(writeFilePath);
offsetStore2.persist(asJson);
final Map<String, String> stateFromOffsetStoreRoundTrip = offsetStore2.read();
// verify that, after a round trip through the offset store, we get back the same data.
assertEquals(offset, stateFromOffsetStoreRoundTrip);
// verify that the file written by the offset store is identical to the template file.
assertTrue(com.google.common.io.Files.equal(templateFilePath.toFile(), writeFilePath.toFile()));
}
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.debezium.engine.ChangeEvent;
import java.io.IOException;
import java.time.Instant;
import org.junit.jupiter.api.Test;
class DebeziumEventUtilsTest {
@Test
public void testConvertChangeEvent() throws IOException {
final String stream = "names";
final Instant emittedAt = Instant.now();
final CdcMetadataInjector cdcMetadataInjector = new DummyMetadataInjector();
final ChangeEvent<String, String> insertChangeEvent = mockChangeEvent("insert_change_event.json");
final ChangeEvent<String, String> updateChangeEvent = mockChangeEvent("update_change_event.json");
final ChangeEvent<String, String> deleteChangeEvent = mockChangeEvent("delete_change_event.json");
final AirbyteMessage actualInsert = DebeziumEventUtils.toAirbyteMessage(insertChangeEvent, cdcMetadataInjector, emittedAt);
final AirbyteMessage actualUpdate = DebeziumEventUtils.toAirbyteMessage(updateChangeEvent, cdcMetadataInjector, emittedAt);
final AirbyteMessage actualDelete = DebeziumEventUtils.toAirbyteMessage(deleteChangeEvent, cdcMetadataInjector, emittedAt);
final AirbyteMessage expectedInsert = createAirbyteMessage(stream, emittedAt, "insert_message.json");
final AirbyteMessage expectedUpdate = createAirbyteMessage(stream, emittedAt, "update_message.json");
final AirbyteMessage expectedDelete = createAirbyteMessage(stream, emittedAt, "delete_message.json");
deepCompare(expectedInsert, actualInsert);
deepCompare(expectedUpdate, actualUpdate);
deepCompare(expectedDelete, actualDelete);
}
private static ChangeEvent<String, String> mockChangeEvent(final String resourceName) throws IOException {
final ChangeEvent<String, String> mocked = mock(ChangeEvent.class);
final String resource = MoreResources.readResource(resourceName);
when(mocked.value()).thenReturn(resource);
return mocked;
}
private static AirbyteMessage createAirbyteMessage(final String stream, final Instant emittedAt, final String resourceName) throws IOException {
final String data = MoreResources.readResource(resourceName);
final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage()
.withStream(stream)
.withNamespace("public")
.withData(Jsons.deserialize(data))
.withEmittedAt(emittedAt.toEpochMilli());
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(recordMessage);
}
private static void deepCompare(final Object expected, final Object actual) {
assertEquals(Jsons.deserialize(Jsons.serialize(expected)), Jsons.deserialize(Jsons.serialize(actual)));
}
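/**
* A minimal stand-in for a connector's real metadata injector: it copies the source
* LSN into _ab_cdc_lsn and derives the record namespace from the source schema,
* matching the fields asserted against the expected-message fixtures.
*/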
public static class DummyMetadataInjector implements CdcMetadataInjector {
@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
final long lsn = source.get("lsn").asLong();
event.put("_ab_cdc_lsn", lsn);
}
@Override
public String namespace(final JsonNode source) {
return source.get("schema").asText();
}
}
}

View File

@@ -1,43 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.collect.ImmutableList;
import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.SyncMode;
import org.junit.jupiter.api.Test;
class DebeziumRecordPublisherTest {
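// Debezium's table include list is itself comma-separated, so a comma inside a table
// name must be escaped (see the expected whitelist below); full-refresh streams are
// excluded from the list entirely.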
@Test
public void testWhitelistCreation() {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of(
CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public").withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public").withSyncMode(SyncMode.INCREMENTAL)));
final String expectedWhitelist = "public.id_and_name,public.id_\\,something,public.n\"aMéS";
final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog);
assertEquals(expectedWhitelist, actualWhitelist);
}
@Test
public void testWhitelistFiltersFullRefresh() {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of(
CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public").withSyncMode(SyncMode.FULL_REFRESH)));
final String expectedWhitelist = "public.id_and_name";
final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog);
assertEquals(expectedWhitelist, actualWhitelist);
}
}

View File

@@ -1,102 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium.internals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.debezium.spi.converter.RelationalColumn;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class DebeziumConverterUtilsTest {
@Test
public void convertDefaultValueTest() {
final RelationalColumn relationalColumn = mock(RelationalColumn.class);
when(relationalColumn.isOptional()).thenReturn(true);
Object actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
Assertions.assertNull(actualColumnDefaultValue, "Default value for optional relational column should be null");
when(relationalColumn.isOptional()).thenReturn(false);
when(relationalColumn.hasDefaultValue()).thenReturn(false);
actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
Assertions.assertNull(actualColumnDefaultValue);
when(relationalColumn.isOptional()).thenReturn(false);
when(relationalColumn.hasDefaultValue()).thenReturn(true);
final String expectedColumnDefaultValue = "default value";
when(relationalColumn.defaultValue()).thenReturn(expectedColumnDefaultValue);
actualColumnDefaultValue = DebeziumConverterUtils.convertDefaultValue(relationalColumn);
Assertions.assertEquals(expectedColumnDefaultValue, actualColumnDefaultValue);
}
@Test
public void convertLocalDate() {
final LocalDate localDate = LocalDate.of(2021, 1, 1);
final String actual = DebeziumConverterUtils.convertDate(localDate);
Assertions.assertEquals("2021-01-01T00:00:00Z", actual);
}
@Test
public void convertLocalTime() {
final LocalTime localTime = LocalTime.of(8, 1, 1);
final String actual = DebeziumConverterUtils.convertDate(localTime);
Assertions.assertEquals("08:01:01", actual);
}
@Test
public void convertLocalDateTime() {
final LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1);
final String actual = DebeziumConverterUtils.convertDate(localDateTime);
Assertions.assertEquals("2021-01-01T08:01:01Z", actual);
}
@Test
@Disabled
public void convertDuration() {
final Duration duration = Duration.ofHours(100_000);
final String actual = DebeziumConverterUtils.convertDate(duration);
Assertions.assertEquals("1981-05-29T20:00:00Z", actual);
}
@Test
public void convertTimestamp() {
final LocalDateTime localDateTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1);
final Timestamp timestamp = Timestamp.valueOf(localDateTime);
final String actual = DebeziumConverterUtils.convertDate(timestamp);
Assertions.assertEquals("2021-01-01T08:01:01.000000Z", actual);
}
@Test
@Disabled
public void convertNumber() {
final Number number = 100_000;
final String actual = DebeziumConverterUtils.convertDate(number);
Assertions.assertEquals("1970-01-01T03:01:40Z", actual);
}
@Test
public void convertStringDateFormat() {
final String stringValue = "2021-01-01T00:00:00Z";
final String actual = DebeziumConverterUtils.convertDate(stringValue);
Assertions.assertEquals("2021-01-01T00:00:00Z", actual);
}
}

View File

@@ -1,25 +0,0 @@
{
"before": {
"first_name": "san",
"last_name": "goku",
"power": null
},
"after": null,
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "orders",
"ts_ms": 1616775646886,
"snapshot": false,
"db": "db_lwfoyffqvx",
"schema": "public",
"table": "names",
"txId": 498,
"lsn": 23012360,
"xmin": null
},
"op": "d",
"ts_ms": 1616775646931,
"transaction": null,
"destination": "orders.public.names"
}

View File

@@ -1,8 +0,0 @@
{
"first_name": "san",
"last_name": "goku",
"power": null,
"_ab_cdc_updated_at": "2021-03-26T16:20:46.886Z",
"_ab_cdc_lsn": 23012360,
"_ab_cdc_deleted_at": "2021-03-26T16:20:46.886Z"
}

View File

@@ -1,25 +0,0 @@
{
"before": null,
"after": {
"first_name": "san",
"last_name": "goku",
"power": "Infinity"
},
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "orders",
"ts_ms": 1616775642623,
"snapshot": true,
"db": "db_lwfoyffqvx",
"schema": "public",
"table": "names",
"txId": 495,
"lsn": 23011544,
"xmin": null
},
"op": "r",
"ts_ms": 1616775642624,
"transaction": null,
"destination": "orders.public.names"
}

View File

@@ -1,8 +0,0 @@
{
"first_name": "san",
"last_name": "goku",
"power": "Infinity",
"_ab_cdc_updated_at": "2021-03-26T16:20:42.623Z",
"_ab_cdc_lsn": 23011544,
"_ab_cdc_deleted_at": null
}

View File

@@ -1,25 +0,0 @@
{
"before": null,
"after": {
"first_name": "san",
"last_name": "goku",
"power": 10000.2
},
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "orders",
"ts_ms": 1616775646881,
"snapshot": false,
"db": "db_lwfoyffqvx",
"schema": "public",
"table": "names",
"txId": 497,
"lsn": 23012216,
"xmin": null
},
"op": "u",
"ts_ms": 1616775646929,
"transaction": null,
"destination": "orders.public.names"
}

View File

@@ -1,8 +0,0 @@
{
"first_name": "san",
"last_name": "goku",
"power": 10000.2,
"_ab_cdc_updated_at": "2021-03-26T16:20:46.881Z",
"_ab_cdc_lsn": 23012216,
"_ab_cdc_deleted_at": null
}

View File

@@ -1,629 +0,0 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.debezium;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.Database;
import io.airbyte.integrations.base.Source;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class CdcSourceTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CdcSourceTest.class);
protected static final String MODELS_SCHEMA = "models_schema";
protected static final String MODELS_STREAM_NAME = "models";
private static final Set<String> STREAM_NAMES = Sets
.newHashSet(MODELS_STREAM_NAME);
protected static final String COL_ID = "id";
protected static final String COL_MAKE_ID = "make_id";
protected static final String COL_MODEL = "model";
protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))));
protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers
.toDefaultConfiguredCatalog(CATALOG);
// set all streams to incremental.
static {
CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
}
private static final List<JsonNode> MODEL_RECORDS = ImmutableList.of(
Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 13, COL_MAKE_ID, 1, COL_MODEL, "Ranger")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 14, COL_MAKE_ID, 2, COL_MODEL, "GLA")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350")));
protected void setup() throws SQLException {
createAndPopulateTables();
}
private void createAndPopulateTables() {
createAndPopulateActualTable();
createAndPopulateRandomTable();
}
protected void executeQuery(final String query) {
try {
getDatabase().query(
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
public String columnClause(final Map<String, String> columnsWithDataType, final Optional<String> primaryKey) {
final StringBuilder columnClause = new StringBuilder();
int i = 0;
for (final Map.Entry<String, String> column : columnsWithDataType.entrySet()) {
columnClause.append(column.getKey());
columnClause.append(" ");
columnClause.append(column.getValue());
if (i < (columnsWithDataType.size() - 1)) {
columnClause.append(",");
columnClause.append(" ");
}
i++;
}
primaryKey.ifPresent(s -> columnClause.append(", PRIMARY KEY (").append(s).append(")"));
return columnClause.toString();
}
public void createTable(final String schemaName, final String tableName, final String columnClause) {
executeQuery(createTableQuery(schemaName, tableName, columnClause));
}
public String createTableQuery(final String schemaName, final String tableName, final String columnClause) {
return String.format("CREATE TABLE %s.%s(%s);", schemaName, tableName, columnClause);
}
public void createSchema(final String schemaName) {
executeQuery(createSchemaQuery(schemaName));
}
public String createSchemaQuery(final String schemaName) {
return "CREATE DATABASE " + schemaName + ";";
}
private void createAndPopulateActualTable() {
createSchema(MODELS_SCHEMA);
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME,
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
for (final JsonNode recordJson : MODEL_RECORDS) {
writeModelRecord(recordJson);
}
}
/**
* This database and table are not part of the Airbyte sync. They are created just to make sure
* that databases not being synced by Airbyte do not cause issues with our Debezium logic.
*/
private void createAndPopulateRandomTable() {
createSchema(MODELS_SCHEMA + "_random");
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
Optional.of(COL_ID + "_random")));
final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Fiesta-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Focus-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Ranger-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"GLA-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"A 220-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"E 350-random")));
for (final JsonNode recordJson : MODEL_RECORDS_RANDOM) {
writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
}
}
private void writeModelRecord(final JsonNode recordJson) {
writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL);
}
private void writeRecords(
final JsonNode recordJson,
final String dbName,
final String streamName,
final String idCol,
final String makeIdCol,
final String modelCol) {
executeQuery(
String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName,
idCol, makeIdCol, modelCol,
recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(),
recordJson.get(modelCol).asText()));
}
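// Debezium can emit the same record more than once around the snapshot/incremental
// boundary, so comparisons drop exact duplicates that differ only in
// _ab_cdc_updated_at.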
private static Set<AirbyteRecordMessage> removeDuplicates(final Set<AirbyteRecordMessage> messages) {
final Set<JsonNode> existingDataRecordsWithoutUpdated = new HashSet<>();
final Set<AirbyteRecordMessage> output = new HashSet<>();
for (final AirbyteRecordMessage message : messages) {
final ObjectNode node = message.getData().deepCopy();
node.remove("_ab_cdc_updated_at");
if (existingDataRecordsWithoutUpdated.contains(node)) {
LOGGER.info("Removing duplicate node: " + node);
} else {
output.add(message);
existingDataRecordsWithoutUpdated.add(node);
}
}
return output;
}
protected Set<AirbyteRecordMessage> extractRecordMessages(final List<AirbyteMessage> messages) {
final List<AirbyteRecordMessage> recordMessageList = messages
.stream()
.filter(r -> r.getType() == Type.RECORD).map(AirbyteMessage::getRecord)
.collect(Collectors.toList());
final Set<AirbyteRecordMessage> recordMessageSet = new HashSet<>(recordMessageList);
assertEquals(recordMessageList.size(), recordMessageSet.size(),
"Expected no duplicates in airbyte record message output for a single sync.");
return recordMessageSet;
}
private List<AirbyteStateMessage> extractStateMessages(final List<AirbyteMessage> messages) {
return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState)
.collect(Collectors.toList());
}
private void assertExpectedRecords(final Set<JsonNode> expectedRecords, final Set<AirbyteRecordMessage> actualRecords) {
// assume all streams are cdc.
assertExpectedRecords(expectedRecords, actualRecords, actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet()));
}
private void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams) {
assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES);
}
private void assertExpectedRecords(final Set<JsonNode> expectedRecords,
final Set<AirbyteRecordMessage> actualRecords,
final Set<String> cdcStreams,
final Set<String> streamNames) {
final Set<JsonNode> actualData = actualRecords
.stream()
.map(recordMessage -> {
assertTrue(streamNames.contains(recordMessage.getStream()));
assertNotNull(recordMessage.getEmittedAt());
assertEquals(MODELS_SCHEMA, recordMessage.getNamespace());
final JsonNode data = recordMessage.getData();
if (cdcStreams.contains(recordMessage.getStream())) {
assertCdcMetaData(data, true);
} else {
assertNullCdcMetaData(data);
}
removeCDCColumns((ObjectNode) data);
return data;
})
.collect(Collectors.toSet());
assertEquals(expectedRecords, actualData);
}
@Test
@DisplayName("On the first sync, produce returns records that exist in the database.")
void testExistingData() throws Exception {
final CdcTargetPosition targetPosition = cdcLatestTargetPosition();
final AutoCloseableIterator<AirbyteMessage> read = getSource().read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);
assertNotNull(targetPosition);
recordMessages.forEach(record -> assertEquals(targetPosition, extractPosition(record.getData())));
assertExpectedRecords(new HashSet<>(MODEL_RECORDS), recordMessages);
assertEquals(1, stateMessages.size());
assertNotNull(stateMessages.get(0).getData());
assertExpectedStateMessages(stateMessages);
}
@Test
@DisplayName("When a record is deleted, produces a deletion record.")
void testDelete() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertEquals(1, stateMessages1.size());
assertNotNull(stateMessages1.get(0).getData());
assertExpectedStateMessages(stateMessages1);
executeQuery(String
.format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID,
11));
final JsonNode state = Jsons.jsonNode(stateMessages1);
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final List<AirbyteRecordMessage> recordMessages2 = new ArrayList<>(
extractRecordMessages(actualRecords2));
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertEquals(1, stateMessages2.size());
assertNotNull(stateMessages2.get(0).getData());
assertExpectedStateMessages(stateMessages2);
assertEquals(1, recordMessages2.size());
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertCdcMetaData(recordMessages2.get(0).getData(), false);
}
@Test
@DisplayName("When a record is updated, produces an update record.")
void testUpdate() throws Exception {
final String updatedModel = "Explorer";
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertEquals(1, stateMessages1.size());
assertNotNull(stateMessages1.get(0).getData());
assertExpectedStateMessages(stateMessages1);
executeQuery(String
.format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME,
COL_MODEL, updatedModel, COL_ID, 11));
final JsonNode state = Jsons.jsonNode(stateMessages1);
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final List<AirbyteRecordMessage> recordMessages2 = new ArrayList<>(
extractRecordMessages(actualRecords2));
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertEquals(1, stateMessages2.size());
assertNotNull(stateMessages2.get(0).getData());
assertExpectedStateMessages(stateMessages2);
assertEquals(1, recordMessages2.size());
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText());
assertCdcMetaData(recordMessages2.get(0).getData(), true);
}
@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
@Test
@DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.")
void testRecordsProducedDuringAndAfterSync() throws Exception {
final int recordsToCreate = 20;
final int[] recordsCreated = {0};
// first batch of records. 20 created here and 6 created in setup method.
while (recordsCreated[0] < recordsToCreate) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated[0]));
writeModelRecord(record);
recordsCreated[0]++;
}
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertEquals(1, stateAfterFirstBatch.size());
assertNotNull(stateAfterFirstBatch.get(0).getData());
assertExpectedStateMessages(stateAfterFirstBatch);
final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
dataFromFirstBatch);
assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size());
// second batch of records again 20 being created
recordsCreated[0] = 0;
while (recordsCreated[0] < recordsToCreate) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 200 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated[0]));
writeModelRecord(record);
recordsCreated[0]++;
}
final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch);
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);
final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertEquals(1, stateAfterSecondBatch.size());
assertNotNull(stateAfterSecondBatch.get(0).getData());
assertExpectedStateMessages(stateAfterSecondBatch);
final Set<AirbyteRecordMessage> recordsFromSecondBatch = extractRecordMessages(
dataFromSecondBatch);
assertEquals(recordsToCreate, recordsFromSecondBatch.size(),
"Expected 20 records to be replicated in the second sync.");
// sometimes there can be more than one of these at the end of the snapshot and just before the
// first incremental.
final Set<AirbyteRecordMessage> recordsFromFirstBatchWithoutDuplicates = removeDuplicates(
recordsFromFirstBatch);
final Set<AirbyteRecordMessage> recordsFromSecondBatchWithoutDuplicates = removeDuplicates(
recordsFromSecondBatch);
final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(),
"Expected first sync to include records created while the test was running.");
assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount,
recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates
.size());
}
@Test
@DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
void testCdcAndFullRefreshInSameSync() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
final List<JsonNode> MODEL_RECORDS_2 = ImmutableList.of(
Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
for (final JsonNode recordJson : MODEL_RECORDS_2) {
writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
COL_MAKE_ID, COL_MODEL);
}
final ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
Field.of(COL_MODEL, JsonSchemaType.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))));
airbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
final List<ConfiguredAirbyteStream> streams = configuredCatalog.getStreams();
streams.add(airbyteStream);
configuredCatalog.withStreams(streams);
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), configuredCatalog, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
final HashSet<String> names = new HashSet<>(STREAM_NAMES);
names.add(MODELS_STREAM_NAME + "_2");
assertEquals(1, stateMessages1.size());
assertNotNull(stateMessages1.get(0).getData());
assertExpectedStateMessages(stateMessages1);
assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
.collect(Collectors.toSet()),
recordMessages1,
Collections.singleton(MODELS_STREAM_NAME),
names);
final JsonNode puntoRecord = Jsons
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
writeModelRecord(puntoRecord);
final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1));
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), configuredCatalog, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertEquals(1, stateMessages2.size());
assertNotNull(stateMessages2.get(0).getData());
assertExpectedStateMessages(stateMessages2);
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
.collect(Collectors.toSet()),
recordMessages2,
Collections.singleton(MODELS_STREAM_NAME),
names);
}
@Test
@DisplayName("When no records exist, no records are returned.")
void testNoData() throws Exception {
executeQuery(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME));
final AutoCloseableIterator<AirbyteMessage> read = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);
assertExpectedRecords(Collections.emptySet(), recordMessages);
assertEquals(1, stateMessages.size());
assertNotNull(stateMessages.get(0).getData());
assertExpectedStateMessages(stateMessages);
}
@Test
@DisplayName("When no changes have been made to the database since the previous sync, no records are returned.")
void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1));
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedRecords(Collections.emptySet(), recordMessages2);
assertEquals(1, stateMessages2.size());
assertNotNull(stateMessages2.get(0).getData());
assertExpectedStateMessages(stateMessages2);
}
@Test
void testCheck() throws Exception {
final AirbyteConnectionStatus status = getSource().check(getConfig());
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, status.getStatus());
}
@Test
void testDiscover() throws Exception {
final AirbyteCatalog expectedCatalog = expectedCatalogForDiscover();
final AirbyteCatalog actualCatalog = getSource().discover(getConfig());
assertEquals(
expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName))
.collect(Collectors.toList()),
actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName))
.collect(Collectors.toList()));
}
protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty()));
final List<AirbyteStream> streams = expectedCatalog.getStreams();
// stream with PK
streams.get(0).setSourceDefinedCursor(true);
addCdcMetadataColumns(streams.get(0));
final AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING));
streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
addCdcMetadataColumns(streamWithoutPK);
final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
Field.of(COL_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")));
addCdcMetadataColumns(randomStream);
streams.add(streamWithoutPK);
streams.add(randomStream);
expectedCatalog.withStreams(streams);
return expectedCatalog;
}
protected abstract CdcTargetPosition cdcLatestTargetPosition();
protected abstract CdcTargetPosition extractPosition(JsonNode record);
protected abstract void assertNullCdcMetaData(JsonNode data);
protected abstract void assertCdcMetaData(JsonNode data, boolean deletedAtNull);
protected abstract void removeCDCColumns(ObjectNode data);
protected abstract void addCdcMetadataColumns(AirbyteStream stream);
protected abstract Source getSource();
protected abstract JsonNode getConfig();
protected abstract Database getDatabase();
protected abstract void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages);
}

View File

@@ -113,7 +113,6 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD"
include ':airbyte-integrations:bases:s3-destination-base-integration-test'
include ':airbyte-integrations:bases:standard-source-test'
include ':airbyte-integrations:connector-templates:generator'
include ':airbyte-integrations:bases:debezium-v1-4-2'
include ':airbyte-integrations:bases:debezium-v1-9-6'
// Needed by normalization integration tests