From cb9eae3681daafdc14dd9563476f488c79689d15 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Mon, 15 Apr 2024 14:49:14 -0700 Subject: [PATCH] [Source-mysql] : Add meta error handling in initial load path (#37328) --- .../connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/MySqlSourceOperations.java | 10 ++- .../initialsync/MySqlInitialLoadHandler.java | 13 +++- .../MySqlInitialLoadRecordIterator.java | 14 ++-- .../source/mysql/CdcMysqlSourceTest.java | 75 +++++++++++++++++++ docs/integrations/sources/mysql.md | 3 +- 6 files changed, 101 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 8a1857bb9bd..6cf2ddd3f27 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.18 + dockerImageTag: 3.3.19 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index e64dc892732..c716a26ffcf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -46,6 +46,7 @@ import com.mysql.cj.jdbc.result.ResultSetMetaData; import com.mysql.cj.result.Field; import io.airbyte.cdk.db.SourceOperations; import io.airbyte.cdk.db.jdbc.AbstractJdbcCompatibleSourceOperations; +import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.integrations.source.mysql.initialsync.CdcMetadataInjector; import io.airbyte.protocol.models.JsonSchemaType; import java.sql.PreparedStatement; @@ -81,13 +82,14 @@ public class MySqlSourceOperations extends AbstractJdbcCompatibleSourceOperation } @Override - public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { - final ObjectNode jsonNode = (ObjectNode) super.rowToJson(queryContext); + public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException { + final AirbyteRecordData recordData = super.convertDatabaseRowToAirbyteRecordData(queryContext); + final ObjectNode jsonNode = (ObjectNode) recordData.rawRowData(); if (!metadataInjector.isPresent()) { - return jsonNode; + return recordData; } metadataInjector.get().inject(jsonNode); - return jsonNode; + return new AirbyteRecordData(jsonNode, recordData.meta()); } /** diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java index 6b03ff28128..2457bc5924f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java @@ -10,6 +10,7 @@ import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYN import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mysql.cj.MysqlType; +import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants; import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil; @@ -27,6 +28,7 @@ import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -110,7 +112,7 @@ public class MySqlInitialLoadHandler { } }); - final AutoCloseableIterator queryStream = + final AutoCloseableIterator queryStream = new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair, calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream)); final AutoCloseableIterator recordIterator = @@ -144,7 +146,7 @@ public class MySqlInitialLoadHandler { // Transforms the given iterator to create an {@link AirbyteRecordMessage} private AutoCloseableIterator getRecordIterator( - final AutoCloseableIterator recordIterator, + final AutoCloseableIterator recordIterator, final String streamName, final String namespace, final long emittedAt) { @@ -154,7 +156,12 @@ public class MySqlInitialLoadHandler { .withStream(streamName) .withNamespace(namespace) .withEmittedAt(emittedAt) - .withData(r))); + .withData(r.rawRowData()) + .withMeta(isMetaChangesEmptyOrNull(r.meta()) ? null : r.meta()))); + } + + private boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) { + return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty(); } // Augments the given iterator with record count logs. diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java index be4bb4d6739..7c1c600766a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java @@ -4,10 +4,10 @@ package io.airbyte.integrations.source.mysql.initialsync; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.AbstractIterator; import com.mysql.cj.MysqlType; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; +import io.airbyte.cdk.db.jdbc.AirbyteRecordData; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.commons.util.AutoCloseableIterator; @@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory; * records processed here. */ @SuppressWarnings("try") -public class MySqlInitialLoadRecordIterator extends AbstractIterator - implements AutoCloseableIterator { +public class MySqlInitialLoadRecordIterator extends AbstractIterator + implements AutoCloseableIterator { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadRecordIterator.class); @@ -54,7 +54,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator private final PrimaryKeyInfo pkInfo; private final boolean isCompositeKeyLoad; private int numSubqueries = 0; - private AutoCloseableIterator currentIterator; + private AutoCloseableIterator currentIterator; MySqlInitialLoadRecordIterator( final JdbcDatabase database, @@ -78,7 +78,7 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator @CheckForNull @Override - protected JsonNode computeNext() { + protected AirbyteRecordData computeNext() { if (shouldBuildNextSubquery()) { try { // We will only issue one query for a composite key load. If we have already processed all the data @@ -93,8 +93,8 @@ public class MySqlInitialLoadRecordIterator extends AbstractIterator } LOGGER.info("Subquery number : {}", numSubqueries); - final Stream stream = database.unsafeQuery( - this::getPkPreparedStatement, sourceOperations::rowToJson); + final Stream stream = database.unsafeQuery( + this::getPkPreparedStatement, sourceOperations::convertDatabaseRowToAirbyteRecordData); currentIterator = AutoCloseableIterators.fromStream(stream, pair); numSubqueries++; diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index e5a1a75b802..99104ed17cc 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -50,6 +51,10 @@ import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.v0.AirbyteGlobalState; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change; +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.v0.AirbyteStream; @@ -59,6 +64,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.StreamDescriptor; import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -80,6 +86,11 @@ public class CdcMysqlSourceTest extends CdcSourceTest DATE_TIME_RECORDS = ImmutableList.of( + Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_DATE_TIME, "'2023-00-00 20:37:47'"))); + @Override protected MySQLTestDatabase createTestDatabase() { return MySQLTestDatabase.in(BaseImage.MYSQL_8, ContainerModifier.INVALID_TIMEZONE_CEST).withCdcPermissions(); @@ -734,6 +745,70 @@ public class CdcMysqlSourceTest extends CdcSourceTest streams = new ArrayList<>(); + streams.add(airbyteStream); + configuredCatalog.withStreams(streams); + + final AutoCloseableIterator read1 = source() + .read(config(), configuredCatalog, null); + final List actualRecords = AutoCloseableIterators.toListAndClose(read1); + + // Sync is expected to succeed with one record. However, the meta changes column should be populated + // for this record + // as it is an invalid date. As a result, this field will be omitted as Airbyte is unable to + // serialize the source value. + final Set recordMessages = extractRecordMessages(actualRecords); + assertEquals(recordMessages.size(), 1); + final AirbyteRecordMessage invalidDateRecord = recordMessages.stream().findFirst().get(); + + final AirbyteRecordMessageMetaChange expectedChange = + new AirbyteRecordMessageMetaChange().withReason(Reason.SOURCE_SERIALIZATION_ERROR).withChange( + Change.NULLED).withField(COL_DATE_TIME); + final AirbyteRecordMessageMeta expectedMessageMeta = new AirbyteRecordMessageMeta().withChanges(List.of(expectedChange)); + assertEquals(expectedMessageMeta, invalidDateRecord.getMeta()); + + ObjectMapper mapper = new ObjectMapper(); + final JsonNode expectedDataWithoutCdcFields = mapper.readTree("{\"id\":120}"); + removeCDCColumns((ObjectNode) invalidDateRecord.getData()); + assertEquals(expectedDataWithoutCdcFields, invalidDateRecord.getData()); + } + private void createTablesToIncreaseSchemaHistorySize() { for (int i = 0; i <= 200; i++) { final String tableName = generateRandomStringOf32Characters(); diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index ce2209e8295..c3343beab04 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.18 | 2024-04-15 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Refactor source operations. | +| 3.3.19 | 2024-04-15 | [37328](https://github.com/airbytehq/airbyte/pull/37328) | Populate airbyte_meta.changes | +| 3.3.18 | 2024-04-15 | [37324](https://github.com/airbytehq/airbyte/pull/37324) | Refactor source operations. | | 3.3.17 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. | | 3.3.16 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | | 3.3.15 | 2024-04-05 | [36577](https://github.com/airbytehq/airbyte/pull/36577) | Config error will not send out system trace message |