14652 source mysql cdc meta fields encoded incorrectly to protobuf (#67151)
## What Among other CDC_* fields, the CDC_UPDATED_AT and CDC_DELETED_AT fields are sent as part of CDC snapshot and incremental records. These two are unique because their catalog definition (AirbyteSchemaType) is a String, but the values they hold are date-times. JSON encoding relied on the fact that all dates encode to strings in JSON, but in protobuf we encoded these values using `.setTimestampWithTimezone`, which prevented the destination from pulling the value out of protobuf. The destination relies on the catalog type to know how to decode messages. We need to ensure that these fields are encoded to protobuf using `.setString` instead, in order to match the catalog type definition. #### Note: The CDK portion of this PR was separated to https://github.com/airbytehq/airbyte/pull/67152. Once the CDK PR is merged, this PR will only include connector changes. *** Please ignore the CDK changes on this PR *** ## How Updated the CDC meta-field type to support a unique type that accepts an `OffsetDateTime` but encodes the value in protobuf as a string. ## Review guide <!-- 1. `x.py` 2. `y.py` --> ## User Impact Users who previously used MySQL CDC in speed mode will need to refresh those streams. ## Can this PR be safely reverted and rolled back? <!-- * If unsure, leave it blank. --> - [x] YES 💚 - [ ] NO ❌ Resolves https://github.com/airbytehq/airbyte/issues/66735
This commit is contained in:
committed by
GitHub
parent
47927f170b
commit
79b0c93291
@@ -1,3 +1,3 @@
|
||||
testExecutionConcurrency=1
|
||||
JunitMethodExecutionTimeout=5m
|
||||
cdkVersion=0.1.28
|
||||
cdkVersion=0.1.47
|
||||
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
|
||||
dockerImageTag: 3.50.8
|
||||
dockerImageTag: 3.50.9
|
||||
dockerRepository: airbyte/source-mysql
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
|
||||
githubIssueLabel: source-mysql
|
||||
|
||||
@@ -18,9 +18,7 @@ import io.airbyte.cdk.data.FloatCodec
|
||||
import io.airbyte.cdk.data.JsonCodec
|
||||
import io.airbyte.cdk.data.JsonEncoder
|
||||
import io.airbyte.cdk.data.LeafAirbyteSchemaType
|
||||
import io.airbyte.cdk.data.LongCodec
|
||||
import io.airbyte.cdk.data.NullCodec
|
||||
import io.airbyte.cdk.data.OffsetDateTimeCodec
|
||||
import io.airbyte.cdk.data.TextCodec
|
||||
import io.airbyte.cdk.discover.CommonMetaField
|
||||
import io.airbyte.cdk.discover.Field
|
||||
@@ -86,6 +84,7 @@ class MySqlSourceDebeziumOperations(
|
||||
configuration.incrementalConfiguration as CdcIncrementalConfiguration
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun deserializeRecord(
|
||||
key: DebeziumRecordKey,
|
||||
value: DebeziumRecordValue,
|
||||
@@ -148,52 +147,43 @@ class MySqlSourceDebeziumOperations(
|
||||
val transactionMillis: Long = source["ts_ms"].asLong()
|
||||
val transactionOffsetDateTime: OffsetDateTime =
|
||||
OffsetDateTime.ofInstant(Instant.ofEpochMilli(transactionMillis), ZoneOffset.UTC)
|
||||
val transactionTimestampJsonNode: JsonNode =
|
||||
OffsetDateTimeCodec.encode(transactionOffsetDateTime)
|
||||
data.set<JsonNode>(
|
||||
CommonMetaField.CDC_UPDATED_AT.id,
|
||||
transactionTimestampJsonNode,
|
||||
)
|
||||
resultRow[CommonMetaField.CDC_UPDATED_AT.id] =
|
||||
FieldValueEncoder(transactionOffsetDateTime, OffsetDateTimeCodec)
|
||||
FieldValueEncoder(
|
||||
transactionOffsetDateTime,
|
||||
CommonMetaField.CDC_UPDATED_AT.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
|
||||
data.set<JsonNode>(
|
||||
CommonMetaField.CDC_DELETED_AT.id,
|
||||
if (isDelete) transactionTimestampJsonNode else Jsons.nullNode(),
|
||||
)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
resultRow[CommonMetaField.CDC_DELETED_AT.id] =
|
||||
FieldValueEncoder(
|
||||
if (isDelete) transactionOffsetDateTime else null,
|
||||
(if (isDelete) OffsetDateTimeCodec else NullCodec) as JsonEncoder<Any>
|
||||
(if (isDelete) CommonMetaField.CDC_DELETED_AT.type.jsonEncoder else NullCodec)
|
||||
as JsonEncoder<Any>
|
||||
)
|
||||
|
||||
// Set _ab_cdc_log_file and _ab_cdc_log_pos meta-field values.
|
||||
val position = MySqlSourceCdcPosition(source["file"].asText(), source["pos"].asLong())
|
||||
data.set<JsonNode>(
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
|
||||
TextCodec.encode(position.fileName)
|
||||
)
|
||||
resultRow[MySqlSourceCdcMetaFields.CDC_LOG_FILE.id] =
|
||||
FieldValueEncoder(position.fileName, TextCodec)
|
||||
|
||||
data.set<JsonNode>(
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
|
||||
LongCodec.encode(position.position)
|
||||
)
|
||||
resultRow[MySqlSourceCdcMetaFields.CDC_LOG_FILE.id] =
|
||||
FieldValueEncoder(
|
||||
position.fileName,
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_FILE.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
|
||||
resultRow[MySqlSourceCdcMetaFields.CDC_LOG_POS.id] =
|
||||
FieldValueEncoder(position.position, LongCodec)
|
||||
FieldValueEncoder(
|
||||
position.position,
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
|
||||
// Set the _ab_cdc_cursor meta-field value.
|
||||
data.set<JsonNode>(
|
||||
MySqlSourceCdcMetaFields.CDC_CURSOR.id,
|
||||
LongCodec.encode(position.cursorValue)
|
||||
)
|
||||
resultRow[MySqlSourceCdcMetaFields.CDC_CURSOR.id] =
|
||||
FieldValueEncoder(position.cursorValue, LongCodec)
|
||||
FieldValueEncoder(
|
||||
position.cursorValue,
|
||||
MySqlSourceCdcMetaFields.CDC_CURSOR.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
|
||||
// Return a DeserializedRecord instance.
|
||||
return DeserializedRecord(resultRow, emptyMap()) // TEMP
|
||||
return DeserializedRecord(resultRow, emptyMap())
|
||||
}
|
||||
|
||||
override fun findStreamNamespace(key: DebeziumRecordKey, value: DebeziumRecordValue): String? =
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import com.mysql.cj.MysqlType
|
||||
import io.airbyte.cdk.command.OpaqueStateValue
|
||||
import io.airbyte.cdk.data.JsonEncoder
|
||||
import io.airbyte.cdk.discover.CdcIntegerMetaFieldType
|
||||
import io.airbyte.cdk.discover.CdcOffsetDateTimeMetaFieldType
|
||||
import io.airbyte.cdk.discover.CdcStringMetaFieldType
|
||||
@@ -87,34 +88,39 @@ class MySqlSourceOperations :
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS
|
||||
)
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun decorateRecordData(
|
||||
timestamp: OffsetDateTime,
|
||||
globalStateValue: OpaqueStateValue?,
|
||||
stream: Stream,
|
||||
recordData: NativeRecordPayload
|
||||
) {
|
||||
recordData.set(
|
||||
CommonMetaField.CDC_UPDATED_AT.id,
|
||||
FieldValueEncoder(timestamp, CdcOffsetDateTimeMetaFieldType.jsonEncoder)
|
||||
)
|
||||
recordData.set(
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
|
||||
FieldValueEncoder(0, CdcIntegerMetaFieldType.jsonEncoder)
|
||||
)
|
||||
recordData[CommonMetaField.CDC_UPDATED_AT.id] =
|
||||
FieldValueEncoder(
|
||||
timestamp,
|
||||
CommonMetaField.CDC_UPDATED_AT.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
recordData[MySqlSourceCdcMetaFields.CDC_LOG_POS.id] =
|
||||
FieldValueEncoder(
|
||||
0,
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
if (globalStateValue == null) {
|
||||
return
|
||||
}
|
||||
val offset: DebeziumOffset =
|
||||
MySqlSourceDebeziumOperations.deserializeStateUnvalidated(globalStateValue).offset
|
||||
val position: MySqlSourceCdcPosition = MySqlSourceDebeziumOperations.position(offset)
|
||||
recordData.set(
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_FILE.id,
|
||||
FieldValueEncoder(position.fileName, CdcStringMetaFieldType.jsonEncoder)
|
||||
)
|
||||
recordData.set(
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS.id,
|
||||
FieldValueEncoder(position.position, CdcIntegerMetaFieldType.jsonEncoder)
|
||||
)
|
||||
recordData[MySqlSourceCdcMetaFields.CDC_LOG_FILE.id] =
|
||||
FieldValueEncoder(
|
||||
position.fileName,
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_FILE.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
recordData[MySqlSourceCdcMetaFields.CDC_LOG_POS.id] =
|
||||
FieldValueEncoder(
|
||||
position.position,
|
||||
MySqlSourceCdcMetaFields.CDC_LOG_POS.type.jsonEncoder as JsonEncoder<Any>
|
||||
)
|
||||
}
|
||||
|
||||
override fun decorateRecordData(
|
||||
|
||||
Reference in New Issue
Block a user