1
0
mirror of synced 2025-12-25 02:09:19 -05:00

refactor: separate validation result handling from coercer (#69113)

This commit is contained in:
Jonathan Pearlin
2025-11-05 09:18:59 -05:00
committed by GitHub
parent 81e06622a6
commit dc83e41e77
19 changed files with 542 additions and 63 deletions

View File

@@ -1,3 +1,10 @@
## Version 0.1.66
**Load CDK**
* Added: Support for reporting of additional stats in destination state messages.
* Changed: Refactor coercer interface to separate out coercion and validation.
## Version 0.1.65
extract cdk: fix a bug when getting table metadata that causes a timeout

View File

@@ -77,7 +77,7 @@ dependencies {
api("com.fasterxml.jackson.core:jackson-databind")
api("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
api("com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39")
api("io.airbyte.airbyte-protocol:protocol-models:0.18.0") {
api("io.airbyte.airbyte-protocol:protocol-models:0.19.0") {
exclude(group="com.google.guava", module="guava")
exclude(group="com.google.api-client")
exclude(group="org.apache.logging.log4j")

View File

@@ -199,6 +199,8 @@ class EnrichedAirbyteValue(
* Creates a nullified version of this value with the specified reason.
*
* @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
*
* @deprecated Use the new data flow pipeline instead
*/
fun nullify(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR) {
val nullChange = Meta.Change(field = name, change = Change.NULLED, reason = reason)
@@ -212,6 +214,8 @@ class EnrichedAirbyteValue(
*
* @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION
* @param newValue The new (truncated) value to use
*
* @deprecated Use the new data flow pipeline instead
*/
fun truncate(
newValue: AirbyteValue,

View File

@@ -5,8 +5,10 @@
package io.airbyte.cdk.load.dataflow.state.stats
import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.dataflow.state.StateKey
import io.airbyte.cdk.load.dataflow.stats.MetricTracker
import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.load.message.GlobalCheckpoint
import io.airbyte.cdk.load.message.GlobalSnapshotCheckpoint
@@ -18,6 +20,7 @@ import jakarta.inject.Singleton
class StateStatsEnricher(
private val statsStore: CommittedStatsStore,
private val namespaceMapper: NamespaceMapper,
private val metricTracker: MetricTracker,
) {
// Enriches provided state message with stats associated with the given state key.
fun enrich(msg: CheckpointMessage, key: StateKey): CheckpointMessage {
@@ -30,16 +33,34 @@ class StateStatsEnricher(
@VisibleForTesting
@Suppress("UNUSED_PARAMETER")
fun enrichTopLevelDestinationStats(msg: CheckpointMessage, count: Long): CheckpointMessage {
fun enrichTopLevelDestinationStats(
msg: CheckpointMessage,
desc: DestinationStream.Descriptor,
count: Long
): CheckpointMessage {
// TODO: set this using the count above once we get to total rejected
// records.
msg.updateStats(
destinationStats = msg.sourceStats,
additionalStats = metricTracker.drain(desc)
)
return msg
}
@VisibleForTesting
@Suppress("UNUSED_PARAMETER")
fun enrichTopLevelDestinationStatsGlobalState(
msg: CheckpointMessage,
count: Long
): CheckpointMessage {
// TODO: set this using the count above once we get to total rejected
// records.
msg.updateStats(destinationStats = msg.sourceStats)
return msg
}
@VisibleForTesting
fun enrichTopLevelStats(msg: CheckpointMessage, stats: EmissionStats): CheckpointMessage {
msg.updateStats(
@@ -62,7 +83,7 @@ class StateStatsEnricher(
)
val (committed, cumulative) = statsStore.commitStats(desc, key)
enrichTopLevelDestinationStats(msg, committed.count)
enrichTopLevelDestinationStats(msg, desc, committed.count)
enrichTopLevelStats(msg, cumulative)
return msg
@@ -88,7 +109,7 @@ class StateStatsEnricher(
}
.fold(CommitStatsResult()) { acc, c -> acc.merge(c) }
enrichTopLevelDestinationStats(msg, committed.count)
enrichTopLevelDestinationStatsGlobalState(msg, committed.count)
enrichTopLevelStats(msg, cumulative)
return msg

View File

@@ -0,0 +1,106 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.dataflow.stats
import io.airbyte.cdk.load.command.DestinationStream
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
/**
 * Tracks numeric metrics per destination stream. Values are stored by stream descriptor first and
 * by metric name second.
 *
 * Every public operation runs while holding one private lock, so the read-modify-write in [add]
 * cannot interleave with a [drain] of the same stream.
 *
 * Supported operations: accumulate a value ([add]), read a snapshot ([get]), and read-then-clear
 * ([drain]). [drain] reports 0.0 for every [ObservabilityMetrics] entry that has no recorded value.
 */
@Singleton
class MetricTracker {
    private val metrics: MutableMap<DestinationStream.Descriptor, MutableMap<String, Double>> =
        ConcurrentHashMap()
    private val lock: Any = Any()

    /**
     * Adds [value] to the running total of [metric] for [stream]. The first call for a given
     * stream/metric pair starts from 0.0, so the total becomes [value]; later calls accumulate.
     *
     * @param stream the stream descriptor the metric belongs to.
     * @param metric the metric whose [ObservabilityMetrics.metricName] is used as the storage key.
     * @param value the amount to add to the current total.
     */
    fun add(stream: DestinationStream.Descriptor, metric: ObservabilityMetrics, value: Double) {
        synchronized(lock) {
            // getOrPut yields a non-null map, which removes the need for a `!!` assertion.
            val streamMetrics = metrics.getOrPut(stream) { ConcurrentHashMap() }
            streamMetrics[metric.metricName] = (streamMetrics[metric.metricName] ?: 0.0) + value
        }
    }

    /**
     * Returns a snapshot of the metrics recorded for [stream].
     *
     * @param stream the stream descriptor whose metrics are requested.
     * @return a copy of the recorded values keyed by metric name, or an empty map when nothing has
     * been recorded for the stream. Default values are NOT filled in here; see [drain].
     */
    fun get(stream: DestinationStream.Descriptor): Map<String, Double> =
        synchronized(lock) { metrics[stream]?.toMap().orEmpty() }

    /**
     * Returns the metrics recorded for [stream] and then clears them.
     *
     * @param stream the stream descriptor whose metrics are drained.
     * @return the recorded values keyed by metric name, with 0.0 supplied for every
     * [ObservabilityMetrics] entry that had no recorded value.
     */
    fun drain(stream: DestinationStream.Descriptor): Map<String, Double> =
        synchronized(lock) {
            // Snapshot (with defaults) first, then clear, all under the same lock as add().
            val snapshot = addDefaultValues(get(stream))
            metrics[stream]?.clear()
            snapshot
        }

    /**
     * Returns a copy of [metrics] in which every [ObservabilityMetrics] entry is present. Entries
     * already in [metrics] keep their value; missing ones are set to 0.0. The argument itself is
     * not modified.
     *
     * @param metrics recorded values keyed by metric name.
     * @return a new map containing the recorded values plus the 0.0 defaults.
     */
    private fun addDefaultValues(metrics: Map<String, Double>): Map<String, Double> {
        val withDefaults = metrics.toMutableMap()
        for (metric in ObservabilityMetrics.entries) {
            withDefaults.putIfAbsent(metric.metricName, 0.0)
        }
        return withDefaults
    }
}
/**
 * The set of observability metrics that can be recorded per stream.
 *
 * @property metricName The string key under which the metric's value is stored and reported.
 */
enum class ObservabilityMetrics(val metricName: String) {
    /** Count of values that were replaced with null. */
    NULLED_VALUE_COUNT(metricName = "nulledValueCount"),

    /** Count of values that were replaced with a truncated value. */
    TRUNCATED_VALUE_COUNT(metricName = "truncatedValueCount"),
}

View File

@@ -7,6 +7,7 @@ package io.airbyte.cdk.load.dataflow.transform
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
/**
* Interface for destination-specific field coercion and type representation.
@@ -71,30 +72,43 @@ interface ValueCoercer {
* nullify values that fail validation.
*
* @param value The enriched Airbyte value to validate
* @return The validated EnrichedAirbyteValue, potentially nullified if validation fails
* @return The [ValidationResult] indicating whether the value is valid and, if it is not, how it
* should be handled (nullified or truncated) and for what reason
*
* @example
* ```kotlin
* override fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue {
* override fun validate(value: EnrichedAirbyteValue): ValidationResult =
* when (val abValue = value.abValue) {
* is StringValue -> {
* if (abValue.value.length > MAX_STRING_LENGTH) {
* value.nullify(
* AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
* )
* ShouldNullify(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
* } else {
* Valid
* }
* }
* is IntegerValue -> {
* if (abValue.value > MAX_INTEGER) {
* value.nullify(
* AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
* )
* ShouldNullify(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
* } else {
* Valid
* }
* }
* }
* return value
* }
* ```
*/
fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue
fun validate(value: EnrichedAirbyteValue): ValidationResult
}
/**
 * Outcome of a [ValueCoercer.validate] call. The coercer only reports what should happen to the
 * value; it does not modify the value itself.
 */
sealed interface ValidationResult {
    /** The value passed validation; nothing needs to change. */
    data object Valid : ValidationResult

    /**
     * The value failed validation and should be replaced with null.
     *
     * @property reason why the value is being nulled.
     */
    data class ShouldNullify(
        val reason: AirbyteRecordMessageMetaChange.Reason,
    ) : ValidationResult

    /**
     * The value failed validation and should be replaced with a truncated value.
     *
     * @property truncatedValue the replacement value to store.
     * @property reason why the value is being truncated.
     */
    data class ShouldTruncate(
        val truncatedValue: AirbyteValue,
        val reason: AirbyteRecordMessageMetaChange.Reason,
    ) : ValidationResult
}

View File

@@ -0,0 +1,97 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.dataflow.transform.data
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.dataflow.stats.MetricTracker
import io.airbyte.cdk.load.dataflow.stats.ObservabilityMetrics
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
import io.airbyte.cdk.load.message.Meta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import jakarta.inject.Singleton
/**
 * Applies a [ValidationResult] to an [EnrichedAirbyteValue] and records a metric for every
 * nullification or truncation it performs.
 */
@Singleton
class ValidationResultHandler(private val metricTracker: MetricTracker) {
    /**
     * Dispatches on [result]:
     * - [ValidationResult.Valid]: [value] is returned untouched.
     * - [ValidationResult.ShouldNullify]: delegates to [nullify].
     * - [ValidationResult.ShouldTruncate]: delegates to [truncate].
     *
     * @param stream The descriptor of the destination stream the value belongs to.
     * @param result The validation result describing how the value should be handled.
     * @param value The enriched Airbyte value to process.
     * @return The same [value] instance, modified in place when the result was not valid.
     */
    fun handle(
        stream: DestinationStream.Descriptor,
        result: ValidationResult,
        value: EnrichedAirbyteValue
    ) =
        when (result) {
            is ValidationResult.Valid -> value
            is ValidationResult.ShouldNullify -> nullify(stream, value, result.reason)
            is ValidationResult.ShouldTruncate ->
                truncate(stream, value, result.truncatedValue, result.reason)
        }

    /**
     * Nulls [value] in place: its `abValue` becomes [NullValue], a `NULLED` change carrying
     * [reason] is appended to its changes, and [ObservabilityMetrics.NULLED_VALUE_COUNT] is
     * incremented by 1.0 for [stream].
     *
     * @param stream The [DestinationStream.Descriptor] of the stream the value belongs to.
     * @param value The [EnrichedAirbyteValue] to nullify.
     * @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR.
     * @return The same, now nullified, [EnrichedAirbyteValue].
     */
    fun nullify(
        stream: DestinationStream.Descriptor,
        value: EnrichedAirbyteValue,
        reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
    ): EnrichedAirbyteValue =
        replaceAndRecord(
            stream = stream,
            value = value,
            replacement = NullValue,
            change = Change.NULLED,
            reason = reason,
            metric = ObservabilityMetrics.NULLED_VALUE_COUNT,
        )

    /**
     * Truncates [value] in place: its `abValue` becomes [truncatedValue], a `TRUNCATED` change
     * carrying [reason] is appended to its changes, and
     * [ObservabilityMetrics.TRUNCATED_VALUE_COUNT] is incremented by 1.0 for [stream].
     *
     * @param stream The [DestinationStream.Descriptor] of the stream the value belongs to.
     * @param value The [EnrichedAirbyteValue] to truncate.
     * @param truncatedValue The replacement (already truncated) value.
     * @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION.
     * @return The same, now truncated, [EnrichedAirbyteValue].
     */
    fun truncate(
        stream: DestinationStream.Descriptor,
        value: EnrichedAirbyteValue,
        truncatedValue: AirbyteValue,
        reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION
    ): EnrichedAirbyteValue =
        replaceAndRecord(
            stream = stream,
            value = value,
            replacement = truncatedValue,
            change = Change.TRUNCATED,
            reason = reason,
            metric = ObservabilityMetrics.TRUNCATED_VALUE_COUNT,
        )

    /**
     * Shared body of [nullify] and [truncate]: builds the change record, swaps the value, appends
     * the change, bumps [metric] by 1.0 for [stream], and returns [value].
     */
    private fun replaceAndRecord(
        stream: DestinationStream.Descriptor,
        value: EnrichedAirbyteValue,
        replacement: AirbyteValue,
        change: Change,
        reason: Reason,
        metric: ObservabilityMetrics,
    ): EnrichedAirbyteValue {
        // The change record is built before the value is replaced, as in the original sequence.
        val metaChange = Meta.Change(field = value.name, change = change, reason = reason)
        value.abValue = replacement
        value.changes.add(metaChange)
        metricTracker.add(stream, metric, 1.0)
        return value
    }
}

View File

@@ -5,6 +5,7 @@
package io.airbyte.cdk.load.dataflow.transform.defaults
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
import io.airbyte.cdk.load.dataflow.transform.ValueCoercer
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
@@ -18,5 +19,5 @@ import jakarta.inject.Singleton
class NoOpCoercer : ValueCoercer {
override fun map(value: EnrichedAirbyteValue): EnrichedAirbyteValue = value
override fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue = value
override fun validate(value: EnrichedAirbyteValue): ValidationResult = ValidationResult.Valid
}

View File

@@ -7,6 +7,7 @@ package io.airbyte.cdk.load.dataflow.transform.medium
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.dataflow.transform.ColumnNameMapper
import io.airbyte.cdk.load.dataflow.transform.ValueCoercer
import io.airbyte.cdk.load.dataflow.transform.data.ValidationResultHandler
import io.airbyte.cdk.load.message.DestinationRecordRaw
import jakarta.inject.Singleton
@@ -14,6 +15,7 @@ import jakarta.inject.Singleton
class JsonConverter(
private val columnNameMapper: ColumnNameMapper,
private val coercer: ValueCoercer,
private val validationResultHandler: ValidationResultHandler,
) {
fun convert(msg: DestinationRecordRaw): HashMap<String, AirbyteValue> {
val enriched =
@@ -23,9 +25,18 @@ class JsonConverter(
enriched.declaredFields.forEach { field ->
val mappedKey =
columnNameMapper.getMappedColumnName(msg.stream, field.key)
?: field.key // fallback to original key
?: field.key // fallback to the original key
val mappedValue = field.value.let { coercer.map(it) }.let { coercer.validate(it) }
val mappedValue =
field.value
.let { coercer.map(it) }
.let { value ->
validationResultHandler.handle(
stream = msg.stream.mappedDescriptor,
result = coercer.validate(value),
value = value
)
}
munged[mappedKey] = mappedValue.abValue
}

View File

@@ -40,6 +40,7 @@ import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.data.json.toAirbyteValue
import io.airbyte.cdk.load.dataflow.transform.ColumnNameMapper
import io.airbyte.cdk.load.dataflow.transform.ValueCoercer
import io.airbyte.cdk.load.dataflow.transform.data.ValidationResultHandler
import io.airbyte.cdk.load.dataflow.transform.defaults.NoOpColumnNameMapper
import io.airbyte.cdk.load.message.DestinationRecordProtobufSource
import io.airbyte.cdk.load.message.DestinationRecordRaw
@@ -67,6 +68,7 @@ import javax.inject.Singleton
class ProtobufConverter(
private val columnNameMapper: ColumnNameMapper,
private val coercer: ValueCoercer,
private val validationResultHandler: ValidationResultHandler,
) {
private val isNoOpMapper = columnNameMapper is NoOpColumnNameMapper
@@ -132,7 +134,12 @@ class ProtobufConverter(
enrichedValue.abValue = airbyteValue
val mappedValue = coercer.map(enrichedValue)
val validatedValue = coercer.validate(mappedValue)
val validatedValue =
validationResultHandler.handle(
stream = stream.mappedDescriptor,
result = coercer.validate(mappedValue),
value = mappedValue
)
allParsingFailures.addAll(validatedValue.changes)

View File

@@ -31,6 +31,7 @@ import io.airbyte.cdk.load.message.Meta.Companion.getEmittedAtMs
import io.airbyte.cdk.load.state.CheckpointId
import io.airbyte.cdk.load.state.CheckpointKey
import io.airbyte.cdk.load.util.deserializeToNode
import io.airbyte.protocol.models.v0.AdditionalStats
import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
@@ -478,6 +479,7 @@ sealed interface CheckpointMessage : DestinationMessage {
data class Stats(
val recordCount: Long,
val rejectedRecordCount: Long = 0, // TODO should not have a default?
val additionalStats: Map<String, Double> = emptyMap(),
)
data class Checkpoint(
@@ -532,12 +534,14 @@ sealed interface CheckpointMessage : DestinationMessage {
val totalRecords: Long?
val totalBytes: Long?
val totalRejectedRecords: Long?
val additionalStats: MutableMap<String, Double>
fun updateStats(
destinationStats: Stats? = null,
totalRecords: Long? = null,
totalBytes: Long? = null,
totalRejectedRecords: Long? = null,
additionalStats: Map<String, Double> = emptyMap()
)
fun withDestinationStats(stats: Stats): CheckpointMessage
@@ -551,6 +555,7 @@ sealed interface CheckpointMessage : DestinationMessage {
}
}
}
val additionalStatsToAdd = this.additionalStats
destinationStats?.let {
message.destinationStats =
AirbyteStateStats().apply {
@@ -558,6 +563,15 @@ sealed interface CheckpointMessage : DestinationMessage {
if (it.rejectedRecordCount > 0) {
withRejectedRecordCount(it.rejectedRecordCount.toDouble())
}
if (additionalStatsToAdd.isNotEmpty()) {
withAdditionalStats(
AdditionalStats().apply {
additionalStatsToAdd.forEach {
additionalProperties[it.key] = it.value
}
}
)
}
}
}
additionalProperties.forEach { (key, value) -> message.withAdditionalProperty(key, value) }
@@ -590,6 +604,7 @@ data class StreamCheckpoint(
override var totalRecords: Long? = null,
override var totalBytes: Long? = null,
override var totalRejectedRecords: Long? = null,
override var additionalStats: MutableMap<String, Double> = mutableMapOf(),
) : CheckpointMessage {
/** Convenience constructor, intended for use in tests. */
constructor(
@@ -601,7 +616,8 @@ data class StreamCheckpoint(
destinationRecordCount: Long? = null,
checkpointKey: CheckpointKey? = null,
totalRecords: Long? = null,
totalBytes: Long? = null
totalBytes: Long? = null,
additionalStats: MutableMap<String, Double> = mutableMapOf(),
) : this(
Checkpoint(
unmappedNamespace = unmappedNamespace,
@@ -609,12 +625,13 @@ data class StreamCheckpoint(
state = blob.deserializeToNode(),
),
Stats(sourceRecordCount),
destinationRecordCount?.let { Stats(it) },
destinationRecordCount?.let { Stats(recordCount = it, additionalStats = additionalStats) },
additionalProperties,
serializedSizeBytes = 0L,
checkpointKey = checkpointKey,
totalRecords = totalRecords,
totalBytes = totalBytes
totalBytes = totalBytes,
additionalStats = additionalStats,
)
override val checkpoints: List<Checkpoint>
@@ -624,14 +641,17 @@ data class StreamCheckpoint(
destinationStats: Stats?,
totalRecords: Long?,
totalBytes: Long?,
totalRejectedRecords: Long?
totalRejectedRecords: Long?,
additionalStats: Map<String, Double>
) {
destinationStats?.let { this.destinationStats = it }
totalRecords?.let { this.totalRecords = it }
totalBytes?.let { this.totalBytes = it }
totalRejectedRecords?.let { this.totalRejectedRecords = it }
this.additionalStats.putAll(additionalStats)
}
override fun withDestinationStats(stats: Stats) = copy(destinationStats = stats)
override fun withDestinationStats(stats: Stats) =
copy(destinationStats = stats, additionalStats = additionalStats)
override fun asProtocolMessage(): AirbyteMessage {
val stateMessage =
@@ -656,6 +676,7 @@ data class GlobalCheckpoint(
override var totalRecords: Long? = null,
override var totalBytes: Long? = null,
override var totalRejectedRecords: Long? = null,
override var additionalStats: MutableMap<String, Double> = mutableMapOf(),
) : CheckpointMessage {
/** Convenience constructor, primarily intended for use in tests. */
constructor(
@@ -672,12 +693,14 @@ data class GlobalCheckpoint(
destinationStats: Stats?,
totalRecords: Long?,
totalBytes: Long?,
totalRejectedRecords: Long?
totalRejectedRecords: Long?,
additionalStats: Map<String, Double>
) {
destinationStats?.let { this.destinationStats = it }
totalRecords?.let { this.totalRecords = it }
totalBytes?.let { this.totalBytes = it }
totalRejectedRecords?.let { this.totalRejectedRecords = it }
this.additionalStats.putAll(additionalStats)
}
override fun withDestinationStats(stats: Stats) = copy(destinationStats = stats)
@@ -708,6 +731,7 @@ data class GlobalSnapshotCheckpoint(
override var totalRecords: Long? = null,
override var totalBytes: Long? = null,
override var totalRejectedRecords: Long? = null,
override var additionalStats: MutableMap<String, Double> = mutableMapOf(),
val streamCheckpoints: Map<DestinationStream.Descriptor, CheckpointKey>
) : CheckpointMessage {
@@ -721,12 +745,14 @@ data class GlobalSnapshotCheckpoint(
destinationStats: Stats?,
totalRecords: Long?,
totalBytes: Long?,
totalRejectedRecords: Long?
totalRejectedRecords: Long?,
additionalStats: Map<String, Double>
) {
destinationStats?.let { this.destinationStats = it }
totalRecords?.let { this.totalRecords = it }
totalBytes?.let { this.totalBytes = it }
totalRejectedRecords?.let { this.totalRejectedRecords = it }
this.additionalStats.putAll(additionalStats)
}
override fun withDestinationStats(stats: Stats) = copy(destinationStats = stats)

View File

@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.dataflow.state.PartitionKey
import io.airbyte.cdk.load.dataflow.state.StateKey
import io.airbyte.cdk.load.dataflow.stats.MetricTracker
import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.load.message.GlobalCheckpoint
import io.airbyte.cdk.load.message.GlobalSnapshotCheckpoint
@@ -29,11 +30,14 @@ class StateStatsEnricherTest {
@MockK private lateinit var namespaceMapper: NamespaceMapper
@MockK private lateinit var metricTracker: MetricTracker
private lateinit var stateStatsEnricher: StateStatsEnricher
@BeforeEach
fun setUp() {
stateStatsEnricher = StateStatsEnricher(statsStore, namespaceMapper)
every { metricTracker.drain(any()) } returns emptyMap()
stateStatsEnricher = StateStatsEnricher(statsStore, namespaceMapper, metricTracker)
}
@Test
@@ -174,11 +178,12 @@ class StateStatsEnricherTest {
@Test
fun `#enrichTopLevelDestinationStats`() {
val stream = DestinationStream.Descriptor(namespace = "namespace", name = "name")
val checkpoint = mockk<CheckpointMessage>(relaxed = true)
val sourceStats = CheckpointMessage.Stats(recordCount = 100)
every { checkpoint.sourceStats } returns sourceStats
val result = stateStatsEnricher.enrichTopLevelDestinationStats(checkpoint, 50L)
val result = stateStatsEnricher.enrichTopLevelDestinationStats(checkpoint, stream, 50L)
assertEquals(checkpoint, result)
verify { checkpoint.updateStats(destinationStats = sourceStats) }

View File

@@ -0,0 +1,64 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.dataflow.stats
import io.airbyte.cdk.load.command.DestinationStream
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
/** Unit tests for [MetricTracker]: accumulation, draining, and default values on drain. */
internal class MetricTrackerTest {
    /** Values added for the same stream/metric accumulate; other metrics are unaffected. */
    @Test
    fun testAddingMetricValue() {
        val stream = DestinationStream.Descriptor(namespace = "namespace", name = "name")
        val metric = ObservabilityMetrics.NULLED_VALUE_COUNT
        val metricTracker = MetricTracker()

        metricTracker.add(stream, metric, 1.0)
        assertEquals(1.0, metricTracker.get(stream)[metric.metricName])

        metricTracker.add(stream, metric, 2.0)
        assertEquals(3.0, metricTracker.get(stream)[metric.metricName])

        metricTracker.add(stream, ObservabilityMetrics.TRUNCATED_VALUE_COUNT, 5.0)
        assertEquals(3.0, metricTracker.get(stream)[metric.metricName])
        assertEquals(
            5.0,
            metricTracker.get(stream)[ObservabilityMetrics.TRUNCATED_VALUE_COUNT.metricName]
        )
    }

    /** Draining returns the accumulated totals and leaves the stream's metrics empty. */
    @Test
    fun testDrainMetricValues() {
        val stream = DestinationStream.Descriptor(namespace = "namespace", name = "name")
        val metric = ObservabilityMetrics.NULLED_VALUE_COUNT
        val metricTracker = MetricTracker()

        metricTracker.add(stream, metric, 1.0)
        metricTracker.add(stream, metric, 2.0)
        metricTracker.add(stream, ObservabilityMetrics.TRUNCATED_VALUE_COUNT, 5.0)

        val metrics = metricTracker.drain(stream)
        // drain() reports every ObservabilityMetrics entry, so the expected size follows the enum
        // rather than a hard-coded count that would break when a new metric is added.
        assertEquals(ObservabilityMetrics.entries.size, metrics.size)
        assertEquals(3.0, metrics[metric.metricName])
        assertEquals(5.0, metrics[ObservabilityMetrics.TRUNCATED_VALUE_COUNT.metricName])

        // Validate that the underlying map has been cleared
        assertEquals(0, metricTracker.get(stream).size)
    }

    /** Draining a stream with no recorded values yields 0.0 for every known metric. */
    @Test
    fun testDrainAllDefaultValues() {
        val stream = DestinationStream.Descriptor(namespace = "namespace", name = "name")
        val metricTracker = MetricTracker()

        val metrics = metricTracker.drain(stream)
        assertEquals(ObservabilityMetrics.entries.size, metrics.size)
        ObservabilityMetrics.entries.forEach { metric ->
            assertEquals(0.0, metrics[metric.metricName])
        }
    }
}

View File

@@ -15,6 +15,7 @@ import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.dataflow.transform.data.ValidationResultHandler
import io.airbyte.cdk.load.dataflow.transform.medium.JsonConverter
import io.airbyte.cdk.load.dataflow.transform.medium.ProtobufConverter
import io.airbyte.cdk.load.message.DestinationRecordRaw
@@ -37,13 +38,16 @@ class JsonRecordMungerTest {
@MockK lateinit var protobufConverter: ProtobufConverter
private lateinit var validationResultHandler: ValidationResultHandler
private lateinit var jsonConverter: JsonConverter
private lateinit var munger: RecordMunger
@BeforeEach
fun setup() {
jsonConverter = JsonConverter(columnNameMapper, valueCoercer)
validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
jsonConverter = JsonConverter(columnNameMapper, valueCoercer, validationResultHandler)
munger = RecordMunger(jsonConverter, protobufConverter)
}
@@ -55,7 +59,7 @@ class JsonRecordMungerTest {
secondArg<String>() + "_munged"
}
every { valueCoercer.validate(any<EnrichedAirbyteValue>()) } answers { firstArg() }
every { valueCoercer.validate(any<EnrichedAirbyteValue>()) } returns ValidationResult.Valid
val stringfiedValue =
Fixtures.mockCoercedValue(StringValue("{ \"json\": \"stringified\" }"))

View File

@@ -23,6 +23,7 @@ import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import io.airbyte.cdk.load.dataflow.transform.data.ValidationResultHandler
import io.airbyte.cdk.load.dataflow.transform.medium.JsonConverter
import io.airbyte.cdk.load.dataflow.transform.medium.ProtobufConverter
import io.airbyte.cdk.load.message.DestinationRecordProtobufSource
@@ -65,6 +66,7 @@ class ProtobufRecordMungerTest {
private lateinit var stream: DestinationStream
private lateinit var columnNameMapper: ColumnNameMapper
private lateinit var valueCoercer: ValueCoercer
private lateinit var validationResultHandler: ValidationResultHandler
private var protoSource: DestinationRecordProtobufSource? = null
private lateinit var record: DestinationRecordRaw
private lateinit var fieldAccessors: Array<AirbyteValueProxy.FieldAccessor>
@@ -90,22 +92,25 @@ class ProtobufRecordMungerTest {
return value
}
override fun validate(value: EnrichedAirbyteValue): EnrichedAirbyteValue {
override fun validate(value: EnrichedAirbyteValue): ValidationResult =
when (val abValue = value.abValue) {
is IntegerValue ->
if (abValue.value < INT64_MIN || abValue.value > INT64_MAX) {
value.nullify(
if (abValue.value !in INT64_MIN..INT64_MAX) {
ValidationResult.ShouldNullify(
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_FIELD_SIZE_LIMITATION,
)
} else {
ValidationResult.Valid
}
else -> {}
else -> {
ValidationResult.Valid
}
}
return value
}
}
validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val fields =
mutableListOf(
field("bool_col", BooleanType, 0),
@@ -275,8 +280,8 @@ class ProtobufRecordMungerTest {
munger =
RecordMunger(
JsonConverter(columnNameMapper, valueCoercer),
ProtobufConverter(columnNameMapper, valueCoercer),
JsonConverter(columnNameMapper, valueCoercer, validationResultHandler),
ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler),
)
}

View File

@@ -0,0 +1,101 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.load.dataflow.transform.data
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import io.mockk.mockk
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
/** Unit tests for [ValidationResultHandler]. The metric tracker is a relaxed mock throughout. */
internal class ValidationResultHandlerTest {
    /** [ValidationResultHandler.handle] returns or rewrites the value according to the result. */
    @Test
    fun testHandleValidationResult() {
        val stream = newStream()
        val enrichedValue = newEnrichedValue("test value")
        val handler = newHandler()

        // Valid: the value comes back as-is.
        val validOutcome = handler.handle(stream, ValidationResult.Valid, enrichedValue)
        assertEquals(enrichedValue, validOutcome)

        // ShouldNullify: the wrapped value becomes NullValue.
        val nullifyResult = ValidationResult.ShouldNullify(Reason.DESTINATION_SERIALIZATION_ERROR)
        val nullifiedOutcome = handler.handle(stream, nullifyResult, enrichedValue)
        assertEquals(NullValue, nullifiedOutcome.abValue)

        // ShouldTruncate: the wrapped value becomes the supplied replacement.
        val truncatedValue = StringValue("This is...")
        val truncateResult =
            ValidationResult.ShouldTruncate(
                truncatedValue,
                Reason.DESTINATION_FIELD_SIZE_LIMITATION
            )
        val truncatedOutcome = handler.handle(stream, truncateResult, enrichedValue)
        assertEquals(truncatedValue, truncatedOutcome.abValue)
    }

    /** [ValidationResultHandler.nullify] nulls the value and records one NULLED change. */
    @Test
    fun testNullify() {
        val enrichedValue = newEnrichedValue("test value")

        val nullifiedValue = newHandler().nullify(newStream(), enrichedValue)

        assertEquals(NullValue, nullifiedValue.abValue)
        assertEquals(1, nullifiedValue.changes.size)
        assertEquals(Change.NULLED, nullifiedValue.changes.first().change)
    }

    /** [ValidationResultHandler.truncate] swaps the value and records one TRUNCATED change. */
    @Test
    fun testTruncate() {
        val enrichedValue = newEnrichedValue("This is a very long string that needs truncation")
        val newValue = StringValue("This is a...")

        val truncatedValue = newHandler().truncate(newStream(), enrichedValue, newValue)

        assertEquals(newValue, truncatedValue.abValue)
        assertEquals(1, truncatedValue.changes.size)
        assertEquals(Change.TRUNCATED, truncatedValue.changes.first().change)
    }

    /** Builds the stream descriptor shared by every test. */
    private fun newStream() = DestinationStream.Descriptor(namespace = "namespace", name = "name")

    /** Builds a handler backed by a relaxed mock tracker, so metric calls are accepted silently. */
    private fun newHandler() = ValidationResultHandler(mockk(relaxed = true))

    /** Builds a string-typed enriched value named "testField" with no prior changes. */
    private fun newEnrichedValue(text: String) =
        EnrichedAirbyteValue(
            abValue = StringValue(text),
            type = StringType,
            name = "testField",
            changes = mutableListOf(),
            airbyteMetaField = null,
        )
}

View File

@@ -9,7 +9,9 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.data.AirbyteValueProxy.FieldAccessor
import io.airbyte.cdk.load.dataflow.transform.ColumnNameMapper
import io.airbyte.cdk.load.dataflow.transform.ValidationResult
import io.airbyte.cdk.load.dataflow.transform.ValueCoercer
import io.airbyte.cdk.load.dataflow.transform.data.ValidationResultHandler
import io.airbyte.cdk.load.message.DestinationRecordProtobufSource
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.Meta
@@ -43,7 +45,7 @@ class ProtobufConverterTest {
mockk<ValueCoercer> {
every { representAs(any()) } returns null
every { map(any()) } answers { firstArg<EnrichedAirbyteValue>() }
every { validate(any()) } answers { firstArg<EnrichedAirbyteValue>() }
every { validate(any()) } returns ValidationResult.Valid
}
private fun createMockMapperPassThrough(): ColumnNameMapper =
@@ -152,7 +154,8 @@ class ProtobufConverterTest {
fun `convertWithMetadata processes basic types correctly`() {
val valueCoercer = createMockCoercerPassThrough()
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors =
arrayOf(
@@ -265,7 +268,8 @@ class ProtobufConverterTest {
fun `convertWithMetadata handles BigDecimal values correctly`() {
val valueCoercer = createMockCoercerPassThrough()
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors =
arrayOf(
@@ -308,7 +312,8 @@ class ProtobufConverterTest {
fun `convertWithMetadata handles null values`() {
val valueCoercer = createMockCoercerPassThrough()
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors = arrayOf(fa("null_field", StringType, 0))
@@ -331,10 +336,11 @@ class ProtobufConverterTest {
StringValue::class.java
every { representAs(not(ofType(TimeTypeWithoutTimezone::class))) } returns null
every { map(any()) } answers { firstArg<EnrichedAirbyteValue>() }
every { validate(any()) } answers { firstArg<EnrichedAirbyteValue>() }
every { validate(any()) } returns ValidationResult.Valid
}
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors = arrayOf(fa("time_field", TimeTypeWithoutTimezone, 0))
val protoValues = listOf(vTimeNoTz(LocalTime.parse("12:34:56")))
@@ -370,21 +376,18 @@ class ProtobufConverterTest {
enriched.abValue is StringValue &&
(enriched.abValue as StringValue).value.length > 5
) {
enriched.abValue = NullValue
enriched.changes.add(
Meta.Change(
enriched.name,
AirbyteRecordMessageMetaChange.Change.NULLED,
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_FIELD_SIZE_LIMITATION
)
ValidationResult.ShouldNullify(
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_FIELD_SIZE_LIMITATION
)
} else {
ValidationResult.Valid
}
enriched
}
}
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors = arrayOf(fa("short_string", StringType, 0), fa("long_string", StringType, 1))
val protoValues = listOf(vString("hello"), vString("this_is_too_long"))
@@ -411,7 +414,8 @@ class ProtobufConverterTest {
columnName: String
): String = if (columnName == "original_name") "mapped_name" else columnName
}
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors = arrayOf(fa("original_name", StringType, 0))
val protoValues = listOf(vString("test"))
@@ -430,7 +434,8 @@ class ProtobufConverterTest {
fun `convertWithMetadata handles parsing exceptions`() {
val valueCoercer = createMockCoercerPassThrough()
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors = arrayOf(fa("invalid_int", IntegerType, 0))
@@ -455,7 +460,8 @@ class ProtobufConverterTest {
fun `convertWithMetadata merges meta changes from source + stream unknown changes + parsing failures`() {
val valueCoercer = createMockCoercerPassThrough()
val columnNameMapper = createMockMapperPassThrough()
val converter = ProtobufConverter(columnNameMapper, valueCoercer)
val validationResultHandler = ValidationResultHandler(mockk(relaxed = true))
val converter = ProtobufConverter(columnNameMapper, valueCoercer, validationResultHandler)
val accessors = arrayOf(fa("ok_str", StringType, 0), fa("bad_int", IntegerType, 1))

View File

@@ -52,7 +52,7 @@ import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.CsvSource
import org.junit.jupiter.params.provider.MethodSource
class DestinationMessageTest {
internal class DestinationMessageTest {
private val uuidGenerator = UUIDGenerator()
private fun factory(
@@ -131,7 +131,7 @@ class DestinationMessageTest {
@MethodSource("roundTrippableMessages")
fun testRoundTripRecord(message: AirbyteMessage) {
val roundTripped = convert(factory(false), message).asProtocolMessage()
Assertions.assertEquals(message, roundTripped)
assertEquals(message, roundTripped)
}
@ParameterizedTest
@@ -163,7 +163,7 @@ class DestinationMessageTest {
val parsedMessage = convert(factory(false), inputMessage) as StreamCheckpoint
Assertions.assertEquals(
assertEquals(
// we represent the state message ID as a long, but jackson sees that 1234 can be Int,
// and Int(1234) != Long(1234). (and additionalProperties is just a Map<String, Any?>)
// So we just compare the serialized protocol messages.
@@ -203,7 +203,7 @@ class DestinationMessageTest {
val parsedMessage = convert(factory(false), inputMessage) as GlobalCheckpoint
Assertions.assertEquals(
assertEquals(
inputMessage
.also { it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) }
.serializeToString(),

View File

@@ -1 +1 @@
version=0.1.65
version=0.1.66