
introduce common abstraction for CDC via debezium (#4580)

* wip

* add file

* final structure

* few more updates

* undo unwanted changes

* add abstract test + more refinement

* remove CDC metadata to debezium

* rename class + add missing property

* move debezium to bases + upgrade debezium version + review comments

* downgrade version + minor fixes

* reset to minutes

* fix build

* address review comments

* should return Optional

* use common abstraction for CDC via debezium for mysql (#4604)

* use new cdc abstraction for mysql

* undo unwanted change

* pull in latest changes

* use renamed class + move constants to MySqlSource

* bring in latest changes from cdc abstraction

* format

* bring in latest changes

* pull in latest changes

* use common abstraction for CDC via debezium for postgres (#4607)

* use cdc abstraction for postgres

* add files

* ready

* use renamed class + move constants to PostgresSource

* bring in the latest changes

* bring in latest changes

* pull in latest changes
Subodh Kant Chaturvedi
2021-07-12 22:39:38 +05:30
committed by GitHub
parent 078de483a1
commit 4863ea1a93
47 changed files with 1973 additions and 2024 deletions

View File

@@ -11,16 +11,15 @@ application {
dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:source-relational-db')
implementation 'io.debezium:debezium-api:1.4.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final'
implementation 'io.debezium:debezium-embedded:1.4.2.Final'
implementation 'mysql:mysql-connector-java:8.0.22'
implementation 'org.apache.commons:commons-lang3:3.11'
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.testcontainers:mysql:1.15.1'

View File

@@ -1,178 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class handles reading and writing a debezium offset file. In many cases it is duplicating
* logic in debezium because that logic is not exposed in the public API. We mostly treat the
* contents of this state file like a black box. We know it is a Map&lt;ByteBuffer, ByteBuffer&gt;. We
* deserialize it to a Map&lt;String, String&gt; so that the state file can be human readable. If we ever
* discover that any of the contents of these offset files is not string serializable, we will likely
* have to drop the human readability support and just base64 encode it.
*/
public class AirbyteFileOffsetBackingStore {
private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class);
private final Path offsetFilePath;
public AirbyteFileOffsetBackingStore(final Path offsetFilePath) {
this.offsetFilePath = offsetFilePath;
}
public Path getOffsetFilePath() {
return offsetFilePath;
}
public CdcState read() {
final Map<ByteBuffer, ByteBuffer> raw = load();
final Map<String, String> mappedAsStrings = raw.entrySet().stream().collect(Collectors.toMap(
e -> byteBufferToString(e.getKey()),
e -> byteBufferToString(e.getValue())));
final JsonNode asJson = Jsons.jsonNode(mappedAsStrings);
LOGGER.info("debezium state: {}", asJson);
return new CdcState().withState(asJson);
}
public Map<String, String> readMap() {
final Map<ByteBuffer, ByteBuffer> raw = load();
return raw.entrySet().stream().collect(Collectors.toMap(
e -> byteBufferToString(e.getKey()),
e -> byteBufferToString(e.getValue())));
}
@SuppressWarnings("unchecked")
public void persist(CdcState cdcState) {
final Map<String, String> mapAsString =
cdcState != null && cdcState.getState() != null ? Jsons.object(cdcState.getState().get(MYSQL_CDC_OFFSET), Map.class) : Collections.emptyMap();
final Map<ByteBuffer, ByteBuffer> mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap(
e -> stringToByteBuffer(e.getKey()),
e -> stringToByteBuffer(e.getValue())));
FileUtils.deleteQuietly(offsetFilePath.toFile());
save(mappedAsStrings);
}
private static String byteBufferToString(ByteBuffer byteBuffer) {
Preconditions.checkNotNull(byteBuffer);
return new String(byteBuffer.array(), StandardCharsets.UTF_8);
}
private static ByteBuffer stringToByteBuffer(String s) {
Preconditions.checkNotNull(s);
return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
}
/**
* See FileOffsetBackingStore#load - logic is mostly borrowed from there. Duplicated because that
* method is not public.
*/
@SuppressWarnings("unchecked")
private Map<ByteBuffer, ByteBuffer> load() {
try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) {
final Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new ConnectException("Expected HashMap but found " + obj.getClass());
final Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
data.put(key, value);
}
return data;
} catch (NoSuchFileException | EOFException e) {
// NoSuchFileException: Ignore, may be new.
// EOFException: Ignore, this means the file was missing or corrupt
return Collections.emptyMap();
} catch (IOException | ClassNotFoundException e) {
throw new ConnectException(e);
}
}
/**
* See FileOffsetBackingStore#save - logic is mostly borrowed from there. Duplicated because that
* method is not public.
*/
private void save(Map<ByteBuffer, ByteBuffer> data) {
try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) {
Map<byte[], byte[]> raw = new HashMap<>();
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
raw.put(key, value);
}
os.writeObject(raw);
} catch (IOException e) {
throw new ConnectException(e);
}
}
static AirbyteFileOffsetBackingStore initializeState(StateManager stateManager) {
final Path cdcWorkingDir;
try {
cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-state-offset");
} catch (IOException e) {
throw new RuntimeException(e);
}
final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat");
final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore(
cdcOffsetFilePath);
offsetManager.persist(stateManager.getCdcStateManager().getCdcState());
return offsetManager;
}
}
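
The class above exposes Debezium's binary offset map as strings purely so that the persisted Airbyte state stays human readable. Below is a minimal, self-contained sketch of the ByteBuffer/String round trip it relies on; the offset key is a made-up example, not a real Debezium offset.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetRoundTripSketch {

  public static void main(String[] args) {
    // Debezium keeps offsets as raw bytes; AirbyteFileOffsetBackingStore maps them to
    // strings (and back) so the saved state can be inspected by a human.
    final String offsetKey = "[\"engine\",{\"server\":\"demo_db\"}]"; // hypothetical key
    final ByteBuffer asBytes = ByteBuffer.wrap(offsetKey.getBytes(StandardCharsets.UTF_8));

    // This is the property the read()/persist(CdcState) pair depends on: converting
    // back to a string yields exactly the original content.
    final String restored = new String(asBytes.array(), StandardCharsets.UTF_8);
    System.out.println(offsetKey.equals(restored)); // true
  }
}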

View File

@@ -1,169 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.HistoryRecord;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
/**
* The purpose of this class is to: 1. Read the contents of the file {@link #path} at the end of
* the sync so that it can be saved in state for future syncs. Check {@link #read()}. 2. Write the
* saved content back to the file {@link #path} at the beginning of the sync so that debezium can
* function smoothly. Check {@link #persist(CdcState)}. To understand more about the file, please
* refer to {@link FilteredFileDatabaseHistory}
*/
public class AirbyteSchemaHistoryStorage {
private final Path path;
private static final Charset UTF8 = StandardCharsets.UTF_8;
private final DocumentReader reader = DocumentReader.defaultReader();
private final DocumentWriter writer = DocumentWriter.defaultWriter();
public AirbyteSchemaHistoryStorage(final Path path) {
this.path = path;
}
public Path getPath() {
return path;
}
/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)}
*/
public String read() {
StringBuilder fileAsString = new StringBuilder();
try {
for (String line : Files.readAllLines(path, UTF8)) {
if (line != null && !line.isEmpty()) {
Document record = reader.read(line);
String recordAsString = writer.write(record);
fileAsString.append(recordAsString);
fileAsString.append(System.lineSeparator());
}
}
return fileAsString.toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#start()}
*/
private void makeSureFileExists() {
try {
// Make sure the file exists ...
if (!Files.exists(path)) {
// Create parent directories if we have them ...
if (path.getParent() != null) {
Files.createDirectories(path.getParent());
}
try {
Files.createFile(path);
} catch (FileAlreadyExistsException e) {
// do nothing
}
}
} catch (IOException e) {
throw new IllegalStateException(
"Unable to create history file at " + path + ": " + e.getMessage(), e);
}
}
public void persist(CdcState cdcState) {
String fileAsString = cdcState != null && cdcState.getState() != null ? Jsons
.object(cdcState.getState().get(MYSQL_DB_HISTORY), String.class) : null;
if (fileAsString == null || fileAsString.isEmpty()) {
return;
}
FileUtils.deleteQuietly(path.toFile());
makeSureFileExists();
writeToFile(fileAsString);
}
/**
* This implementation is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)}
*
* @param fileAsString Represents the contents of the file saved in state from previous syncs
*/
private void writeToFile(String fileAsString) {
try {
String[] split = fileAsString.split(System.lineSeparator());
for (String element : split) {
Document read = reader.read(element);
String line = writer.write(read);
try (BufferedWriter historyWriter = Files
.newBufferedWriter(path, StandardOpenOption.APPEND)) {
try {
historyWriter.append(line);
historyWriter.newLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static AirbyteSchemaHistoryStorage initializeDBHistory(StateManager stateManager) {
final Path dbHistoryWorkingDir;
try {
dbHistoryWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-db-history");
} catch (IOException e) {
throw new RuntimeException(e);
}
final Path dbHistoryFilePath = dbHistoryWorkingDir.resolve("dbhistory.dat");
final AirbyteSchemaHistoryStorage schemaHistoryManager = new AirbyteSchemaHistoryStorage(dbHistoryFilePath);
schemaHistoryManager.persist(stateManager.getCdcStateManager().getCdcState());
return schemaHistoryManager;
}
}
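
As a rough illustration of the read/persist cycle described in the javadoc above, here is a hedged sketch of exercising the storage in isolation, assuming the classes from this module are on the classpath; the single history record is hypothetical.

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.AirbyteSchemaHistoryStorage;
import io.airbyte.integrations.source.mysql.MySqlSource;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class SchemaHistorySketch {

  public static void main(String[] args) throws Exception {
    // Same working directory / file name convention as initializeDBHistory above.
    final Path workingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-db-history");
    final AirbyteSchemaHistoryStorage storage =
        new AirbyteSchemaHistoryStorage(workingDir.resolve("dbhistory.dat"));

    // A single hypothetical history record that a previous sync saved in the Airbyte state.
    final String historyFromState = "{\"databaseName\":\"demo_db\",\"ddl\":\"CREATE TABLE t (id INT)\"}";
    final CdcState savedState = new CdcState()
        .withState(Jsons.jsonNode(Map.of(MySqlSource.MYSQL_DB_HISTORY, historyFromState)));

    // Beginning of the sync: write the saved contents back so debezium can recover its history.
    storage.persist(savedState);

    // End of the sync: read the file back so it can be stored in state for the next sync.
    System.out.println(storage.read());
  }
}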

View File

@@ -1,82 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_DELETED_AT;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_UPDATED_AT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.debezium.engine.ChangeEvent;
import java.time.Instant;
public class DebeziumEventUtils {
public static AirbyteMessage toAirbyteMessage(ChangeEvent<String, String> event, Instant emittedAt) {
final JsonNode debeziumRecord = Jsons.deserialize(event.value());
final JsonNode before = debeziumRecord.get("before");
final JsonNode after = debeziumRecord.get("after");
final JsonNode source = debeziumRecord.get("source");
final JsonNode data = formatDebeziumData(before, after, source);
final String schemaName = source.get("db").asText();
final String streamName = source.get("table").asText();
final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(schemaName)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(data);
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(airbyteRecordMessage);
}
// warning mutates input args.
private static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source) {
final ObjectNode base = (ObjectNode) (after.isNull() ? before : after);
long transactionMillis = source.get("ts_ms").asLong();
base.put(CDC_UPDATED_AT, transactionMillis);
base.put(CDC_LOG_FILE, source.get("file").asText());
base.put(CDC_LOG_POS, source.get("pos").asLong());
if (after.isNull()) {
base.put(CDC_DELETED_AT, transactionMillis);
} else {
base.put("_ab_cdc_deleted_at", (Long) null);
}
return base;
}
}

View File

@@ -1,165 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.debezium.engine.ChangeEvent;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The record iterator is the consumer (in the producer / consumer relationship with debezium)
* responsible for 1. making sure every record produced by the record publisher is processed 2.
* signalling to the record publisher when it is time for it to stop producing records. It emits
* this signal either when the publisher had not produced a new record for a long time or when it
* has processed at least all of the records that were present in the database when the source was
* started. Because the publisher might publish more records between the consumer sending this
* signal and the publisher actually shutting down, the consumer must stay alive as long as the
* publisher is not closed. Even after the publisher is closed, the consumer will finish processing
* any produced records before closing.
*/
public class DebeziumRecordIterator extends AbstractIterator<ChangeEvent<String, String>>
implements AutoCloseableIterator<ChangeEvent<String, String>> {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class);
private static final WaitTime FIRST_RECORD_WAIT_TIME_MINUTES = new WaitTime(5, TimeUnit.MINUTES);
private static final WaitTime SUBSEQUENT_RECORD_WAIT_TIME_SECONDS = new WaitTime(5, TimeUnit.SECONDS);
private final LinkedBlockingQueue<ChangeEvent<String, String>> queue;
private final Optional<TargetFilePosition> targetFilePosition;
private final Supplier<Boolean> publisherStatusSupplier;
private final VoidCallable requestClose;
private boolean receivedFirstRecord;
public DebeziumRecordIterator(LinkedBlockingQueue<ChangeEvent<String, String>> queue,
Optional<TargetFilePosition> targetFilePosition,
Supplier<Boolean> publisherStatusSupplier,
VoidCallable requestClose) {
this.queue = queue;
this.targetFilePosition = targetFilePosition;
this.publisherStatusSupplier = publisherStatusSupplier;
this.requestClose = requestClose;
this.receivedFirstRecord = false;
}
@Override
protected ChangeEvent<String, String> computeNext() {
// keep trying until the publisher is closed or until the queue is empty. the latter case is
// possible when the publisher has shutdown but the consumer has not yet processed all messages it
// emitted.
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
final ChangeEvent<String, String> next;
try {
WaitTime waitTime = receivedFirstRecord ? SUBSEQUENT_RECORD_WAIT_TIME_SECONDS : FIRST_RECORD_WAIT_TIME_MINUTES;
next = queue.poll(waitTime.period, waitTime.timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// if the consumer could not get a record within the timeout, it is time to tell the producer to
// shutdown.
if (next == null) {
requestClose();
LOGGER.info("no record found. polling again.");
continue;
}
// if the last record matches the target file position, it is time to tell the producer to shutdown.
if (shouldSignalClose(next)) {
requestClose();
}
receivedFirstRecord = true;
return next;
}
return endOfData();
}
@Override
public void close() throws Exception {
requestClose.call();
}
private boolean shouldSignalClose(ChangeEvent<String, String> event) {
if (targetFilePosition.isEmpty()) {
return false;
}
JsonNode valueAsJson = Jsons.deserialize(event.value());
String file = valueAsJson.get("source").get("file").asText();
int position = valueAsJson.get("source").get("pos").asInt();
boolean isSnapshot = SnapshotMetadata.TRUE == SnapshotMetadata.valueOf(
valueAsJson.get("source").get("snapshot").asText().toUpperCase());
if (isSnapshot || targetFilePosition.get().fileName.compareTo(file) > 0
|| (targetFilePosition.get().fileName.compareTo(file) == 0 && targetFilePosition.get().position >= position)) {
return false;
}
LOGGER.info(
"Signalling close because record's binlog file : " + file + " , position : " + position
+ " is after target file : "
+ targetFilePosition.get().fileName + " , target position : " + targetFilePosition
.get().position);
return true;
}
private void requestClose() {
try {
requestClose.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
enum SnapshotMetadata {
TRUE,
FALSE,
LAST
}
private static class WaitTime {
public final int period;
public final TimeUnit timeUnit;
public WaitTime(int period, TimeUnit timeUnit) {
this.period = period;
this.timeUnit = timeUnit;
}
}
}
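
To make the producer/consumer relationship from the javadoc concrete, here is a sketch of how the iterator is wired to the publisher and the shared queue. It mirrors the pre-refactor wiring in MySqlSource#getIncrementalIterators shown later in this diff (which this commit replaces with the new abstraction); the 10000 capacity matches the value used there.

import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.source.mysql.DebeziumRecordIterator;
import io.airbyte.integrations.source.mysql.DebeziumRecordPublisher;
import io.airbyte.integrations.source.mysql.TargetFilePosition;
import io.debezium.engine.ChangeEvent;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;

public class RecordIteratorWiringSketch {

  // The publisher fills the queue from the Debezium engine; the iterator drains it and
  // calls publisher::close once the target binlog position has been passed (or the poll
  // timeout expires), then keeps draining until publisher::hasClosed reports true.
  static AutoCloseableIterator<ChangeEvent<String, String>> wire(final DebeziumRecordPublisher publisher,
                                                                 final Optional<TargetFilePosition> targetFilePosition) {
    final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(10000);
    publisher.start(queue);
    return new DebeziumRecordIterator(queue, targetFilePosition, publisher::hasClosed, publisher::close);
  }
}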

View File

@@ -1,223 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DebeziumRecordPublisher implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class);
private final ExecutorService executor;
private DebeziumEngine<ChangeEvent<String, String>> engine;
private final JsonNode config;
private final ConfiguredAirbyteCatalog catalog;
private final AirbyteFileOffsetBackingStore offsetManager;
private final AirbyteSchemaHistoryStorage schemaHistoryManager;
private final AtomicBoolean hasClosed;
private final AtomicBoolean isClosing;
private final AtomicReference<Throwable> thrownError;
private final CountDownLatch engineLatch;
public DebeziumRecordPublisher(JsonNode config,
ConfiguredAirbyteCatalog catalog,
AirbyteFileOffsetBackingStore offsetManager,
AirbyteSchemaHistoryStorage schemaHistoryManager) {
this.config = config;
this.catalog = catalog;
this.offsetManager = offsetManager;
this.schemaHistoryManager = schemaHistoryManager;
this.hasClosed = new AtomicBoolean(false);
this.isClosing = new AtomicBoolean(false);
this.thrownError = new AtomicReference<>();
this.executor = Executors.newSingleThreadExecutor();
this.engineLatch = new CountDownLatch(1);
}
public void start(Queue<ChangeEvent<String, String>> queue) {
engine = DebeziumEngine.create(Json.class)
.using(getDebeziumProperties(config, catalog, offsetManager))
.using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
.notifying(e -> {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
// interacts with kafka. we want to ignore it.
// more on the tombstone:
// https://debezium.io/documentation/reference/configuration/event-flattening.html
if (e.value() != null) {
boolean inserted = false;
while (!inserted) {
inserted = queue.offer(e);
if (!inserted) {
try {
Thread.sleep(10);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
}
}
}
})
.using((success, message, error) -> {
LOGGER.info("Debezium engine shutdown.");
thrownError.set(error);
engineLatch.countDown();
})
.build();
// Run the engine asynchronously ...
executor.execute(engine);
}
public boolean hasClosed() {
return hasClosed.get();
}
public void close() throws Exception {
if (isClosing.compareAndSet(false, true)) {
// consumers should assume records can be produced until engine has closed.
if (engine != null) {
engine.close();
}
// wait for closure before shutting down executor service
engineLatch.await(5, TimeUnit.MINUTES);
// shut down and await for thread to actually go down
executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
// after the engine is completely off, we can mark this as closed
hasClosed.set(true);
if (thrownError.get() != null) {
throw new RuntimeException(thrownError.get());
}
}
}
protected Properties getDebeziumProperties(JsonNode config,
ConfiguredAirbyteCatalog catalog,
AirbyteFileOffsetBackingStore offsetManager) {
final Properties props = new Properties();
// debezium engine configuration
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
// https://debezium.io/documentation/reference/connectors/mysql.html#mysql-boolean-values
props.setProperty("converters", "boolean");
props.setProperty("boolean.type",
"io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");
// By default "decimal.handing.mode=precise" which's caused returning this value as a binary.
// The "double" type may cause a loss of precision, so set Debezium's config to store it as a String
// explicitly in its Kafka messages for more details see:
// https://debezium.io/documentation/reference/connectors/mysql.html#mysql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");
// snapshot config
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "initial");
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while Airbyte is
// taking a snapshot. There is a risk that if any database client makes a schema change during
// the snapshot, the sync might break
props.setProperty("snapshot.locking.mode", "none");
// https://debezium.io/documentation/reference/1.4/operations/debezium-server.html#debezium-source-database-history-file-filename
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documentation above, the debezium connector for MySQL needs to track schema
// changes. If we don't do this, we can't fetch records for the table.
// We provide our own implementation to filter out the schema information of databases
// that the connector is not syncing
props.setProperty("database.history",
"io.airbyte.integrations.source.mysql.FilteredFileDatabaseHistory");
props.setProperty("database.history.file.filename",
schemaHistoryManager.getPath().toString());
// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
// debezium names
props.setProperty("name", config.get("database").asText());
props.setProperty("database.server.name", config.get("database").asText());
// db connection configuration
props.setProperty("database.hostname", config.get("host").asText());
props.setProperty("database.port", config.get("port").asText());
props.setProperty("database.user", config.get("username").asText());
props.setProperty("database.dbname", config.get("database").asText());
if (config.has("password")) {
props.setProperty("database.password", config.get("password").asText());
}
// table selection
final String tableWhitelist = getTableWhitelist(catalog, config);
props.setProperty("table.include.list", tableWhitelist);
props.setProperty("database.include.list", config.get("database").asText());
return props;
}
private static String getTableWhitelist(ConfiguredAirbyteCatalog catalog, JsonNode config) {
return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(stream -> config.get("database").asText() + "." + stream.getName())
// debezium needs commas escaped to split properly
.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));
}
}

View File

@@ -1,168 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.history.HistoryRecordComparator;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.function.Consumer;
/**
* The MySQL Debezium connector monitors database schema evolution over time and stores the data
* in a database history file. Without this file we can't fetch the records from the binlog, so we
* need to save the contents of the file. Debezium by default uses
* {@link io.debezium.relational.history.FileDatabaseHistory} to write the schema information to
* the file. The problem is that Debezium tracks the schema evolution of all the tables in all the
* databases, so the file contents can grow large. In order to make sure that debezium
* tracks only the schema of the tables that are present in the database that Airbyte is syncing, we
* created this class. In the method {@link #storeRecord(HistoryRecord)}, we introduced a check to
* make sure only those records are saved whose database name matches the database Airbyte is
* syncing. We tell debezium to use this class by passing it as a property to the debezium engine. Look
* for the "database.history" property in
* {@link DebeziumRecordPublisher#getDebeziumProperties(JsonNode, ConfiguredAirbyteCatalog, AirbyteFileOffsetBackingStore)}
* Ideally {@link FilteredFileDatabaseHistory} should have extended
* {@link io.debezium.relational.history.FileDatabaseHistory} and overridden the
* {@link #storeRecord(HistoryRecord)} method, but {@link FileDatabaseHistory} is a final
* class and cannot be extended
*/
public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory {
private final FileDatabaseHistory fileDatabaseHistory;
private static String databaseName;
public FilteredFileDatabaseHistory() {
this.fileDatabaseHistory = new FileDatabaseHistory();
}
/**
* Ideally the databaseName should have been initialized in the constructor of the class. But since
* we supply the class name to debezium and it uses reflection to construct the object of the class,
* we can't pass in the databaseName as a parameter to the constructor. That's why we had to take
* the static approach.
*
* @param databaseName Name of the database that the connector is syncing
*/
static void setDatabaseName(String databaseName) {
if (FilteredFileDatabaseHistory.databaseName == null) {
FilteredFileDatabaseHistory.databaseName = databaseName;
} else if (!FilteredFileDatabaseHistory.databaseName.equals(databaseName)) {
throw new RuntimeException(
"Database name has already been set : " + FilteredFileDatabaseHistory.databaseName
+ " can't set to : " + databaseName);
}
}
@Override
public void configure(Configuration config,
HistoryRecordComparator comparator,
DatabaseHistoryListener listener,
boolean useCatalogBeforeSchema) {
fileDatabaseHistory.configure(config, comparator, listener, useCatalogBeforeSchema);
}
@Override
public void start() {
fileDatabaseHistory.start();
}
@Override
public void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
if (record == null) {
return;
}
try {
String dbNameInRecord = record.document().getString(Fields.DATABASE_NAME);
if (databaseName != null && dbNameInRecord != null && !dbNameInRecord.equals(databaseName)) {
return;
}
/**
* We are using reflection because the method
* {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} is
* protected and can not be accessed from here
*/
final Method storeRecordMethod = fileDatabaseHistory.getClass()
.getDeclaredMethod("storeRecord", record.getClass());
storeRecordMethod.setAccessible(true);
storeRecordMethod.invoke(fileDatabaseHistory, record);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public void stop() {
fileDatabaseHistory.stop();
// this is just for tests
databaseName = null;
}
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
try {
/**
* We are using reflection because the method
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} is protected
* and can not be accessed from here
*/
final Method recoverRecords = fileDatabaseHistory.getClass()
.getDeclaredMethod("recoverRecords", Consumer.class);
recoverRecords.setAccessible(true);
recoverRecords.invoke(fileDatabaseHistory, records);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean storageExists() {
return fileDatabaseHistory.storageExists();
}
@Override
public void initializeStorage() {
fileDatabaseHistory.initializeStorage();
}
@Override
public boolean exists() {
return fileDatabaseHistory.exists();
}
@Override
public String toString() {
return fileDatabaseHistory.toString();
}
}
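
A small, test-style sketch of the setDatabaseName guard described above. setDatabaseName is package-private, so this assumes the sketch lives in the same io.airbyte.integrations.source.mysql package; the database names are hypothetical.

package io.airbyte.integrations.source.mysql;

public class FilteredHistorySketch {

  public static void main(String[] args) {
    // The database name is fixed once, before Debezium instantiates
    // FilteredFileDatabaseHistory via reflection.
    FilteredFileDatabaseHistory.setDatabaseName("demo_db");

    // Setting the same name again is a no-op ...
    FilteredFileDatabaseHistory.setDatabaseName("demo_db");

    // ... but switching to a different name is rejected, which protects the
    // single static field from being clobbered mid-sync.
    try {
      FilteredFileDatabaseHistory.setDatabaseName("other_db");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
    }
  }
}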

View File

@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
public class MySqlCdcConnectorMetadataInjector implements CdcMetadataInjector {
@Override
public void addMetaData(ObjectNode event, JsonNode source) {
event.put(CDC_LOG_FILE, source.get("file").asText());
event.put(CDC_LOG_POS, source.get("pos").asLong());
}
@Override
public String namespace(JsonNode source) {
return source.get("db").asText();
}
}
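
A short sketch of what the injector above does to a Debezium record, assuming the module's classes are on the classpath; the record fields and source values are hypothetical.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.MySqlCdcConnectorMetadataInjector;

public class MetadataInjectorSketch {

  public static void main(String[] args) {
    final MySqlCdcConnectorMetadataInjector injector = new MySqlCdcConnectorMetadataInjector();

    // "event" is the row image emitted by Debezium, "source" is its source block.
    final ObjectNode event = (ObjectNode) Jsons.deserialize("{\"id\":1,\"name\":\"jdoe\"}");
    final JsonNode source = Jsons.deserialize("{\"db\":\"demo_db\",\"file\":\"binlog.000003\",\"pos\":154}");

    injector.addMetaData(event, source);
    System.out.println(event);                      // now contains _ab_cdc_log_file and _ab_cdc_log_pos
    System.out.println(injector.namespace(source)); // demo_db
  }
}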

View File

@@ -0,0 +1,55 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import java.util.Properties;
public class MySqlCdcProperties {
static Properties getDebeziumProperties() {
final Properties props = new Properties();
// debezium engine configuration
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
// https://debezium.io/documentation/reference/connectors/mysql.html#mysql-boolean-values
props.setProperty("converters", "boolean");
props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");
// snapshot config
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "initial");
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while Airbyte is
// taking a snapshot. There is a risk that if any database client makes a schema change during
// the snapshot, the sync might break
props.setProperty("snapshot.locking.mode", "none");
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
return props;
}
}

View File

@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.CdcSavedInfoFetcher;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import java.util.Optional;
public class MySqlCdcSavedInfoFetcher implements CdcSavedInfoFetcher {
private final JsonNode savedOffset;
private final JsonNode savedSchemaHistory;
protected MySqlCdcSavedInfoFetcher(CdcState savedState) {
final boolean savedStatePresent = savedState != null && savedState.getState() != null;
this.savedOffset = savedStatePresent ? savedState.getState().get(MYSQL_CDC_OFFSET) : null;
this.savedSchemaHistory = savedStatePresent ? savedState.getState().get(MYSQL_DB_HISTORY) : null;
}
@Override
public JsonNode getSavedOffset() {
return savedOffset;
}
@Override
public Optional<JsonNode> getSavedSchemaHistory() {
return Optional.ofNullable(savedSchemaHistory);
}
}

View File

@@ -0,0 +1,69 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcStateHandler;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySqlCdcStateHandler implements CdcStateHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCdcStateHandler.class);
private final StateManager stateManager;
public MySqlCdcStateHandler(StateManager stateManager) {
this.stateManager = stateManager;
}
@Override
public AirbyteMessage saveState(Map<String, String> offset, String dbHistory) {
final Map<String, Object> state = new HashMap<>();
state.put(MYSQL_CDC_OFFSET, offset);
state.put(MYSQL_DB_HISTORY, dbHistory);
final JsonNode asJson = Jsons.jsonNode(state);
LOGGER.info("debezium state: {}", asJson);
final CdcState cdcState = new CdcState().withState(asJson);
stateManager.getCdcStateManager().setCdcState(cdcState);
final AirbyteStateMessage stateMessage = stateManager.emit();
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}
}
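
The handler above is the piece that folds the Debezium offset map and the schema history string into the Airbyte state. Below is a sketch of the resulting JSON shape; the offset entry and history record are hypothetical values.

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.MySqlSource;
import java.util.HashMap;
import java.util.Map;

public class CdcStateShapeSketch {

  public static void main(String[] args) {
    // Mirrors the map built in MySqlCdcStateHandler#saveState.
    final Map<String, Object> state = new HashMap<>();
    state.put(MySqlSource.MYSQL_CDC_OFFSET,
        Map.of("[\"engine\",{}]", "{\"file\":\"binlog.000003\",\"pos\":154}"));
    state.put(MySqlSource.MYSQL_DB_HISTORY, "{\"ddl\":\"CREATE TABLE t (id INT)\"}");

    // The whole blob is stored under the connector's CdcState, so both pieces
    // survive between syncs and can be handed back to Debezium next time.
    System.out.println(Jsons.jsonNode(state));
  }
}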

View File

@@ -0,0 +1,109 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.airbyte.integrations.debezium.internals.SnapshotMetadata;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySqlCdcTargetPosition implements CdcTargetPosition {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlCdcTargetPosition.class);
public final String fileName;
public final Integer position;
public MySqlCdcTargetPosition(String fileName, Integer position) {
this.fileName = fileName;
this.position = position;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof MySqlCdcTargetPosition) {
MySqlCdcTargetPosition cdcTargetPosition = (MySqlCdcTargetPosition) obj;
return fileName.equals(cdcTargetPosition.fileName) && cdcTargetPosition.position.equals(position);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(fileName, position);
}
@Override
public String toString() {
return "FileName: " + fileName + ", Position : " + position;
}
public static MySqlCdcTargetPosition targetPosition(JdbcDatabase database) {
try {
List<MySqlCdcTargetPosition> masterStatus = database.resultSetQuery(
connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"),
resultSet -> {
String file = resultSet.getString("File");
int position = resultSet.getInt("Position");
if (file == null || position == 0) {
return new MySqlCdcTargetPosition(null, null);
}
return new MySqlCdcTargetPosition(file, position);
}).collect(Collectors.toList());
MySqlCdcTargetPosition targetPosition = masterStatus.get(0);
LOGGER.info("Target File position : " + targetPosition);
return targetPosition;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean reachedTargetPosition(JsonNode valueAsJson) {
String eventFileName = valueAsJson.get("source").get("file").asText();
int eventPosition = valueAsJson.get("source").get("pos").asInt();
boolean isSnapshot = SnapshotMetadata.TRUE == SnapshotMetadata.valueOf(
valueAsJson.get("source").get("snapshot").asText().toUpperCase());
if (isSnapshot || fileName.compareTo(eventFileName) > 0
|| (fileName.compareTo(eventFileName) == 0 && position >= eventPosition)) {
return false;
}
LOGGER.info("Signalling close because record's binlog file : " + eventFileName + " , position : " + eventPosition
+ " is after target file : "
+ fileName + " , target position : " + position);
return true;
}
}
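
To illustrate reachedTargetPosition above: the target is the binlog coordinate captured from SHOW MASTER STATUS before the sync started, and streaming events are compared against it. A sketch with hypothetical binlog file names and positions follows.

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.MySqlCdcTargetPosition;

public class TargetPositionSketch {

  public static void main(String[] args) {
    // Hypothetical target captured at the start of the sync.
    final MySqlCdcTargetPosition target = new MySqlCdcTargetPosition("binlog.000003", 1000);

    // An event that is still before the target position: keep reading.
    final JsonNode before = Jsons.deserialize(
        "{\"source\":{\"file\":\"binlog.000003\",\"pos\":500,\"snapshot\":\"false\"}}");
    System.out.println(target.reachedTargetPosition(before)); // false

    // An event past the target position: the handler can ask Debezium to shut down.
    final JsonNode after = Jsons.deserialize(
        "{\"source\":{\"file\":\"binlog.000003\",\"pos\":1500,\"snapshot\":\"false\"}}");
    System.out.println(target.reachedTargetPosition(after)); // true
  }
}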

View File

@@ -24,8 +24,8 @@
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.AirbyteFileOffsetBackingStore.initializeState;
import static io.airbyte.integrations.source.mysql.AirbyteSchemaHistoryStorage.initializeDBHistory;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static java.util.stream.Collectors.toList;
import com.fasterxml.jackson.databind.JsonNode;
@@ -34,38 +34,27 @@ import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.CompositeIterator;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import java.sql.JDBCType;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +65,8 @@ public class MySqlSource extends AbstractJdbcSource implements Source {
public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset";
public static final String MYSQL_DB_HISTORY = "mysql_db_history";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public MySqlSource() {
super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration());
@@ -231,69 +222,12 @@ public class MySqlSource extends AbstractJdbcSource implements Source {
Instant emittedAt) {
JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
LOGGER.info("using CDC: {}", true);
// TODO: Figure out how to set the isCDC of stateManager to true. It's always false
final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager);
AirbyteSchemaHistoryStorage schemaHistoryManager = initializeDBHistory(stateManager);
FilteredFileDatabaseHistory.setDatabaseName(sourceConfig.get("database").asText());
/**
* We use 10000 as capacity because the default queue size and batch size of debezium are:
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE} is 2048
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
*/
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(10000);
final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(sourceConfig, catalog, offsetManager, schemaHistoryManager);
publisher.start(queue);
final AirbyteDebeziumHandler handler =
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(),
catalog, true);
Optional<TargetFilePosition> targetFilePosition = TargetFilePosition
.targetFilePosition(database);
// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEvent<String, String>> eventIterator = new DebeziumRecordIterator(
queue,
targetFilePosition,
publisher::hasClosed,
publisher::close);
// convert to airbyte message.
final AutoCloseableIterator<AirbyteMessage> messageIterator = AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, emittedAt));
// our goal is to get the state at the time this supplier is called (i.e. after all message records
// have been produced)
final Supplier<AirbyteMessage> stateMessageSupplier = () -> {
Map<String, String> offset = offsetManager.readMap();
String dbHistory = schemaHistoryManager.read();
Map<String, Object> state = new HashMap<>();
state.put(MYSQL_CDC_OFFSET, offset);
state.put(MYSQL_DB_HISTORY, dbHistory);
final JsonNode asJson = Jsons.jsonNode(state);
LOGGER.info("debezium state: {}", asJson);
CdcState cdcState = new CdcState().withState(asJson);
stateManager.getCdcStateManager().setCdcState(cdcState);
final AirbyteStateMessage stateMessage = stateManager.emit();
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
};
// wrap the supplier in an iterator so that we can concat it to the message iterator.
final Iterator<AirbyteMessage> stateMessageIterator = MoreIterators
.singletonIteratorFromSupplier(stateMessageSupplier);
// this structure guarantees that the debezium engine will be closed, before we attempt to emit the
// state file. we want this so that we have a guarantee that the debezium offset file (which we use
// to produce the state file) is up-to-date.
final CompositeIterator<AirbyteMessage> messageIteratorWithStateDecorator = AutoCloseableIterators
.concatWithEagerClose(messageIterator,
AutoCloseableIterators.fromIterator(stateMessageIterator));
return Collections.singletonList(messageIteratorWithStateDecorator);
return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
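
For orientation, the hunk above swaps the inline Debezium pub/sub wiring (offset store, schema history, queue, publisher, record iterator and state supplier) for the shared AirbyteDebeziumHandler, which delegates connector-specific concerns to three small collaborators: a saved-state fetcher, a state handler, and a connector metadata injector. Below is a minimal sketch of the metadata-injector piece; the class name appears in the diff, but the hook method shown here and the Debezium source-struct field names ("file", "pos") are assumptions for illustration, not necessarily the committed API.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class MySqlCdcConnectorMetadataInjectorSketch {

  // Hypothetical hook: the shared handler passes each change event's payload together
  // with the Debezium "source" struct; the injector copies the binlog coordinates into
  // the record under the CDC metadata columns declared on MySqlSource.
  public void addMetaData(ObjectNode event, JsonNode source) {
    event.put(MySqlSource.CDC_LOG_FILE, source.get("file").asText());
    event.put(MySqlSource.CDC_LOG_POS, source.get("pos").asLong());
  }

}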

View File

@@ -1,75 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TargetFilePosition {
private static final Logger LOGGER = LoggerFactory.getLogger(TargetFilePosition.class);
public final String fileName;
public final Integer position;
public TargetFilePosition(String fileName, Integer position) {
this.fileName = fileName;
this.position = position;
}
@Override
public String toString() {
return "FileName: " + fileName + ", Position : " + position;
}
public static Optional<TargetFilePosition> targetFilePosition(JdbcDatabase database) {
try {
List<TargetFilePosition> masterStatus = database.resultSetQuery(
connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"),
resultSet -> {
String file = resultSet.getString("File");
int position = resultSet.getInt("Position");
if (file == null || position == 0) {
return new TargetFilePosition(null, null);
}
return new TargetFilePosition(file, position);
}).collect(Collectors.toList());
TargetFilePosition targetFilePosition = masterStatus.get(0);
LOGGER.info("Target File position : " + targetFilePosition);
if (targetFilePosition == null || targetFilePosition.fileName == null) {
return Optional.empty();
}
return Optional.of(targetFilePosition);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
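
The deleted helper above is superseded by MySqlCdcTargetPosition (obtained through MySqlCdcTargetPosition.targetPosition(database) in the new code path). As a rough, self-contained sketch of the comparison such a target position has to support, assuming MySQL binlog file names sort in creation order and offsets are only comparable within the same file (the real class in the debezium base module may differ):

public class BinlogTargetPositionSketch {

  public final String fileName;
  public final long position;

  public BinlogTargetPositionSketch(String fileName, long position) {
    this.fileName = fileName;
    this.position = position;
  }

  // A change event has caught up with the target when its binlog file sorts after the
  // target file, or it sits in the same file at an offset at or beyond the target offset.
  public boolean reachedTargetPosition(String eventFileName, long eventPosition) {
    final int fileComparison = eventFileName.compareTo(fileName);
    return fileComparison > 0 || (fileComparison == 0 && eventPosition >= position);
  }

}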

View File

@@ -24,14 +24,15 @@
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_DELETED_AT;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_UPDATED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,84 +41,33 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.debezium.CdcSourceTest;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
public class CdcMySqlSourceTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CdcMySqlSourceTest.class);
private static final String MODELS_SCHEMA = "models_schema";
private static final String MODELS_STREAM_NAME = "models";
private static final Set<String> STREAM_NAMES = Sets
.newHashSet(MODELS_STREAM_NAME);
private static final String COL_ID = "id";
private static final String COL_MAKE_ID = "make_id";
private static final String COL_MODEL = "model";
private static final String DB_NAME = MODELS_SCHEMA;
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MODEL, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers
.toDefaultConfiguredCatalog(CATALOG);
// set all streams to incremental.
static {
CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
}
private static final List<JsonNode> MODEL_RECORDS = ImmutableList.of(
Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 13, COL_MAKE_ID, 1, COL_MODEL, "Ranger")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 14, COL_MAKE_ID, 2, COL_MODEL, "GLA")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350")));
public class CdcMySqlSourceTest extends CdcSourceTest {
private MySQLContainer<?> container;
private Database database;
@@ -125,11 +75,11 @@ public class CdcMySqlSourceTest {
private JsonNode config;
@BeforeEach
public void setup() {
public void setup() throws SQLException {
init();
revokeAllPermissions();
grantCorrectPermissions();
createAndPopulateTables();
super.setup();
}
private void init() {
@@ -148,7 +98,7 @@ public class CdcMySqlSourceTest {
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", CdcMySqlSourceTest.DB_NAME)
.put("database", DB_NAME)
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("replication_method", "CDC")
@@ -160,93 +110,7 @@ public class CdcMySqlSourceTest {
}
private void grantCorrectPermissions() {
executeQuery(
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "
+ container.getUsername() + "@'%';");
}
private void executeQuery(String query) {
try {
database.query(
ctx -> ctx
.execute(query));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private void createAndPopulateTables() {
createAndPopulateActualTable();
createAndPopulateRandomTable();
}
private void createAndPopulateActualTable() {
executeQuery("CREATE DATABASE " + MODELS_SCHEMA + ";");
executeQuery(String
.format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));",
MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
for (JsonNode recordJson : MODEL_RECORDS) {
writeModelRecord(recordJson);
}
}
/**
* This database and table are not part of the Airbyte sync. They are created just to make sure
* that databases not being synced by Airbyte do not cause issues with our Debezium logic
*/
private void createAndPopulateRandomTable() {
executeQuery("CREATE DATABASE " + MODELS_SCHEMA + "_random" + ";");
executeQuery(String
.format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));",
MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", COL_ID + "_random",
COL_MAKE_ID + "_random",
COL_MODEL + "_random", COL_ID + "_random"));
final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Fiesta-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Focus-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random",
"Ranger-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"GLA-random")),
Jsons.jsonNode(ImmutableMap
.of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"A 220-random")),
Jsons
.jsonNode(ImmutableMap
.of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random",
"E 350-random")));
for (JsonNode recordJson : MODEL_RECORDS_RANDOM) {
writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random");
}
}
private void writeModelRecord(JsonNode recordJson) {
writeRecords(recordJson, CdcMySqlSourceTest.MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID,
COL_MAKE_ID,
COL_MODEL);
}
private void writeRecords(
JsonNode recordJson,
String dbName,
String streamName,
String idCol,
String makeIdCol,
String modelCol) {
executeQuery(
String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName,
idCol, makeIdCol, modelCol,
recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(),
recordJson.get(modelCol).asText()));
executeQuery("GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + container.getUsername() + "@'%';");
}
@AfterEach
@@ -296,30 +160,28 @@ public class CdcMySqlSourceTest {
.map(AirbyteRecordMessage::getData).collect(Collectors.toSet());
configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL));
Set<JsonNode> dataFromDebeziumSnapshot = extractRecordMessages(
AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null)))
Set<JsonNode> dataFromDebeziumSnapshot =
extractRecordMessages(AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null)))
.stream()
.map(
airbyteRecordMessage -> {
JsonNode data = airbyteRecordMessage.getData();
removeCDCColumns((ObjectNode) data);
/**
* Debezium reads TINYINT (except for TINYINT(1)) as IntNode while FullRefresh reads it as Short Ref
* : {@link io.airbyte.db.jdbc.JdbcUtils#setJsonField(java.sql.ResultSet, int, ObjectNode)} -> case
* TINYINT, SMALLINT -> o.put(columnName, r.getShort(i));
*/
((ObjectNode) data)
.put("tiny_int_two_col", (short) data.get("tiny_int_two_col").asInt());
return data;
})
.map(airbyteRecordMessage -> {
JsonNode data = airbyteRecordMessage.getData();
removeCDCColumns((ObjectNode) data);
/**
* Debezium reads TINYINT (except for TINYINT(1)) as IntNode while FullRefresh reads it as Short Ref
* : {@link io.airbyte.db.jdbc.JdbcUtils#setJsonField(java.sql.ResultSet, int, ObjectNode)} -> case
* TINYINT, SMALLINT -> o.put(columnName, r.getShort(i));
*/
((ObjectNode) data)
.put("tiny_int_two_col", (short) data.get("tiny_int_two_col").asInt());
return data;
})
.collect(Collectors.toSet());
assertEquals(dataFromFullRefresh, originalData);
assertEquals(dataFromFullRefresh, dataFromDebeziumSnapshot);
}
private void setupForComparisonBetweenFullRefreshAndCDCSnapshot(
ImmutableList<JsonNode> data) {
private void setupForComparisonBetweenFullRefreshAndCDCSnapshot(ImmutableList<JsonNode> data) {
executeQuery("CREATE DATABASE " + "test_schema" + ";");
executeQuery(String.format(
"CREATE TABLE %s.%s(%s INTEGER, %s Boolean, %s TINYINT(1), %s TINYINT(2), PRIMARY KEY (%s));",
@@ -342,16 +204,8 @@ public class CdcMySqlSourceTest {
((ObjectNode) config).put("database", "test_schema");
}
@Test
@DisplayName("On the first sync, produce returns records that exist in the database.")
void testExistingData() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read = source
.read(config, CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);
@Override
protected CdcTargetPosition cdcLatestTargetPosition() {
JdbcDatabase jdbcDatabase = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
@@ -360,324 +214,44 @@ public class CdcMySqlSourceTest {
config.get("port").asInt()),
DRIVER_CLASS);
Optional<TargetFilePosition> targetFilePosition = TargetFilePosition.targetFilePosition(jdbcDatabase);
assertTrue(targetFilePosition.isPresent());
/**
* Debezium sets the binlog file name and position values for all the records fetched during the
* snapshot to the latest log position fetched via the query SHOW MASTER STATUS. Ref:
* {@linkplain io.debezium.connector.mysql.SnapshotReader#readBinlogPosition(int, io.debezium.connector.mysql.SourceInfo, io.debezium.jdbc.JdbcConnection, java.util.concurrent.atomic.AtomicReference)}
*/
recordMessages.forEach(record -> {
assertEquals(record.getData().get(CDC_LOG_FILE).asText(),
targetFilePosition.get().fileName);
assertEquals(record.getData().get(CDC_LOG_POS).asInt(), targetFilePosition.get().position);
});
assertExpectedRecords(
new HashSet<>(MODEL_RECORDS), recordMessages);
assertExpectedStateMessages(stateMessages);
return MySqlCdcTargetPosition.targetPosition(jdbcDatabase);
}
@Test
@DisplayName("When a record is deleted, produces a deletion record.")
void testDelete() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source
.read(config, CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertExpectedStateMessages(stateMessages1);
executeQuery(String
.format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID,
11));
final JsonNode state = stateMessages1.get(0).getData();
final AutoCloseableIterator<AirbyteMessage> read2 = source
.read(config, CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final List<AirbyteRecordMessage> recordMessages2 = new ArrayList<>(
extractRecordMessages(actualRecords2));
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedStateMessages(stateMessages2);
assertEquals(1, recordMessages2.size());
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertNotNull(recordMessages2.get(0).getData().get(CDC_LOG_FILE));
assertNotNull(recordMessages2.get(0).getData().get(CDC_UPDATED_AT));
assertNotNull(recordMessages2.get(0).getData().get(CDC_DELETED_AT));
@Override
protected CdcTargetPosition extractPosition(JsonNode record) {
return new MySqlCdcTargetPosition(record.get(CDC_LOG_FILE).asText(), record.get(CDC_LOG_POS).asInt());
}
@Test
@DisplayName("When a record is updated, produces an update record.")
void testUpdate() throws Exception {
final String updatedModel = "Explorer";
final AutoCloseableIterator<AirbyteMessage> read1 = source
.read(config, CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertExpectedStateMessages(stateMessages1);
executeQuery(String
.format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME,
COL_MODEL, updatedModel, COL_ID, 11));
final JsonNode state = stateMessages1.get(0).getData();
final AutoCloseableIterator<AirbyteMessage> read2 = source
.read(config, CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final List<AirbyteRecordMessage> recordMessages2 = new ArrayList<>(
extractRecordMessages(actualRecords2));
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedStateMessages(stateMessages2);
assertEquals(1, recordMessages2.size());
assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt());
assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText());
assertNotNull(recordMessages2.get(0).getData().get(CDC_LOG_FILE));
assertNotNull(recordMessages2.get(0).getData().get(CDC_UPDATED_AT));
assertTrue(recordMessages2.get(0).getData().get(CDC_DELETED_AT).isNull());
@Override
protected void assertNullCdcMetaData(JsonNode data) {
assertNull(data.get(CDC_LOG_FILE));
assertNull(data.get(CDC_LOG_POS));
assertNull(data.get(CDC_UPDATED_AT));
assertNull(data.get(CDC_DELETED_AT));
}
@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
@Test
@DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.")
void testRecordsProducedDuringAndAfterSync() throws Exception {
final int recordsToCreate = 20;
final int[] recordsCreated = {0};
// first batch of records. 20 created here and 6 created in setup method.
while (recordsCreated[0] < recordsToCreate) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated[0]));
writeModelRecord(record);
recordsCreated[0]++;
@Override
protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) {
assertNotNull(data.get(CDC_LOG_FILE));
assertNotNull(data.get(CDC_LOG_POS));
assertNotNull(data.get(CDC_UPDATED_AT));
if (deletedAtNull) {
assertTrue(data.get(CDC_DELETED_AT).isNull());
} else {
assertFalse(data.get(CDC_DELETED_AT).isNull());
}
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source
.read(config, CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertExpectedStateMessages(stateAfterFirstBatch);
Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
dataFromFirstBatch);
assertEquals((MODEL_RECORDS.size() + 20), recordsFromFirstBatch.size());
// second batch of records again 20 being created
recordsCreated[0] = 0;
while (recordsCreated[0] < recordsToCreate) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 200 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated[0]));
writeModelRecord(record);
recordsCreated[0]++;
}
final JsonNode state = stateAfterFirstBatch.get(0).getData();
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = source
.read(config, CONFIGURED_CATALOG, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);
List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertExpectedStateMessages(stateAfterSecondBatch);
Set<AirbyteRecordMessage> recordsFromSecondBatch = extractRecordMessages(
dataFromSecondBatch);
assertEquals(20, recordsFromSecondBatch.size(),
"Expected 20 records to be replicated in the second sync.");
// sometimes there can be more than one of these at the end of the snapshot and just before the
// first incremental.
final Set<AirbyteRecordMessage> recordsFromFirstBatchWithoutDuplicates = removeDuplicates(
recordsFromFirstBatch);
final Set<AirbyteRecordMessage> recordsFromSecondBatchWithoutDuplicates = removeDuplicates(
recordsFromSecondBatch);
final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(),
"Expected first sync to include records created while the test was running.");
assertEquals(40 + recordsCreatedBeforeTestCount,
recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates
.size());
}
private static Set<AirbyteRecordMessage> removeDuplicates(Set<AirbyteRecordMessage> messages) {
final Set<JsonNode> existingDataRecordsWithoutUpdated = new HashSet<>();
final Set<AirbyteRecordMessage> output = new HashSet<>();
for (AirbyteRecordMessage message : messages) {
ObjectNode node = message.getData().deepCopy();
node.remove("_ab_cdc_updated_at");
if (existingDataRecordsWithoutUpdated.contains(node)) {
LOGGER.info("Removing duplicate node: " + node);
} else {
output.add(message);
existingDataRecordsWithoutUpdated.add(node);
}
}
return output;
@Override
protected void removeCDCColumns(ObjectNode data) {
data.remove(CDC_LOG_FILE);
data.remove(CDC_LOG_POS);
data.remove(CDC_UPDATED_AT);
data.remove(CDC_DELETED_AT);
}
@Test
@DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
void testCdcAndFullRefreshInSameSync() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG);
final List<JsonNode> MODEL_RECORDS_2 = ImmutableList.of(
Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")),
Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));
executeQuery(String
.format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));",
MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
for (JsonNode recordJson : MODEL_RECORDS_2) {
writeRecords(recordJson, CdcMySqlSourceTest.MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
COL_MAKE_ID, COL_MODEL);
}
ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream()
.withStream(CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MODEL, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))));
airbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
List<ConfiguredAirbyteStream> streams = configuredCatalog.getStreams();
streams.add(airbyteStream);
configuredCatalog.withStreams(streams);
final AutoCloseableIterator<AirbyteMessage> read1 = source
.read(config, configuredCatalog, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final Set<AirbyteRecordMessage> recordMessages1 = extractRecordMessages(actualRecords1);
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
HashSet<String> names = new HashSet<>(STREAM_NAMES);
names.add(MODELS_STREAM_NAME + "_2");
assertExpectedStateMessages(stateMessages1);
assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
.collect(Collectors.toSet()),
recordMessages1,
Collections.singleton(MODELS_STREAM_NAME),
names);
final JsonNode puntoRecord = Jsons
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
writeModelRecord(puntoRecord);
final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
final AutoCloseableIterator<AirbyteMessage> read2 = source
.read(config, configuredCatalog, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedStateMessages(stateMessages2);
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord))
.collect(Collectors.toSet()),
recordMessages2,
Collections.singleton(MODELS_STREAM_NAME),
names);
}
@Test
@DisplayName("When no records exist, no records are returned.")
void testNoData() throws Exception {
executeQuery(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME));
final AutoCloseableIterator<AirbyteMessage> read = source
.read(config, CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);
assertExpectedRecords(Collections.emptySet(), recordMessages);
assertExpectedStateMessages(stateMessages);
}
@Test
@DisplayName("When no changes have been made to the database since the previous sync, no records are returned.")
void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source
.read(config, CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
final AutoCloseableIterator<AirbyteMessage> read2 = source
.read(config, CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
final Set<AirbyteRecordMessage> recordMessages2 = extractRecordMessages(actualRecords2);
final List<AirbyteStateMessage> stateMessages2 = extractStateMessages(actualRecords2);
assertExpectedRecords(Collections.emptySet(), recordMessages2);
assertExpectedStateMessages(stateMessages2);
}
@Test
void testCheck() {
final AirbyteConnectionStatus status = source.check(config);
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.SUCCEEDED);
}
@Test
void testDiscover() throws Exception {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);
executeQuery(String
.format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200));",
MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, COL_MAKE_ID, COL_MODEL));
List<AirbyteStream> streams = expectedCatalog.getStreams();
// stream with PK
streams.get(0).setSourceDefinedCursor(true);
addCdcMetadataColumns(streams.get(0));
AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MODEL, JsonSchemaPrimitive.STRING));
streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
addCdcMetadataColumns(streamWithoutPK);
streams.add(streamWithoutPK);
expectedCatalog.withStreams(streams);
final AirbyteCatalog actualCatalog = source.discover(config);
assertEquals(
expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName))
.collect(Collectors.toList()),
actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName))
.collect(Collectors.toList()));
}
private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) {
@Override
protected void addCdcMetadataColumns(AirbyteStream stream) {
ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
ObjectNode properties = (ObjectNode) jsonSchema.get("properties");
@@ -687,92 +261,29 @@ public class CdcMySqlSourceTest {
properties.set(CDC_LOG_POS, numberType);
properties.set(CDC_UPDATED_AT, numberType);
properties.set(CDC_DELETED_AT, numberType);
return stream;
}
private Set<AirbyteRecordMessage> extractRecordMessages(List<AirbyteMessage> messages) {
final List<AirbyteRecordMessage> recordMessageList = messages
.stream()
.filter(r -> r.getType() == Type.RECORD).map(AirbyteMessage::getRecord)
.collect(Collectors.toList());
final Set<AirbyteRecordMessage> recordMessageSet = new HashSet<>(recordMessageList);
assertEquals(recordMessageList.size(), recordMessageSet.size(),
"Expected no duplicates in airbyte record message output for a single sync.");
return recordMessageSet;
@Override
protected Source getSource() {
return source;
}
private List<AirbyteStateMessage> extractStateMessages(List<AirbyteMessage> messages) {
return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState)
.collect(Collectors.toList());
@Override
protected JsonNode getConfig() {
return config;
}
private static void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages) {
// TODO: add assertion for boolean cdc is true
assertEquals(1, stateMessages.size());
assertNotNull(stateMessages.get(0).getData());
assertNotNull(
stateMessages.get(0).getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET));
assertNotNull(
stateMessages.get(0).getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY));
@Override
protected Database getDatabase() {
return database;
}
private static void assertExpectedRecords(Set<JsonNode> expectedRecords,
Set<AirbyteRecordMessage> actualRecords) {
// assume all streams are cdc.
assertExpectedRecords(
expectedRecords,
actualRecords,
actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet()));
}
private static void assertExpectedRecords(Set<JsonNode> expectedRecords,
Set<AirbyteRecordMessage> actualRecords,
Set<String> cdcStreams) {
assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES);
}
private static void assertExpectedRecords(Set<JsonNode> expectedRecords,
Set<AirbyteRecordMessage> actualRecords,
Set<String> cdcStreams,
Set<String> streamNames) {
final Set<JsonNode> actualData = actualRecords
.stream()
.map(recordMessage -> {
assertTrue(streamNames.contains(recordMessage.getStream()));
assertNotNull(recordMessage.getEmittedAt());
assertEquals(MODELS_SCHEMA, recordMessage.getNamespace());
final JsonNode data = recordMessage.getData();
if (cdcStreams.contains(recordMessage.getStream())) {
assertNotNull(data.get(CDC_LOG_FILE));
assertNotNull(data.get(CDC_LOG_POS));
assertNotNull(data.get(CDC_UPDATED_AT));
} else {
assertNull(data.get(CDC_LOG_FILE));
assertNull(data.get(CDC_LOG_POS));
assertNull(data.get(CDC_UPDATED_AT));
assertNull(data.get(CDC_DELETED_AT));
}
removeCDCColumns((ObjectNode) data);
return data;
})
.collect(Collectors.toSet());
assertEquals(expectedRecords, actualData);
}
private static void removeCDCColumns(ObjectNode data) {
data.remove(CDC_LOG_FILE);
data.remove(CDC_LOG_POS);
data.remove(CDC_UPDATED_AT);
data.remove(CDC_DELETED_AT);
@Override
public void assertExpectedStateMessages(List<AirbyteStateMessage> stateMessages) {
for (AirbyteStateMessage stateMessage : stateMessages) {
assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET));
assertNotNull(stateMessage.getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY));
}
}
}
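
For reference, the overridden assertExpectedStateMessages checks the nesting of the emitted state rather than its exact contents. A minimal sketch of that shape, with placeholder values (the real offset map and schema history payloads are opaque strings produced by Debezium):

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

public class CdcStateShapeSketch {

  public static void main(String[] args) {
    // Illustrative only: the nesting the assertions walk through
    // (data -> cdc_state -> state -> mysql_cdc_offset / mysql_db_history).
    final JsonNode stateData = Jsons.deserialize(
        "{\"cdc_state\": {\"state\": {"
            + "\"mysql_cdc_offset\": {\"<partition key>\": \"<binlog offset json>\"},"
            + "\"mysql_db_history\": \"<schema history change events>\""
            + "}}}");
    System.out.println(stateData.get("cdc_state").get("state").get("mysql_cdc_offset"));
  }

}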