9894 cdc toolkit add time limit for snapshot partitions running in cdc scenario wass (#46699)
This commit is contained in:
committed by
GitHub
parent
fb4a43f603
commit
c49109208e
@@ -10,6 +10,9 @@ interface SourceConfiguration : Configuration, SshTunnelConfiguration {
|
||||
/** Does READ generate states of type GLOBAL? */
|
||||
val global: Boolean
|
||||
|
||||
/** Maximum amount of time may be set to limit overall snapshotting duration */
|
||||
val maxSnapshotReadDuration: Duration?
|
||||
|
||||
/** During the READ operation, how often a feed should checkpoint, ideally. */
|
||||
val checkpointTargetInterval: Duration
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ data class FakeSourceConfiguration(
|
||||
val cursor: CursorConfiguration,
|
||||
override val maxConcurrency: Int,
|
||||
override val checkpointTargetInterval: Duration,
|
||||
override val maxSnapshotReadDuration: Duration? = null,
|
||||
) : SourceConfiguration {
|
||||
override val global: Boolean = cursor is CdcCursor
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.airbyte.cdk.command.JdbcSourceConfiguration
|
||||
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
|
||||
import io.airbyte.cdk.output.OutputConsumer
|
||||
import jakarta.inject.Singleton
|
||||
import java.time.Instant
|
||||
|
||||
/** Default implementation of [JdbcSharedState]. */
|
||||
@Singleton
|
||||
@@ -20,6 +21,9 @@ class DefaultJdbcSharedState(
|
||||
private val globalLockResource: GlobalLockResource,
|
||||
) : JdbcSharedState {
|
||||
|
||||
// First hit to the readStartTime initializes the value.
|
||||
override val snapshotReadStartTime: Instant by
|
||||
lazy(LazyThreadSafetyMode.SYNCHRONIZED) { Instant.now() }
|
||||
override val withSampling: Boolean
|
||||
get() = constants.withSampling
|
||||
|
||||
|
||||
@@ -3,10 +3,12 @@ package io.airbyte.cdk.read
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import io.airbyte.cdk.TransientErrorException
|
||||
import io.airbyte.cdk.command.OpaqueStateValue
|
||||
import io.airbyte.cdk.output.OutputConsumer
|
||||
import io.airbyte.cdk.util.Jsons
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
@@ -57,6 +59,15 @@ sealed class JdbcPartitionReader<P : JdbcPartition<*>>(
|
||||
override fun releaseResources() {
|
||||
acquiredResources.getAndSet(null)?.close()
|
||||
}
|
||||
|
||||
/** If configured max feed read time elapsed we exit with a transient error */
|
||||
protected fun checkMaxReadTimeElapsed() {
|
||||
sharedState.configuration.maxSnapshotReadDuration?.let {
|
||||
if (java.time.Duration.between(sharedState.snapshotReadStartTime, Instant.now()) > it) {
|
||||
throw TransientErrorException("Shutting down snapshot reader: max duration elapsed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** JDBC implementation of [PartitionReader] which reads the [partition] in its entirety. */
|
||||
@@ -68,6 +79,12 @@ open class JdbcNonResumablePartitionReader<P : JdbcPartition<*>>(
|
||||
val numRecords = AtomicLong()
|
||||
|
||||
override suspend fun run() {
|
||||
/* Don't start read if we've gone over max duration.
|
||||
We check for elapsed duration before reading and not while because
|
||||
existing exiting with an exception skips checkpoint(), so any work we
|
||||
did before time has elapsed will be wasted. */
|
||||
checkMaxReadTimeElapsed()
|
||||
|
||||
selectQuerier
|
||||
.executeQuery(
|
||||
q = partition.nonResumableQuery,
|
||||
@@ -109,6 +126,12 @@ open class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(
|
||||
val runComplete = AtomicBoolean(false)
|
||||
|
||||
override suspend fun run() {
|
||||
/* Don't start read if we've gone over max duration.
|
||||
We check for elapsed duration before reading and not while because
|
||||
existing exiting with an exception skips checkpoint(), so any work we
|
||||
did before time has elapsed will be wasted. */
|
||||
checkMaxReadTimeElapsed()
|
||||
|
||||
val fetchSize: Int = streamState.fetchSizeOrDefault
|
||||
val limit: Long = streamState.limit
|
||||
incumbentLimit.set(limit)
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import io.airbyte.cdk.command.JdbcSourceConfiguration
|
||||
import io.airbyte.cdk.output.OutputConsumer
|
||||
import io.micronaut.context.annotation.DefaultImplementation
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* Encapsulates database-specific state, both constant or transient, common to all partitions.
|
||||
@@ -35,6 +36,9 @@ interface JdbcSharedState {
|
||||
/** Targeted memory footprint of a partition, in bytes. */
|
||||
val targetPartitionByteSize: Long
|
||||
|
||||
/** Keeping the time when the read operation started. */
|
||||
val snapshotReadStartTime: Instant
|
||||
|
||||
/** Creates a new instance of a [JdbcFetchSizeEstimator]. */
|
||||
fun jdbcFetchSizeEstimator(): JdbcFetchSizeEstimator
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
package io.airbyte.cdk.read
|
||||
|
||||
import io.airbyte.cdk.TransientErrorException
|
||||
import io.airbyte.cdk.data.LocalDateCodec
|
||||
import io.airbyte.cdk.output.BufferingOutputConsumer
|
||||
import io.airbyte.cdk.read.TestFixtures.assertFailures
|
||||
@@ -15,6 +16,7 @@ import io.airbyte.cdk.read.TestFixtures.sharedState
|
||||
import io.airbyte.cdk.read.TestFixtures.stream
|
||||
import io.airbyte.cdk.read.TestFixtures.ts
|
||||
import java.time.LocalDate
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.runBlocking
|
||||
@@ -54,7 +56,8 @@ class JdbcPartitionReaderTest {
|
||||
"""{"id":4,"ts":"2024-08-04","msg":"you"}""",
|
||||
"""{"id":5,"ts":"2024-08-05","msg":"today"}""",
|
||||
)
|
||||
)
|
||||
),
|
||||
maxSnapshotReadTime = java.time.Duration.ofMinutes(1),
|
||||
)
|
||||
val factory = sharedState.factory()
|
||||
val result = factory.create(stream, opaqueStateValue(cursor = cursorLowerBound))
|
||||
@@ -132,7 +135,8 @@ class JdbcPartitionReaderTest {
|
||||
"""{"id":3,"ts":"2024-08-03","msg":"are"}""",
|
||||
"""{"id":4,"ts":"2024-08-04","msg":"you"}""",
|
||||
)
|
||||
)
|
||||
),
|
||||
maxSnapshotReadTime = java.time.Duration.ofMinutes(1),
|
||||
)
|
||||
val factory = sharedState.factory()
|
||||
val result = factory.create(stream, opaqueStateValue(cursor = cursorLowerBound))
|
||||
@@ -192,4 +196,120 @@ class JdbcPartitionReaderTest {
|
||||
factory.sharedState.concurrencyResource.available,
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testPartitionMaxReadTime() {
|
||||
// Generate partition
|
||||
val stream = stream(withPK = false)
|
||||
val sharedState =
|
||||
sharedState(
|
||||
mockedQueries =
|
||||
arrayOf(
|
||||
TestFixtures.MockedQuery(
|
||||
SelectQuerySpec(
|
||||
SelectColumns(id, ts, msg),
|
||||
From(stream.name, stream.namespace),
|
||||
Where(
|
||||
And(
|
||||
GreaterOrEqual(ts, LocalDateCodec.encode(cursorLowerBound)),
|
||||
LesserOrEqual(ts, LocalDateCodec.encode(cursorUpperBound)),
|
||||
)
|
||||
),
|
||||
OrderBy(ts),
|
||||
Limit(4),
|
||||
),
|
||||
SelectQuerier.Parameters(reuseResultObject = true, fetchSize = 2),
|
||||
"""{"id":1,"ts":"2024-08-01","msg":"hello"}""",
|
||||
"""{"id":2,"ts":"2024-08-02","msg":"how"}""",
|
||||
"""{"id":3,"ts":"2024-08-03","msg":"are"}""",
|
||||
"""{"id":4,"ts":"2024-08-04","msg":"you"}""",
|
||||
)
|
||||
),
|
||||
maxSnapshotReadTime = java.time.Duration.ofSeconds(1),
|
||||
)
|
||||
val factory = sharedState.factory()
|
||||
val result = factory.create(stream, opaqueStateValue(cursor = cursorLowerBound))
|
||||
factory.assertFailures()
|
||||
Assertions.assertTrue(result is DefaultJdbcCursorIncrementalPartition)
|
||||
val partition = result as DefaultJdbcCursorIncrementalPartition
|
||||
partition.streamState.cursorUpperBound = LocalDateCodec.encode(cursorUpperBound)
|
||||
partition.streamState.fetchSize = 2
|
||||
partition.streamState.updateLimitState { it.up } // so we don't hit the limit
|
||||
// Generate reader
|
||||
val readerResumable = JdbcResumablePartitionReader(partition)
|
||||
// Acquire resources
|
||||
Assertions.assertEquals(
|
||||
sharedState.configuration.maxConcurrency,
|
||||
factory.sharedState.concurrencyResource.available,
|
||||
)
|
||||
Assertions.assertEquals(
|
||||
PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN,
|
||||
readerResumable.tryAcquireResources()
|
||||
)
|
||||
Assertions.assertEquals(
|
||||
sharedState.configuration.maxConcurrency - 1,
|
||||
factory.sharedState.concurrencyResource.available,
|
||||
)
|
||||
|
||||
Assertions.assertThrows(TransientErrorException::class.java) {
|
||||
// Run and simulate timing out
|
||||
runBlocking {
|
||||
sharedState.snapshotReadStartTime
|
||||
delay(1.seconds)
|
||||
readerResumable.run()
|
||||
}
|
||||
}
|
||||
readerResumable.releaseResources()
|
||||
|
||||
// Generate partition
|
||||
val stream2 = stream(withPK = false)
|
||||
val sharedState2 =
|
||||
sharedState(
|
||||
mockedQueries =
|
||||
arrayOf(
|
||||
TestFixtures.MockedQuery(
|
||||
SelectQuerySpec(
|
||||
SelectColumns(id, ts, msg),
|
||||
From(stream.name, stream.namespace),
|
||||
Where(
|
||||
And(
|
||||
GreaterOrEqual(ts, LocalDateCodec.encode(cursorLowerBound)),
|
||||
LesserOrEqual(ts, LocalDateCodec.encode(cursorUpperBound)),
|
||||
)
|
||||
),
|
||||
),
|
||||
SelectQuerier.Parameters(reuseResultObject = true, fetchSize = 2),
|
||||
"""{"id":1,"ts":"2024-08-01","msg":"hello"}""",
|
||||
"""{"id":2,"ts":"2024-08-02","msg":"how"}""",
|
||||
"""{"id":3,"ts":"2024-08-03","msg":"are"}""",
|
||||
"""{"id":4,"ts":"2024-08-04","msg":"you"}""",
|
||||
"""{"id":5,"ts":"2024-08-05","msg":"today"}""",
|
||||
)
|
||||
),
|
||||
maxSnapshotReadTime = java.time.Duration.ofSeconds(1),
|
||||
)
|
||||
val factory2 = sharedState2.factory()
|
||||
val result2 = factory2.create(stream2, opaqueStateValue(cursor = cursorLowerBound))
|
||||
factory2.assertFailures()
|
||||
Assertions.assertTrue(result2 is DefaultJdbcCursorIncrementalPartition)
|
||||
val partition2 = result2 as DefaultJdbcCursorIncrementalPartition
|
||||
partition2.streamState.cursorUpperBound = LocalDateCodec.encode(cursorUpperBound)
|
||||
partition2.streamState.fetchSize = 2
|
||||
// Generate reader
|
||||
|
||||
val readerNonResumable = JdbcNonResumablePartitionReader(partition2)
|
||||
Assertions.assertEquals(
|
||||
PartitionReader.TryAcquireResourcesStatus.READY_TO_RUN,
|
||||
readerNonResumable.tryAcquireResources()
|
||||
)
|
||||
|
||||
Assertions.assertThrows(TransientErrorException::class.java) {
|
||||
// Run and simulate timing out
|
||||
runBlocking {
|
||||
sharedState2.snapshotReadStartTime
|
||||
delay(1.seconds)
|
||||
readerNonResumable.run()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,10 +75,16 @@ object TestFixtures {
|
||||
maxConcurrency: Int = 10,
|
||||
maxMemoryBytesForTesting: Long = 1_000_000L,
|
||||
constants: DefaultJdbcConstants = DefaultJdbcConstants(),
|
||||
maxSnapshotReadTime: Duration? = null,
|
||||
vararg mockedQueries: MockedQuery,
|
||||
): DefaultJdbcSharedState {
|
||||
val configuration =
|
||||
StubbedJdbcSourceConfiguration(global, checkpointTargetInterval, maxConcurrency)
|
||||
StubbedJdbcSourceConfiguration(
|
||||
global,
|
||||
checkpointTargetInterval,
|
||||
maxConcurrency,
|
||||
maxSnapshotReadTime
|
||||
)
|
||||
return DefaultJdbcSharedState(
|
||||
configuration,
|
||||
BufferingOutputConsumer(ClockFactory().fixed()),
|
||||
@@ -119,6 +125,7 @@ object TestFixtures {
|
||||
override val global: Boolean,
|
||||
override val checkpointTargetInterval: Duration,
|
||||
override val maxConcurrency: Int,
|
||||
override val maxSnapshotReadDuration: Duration?,
|
||||
) : JdbcSourceConfiguration {
|
||||
override val realHost: String
|
||||
get() = TODO("Not yet implemented")
|
||||
|
||||
@@ -25,6 +25,7 @@ data class H2SourceConfiguration(
|
||||
val resumablePreferred: Boolean,
|
||||
override val maxConcurrency: Int,
|
||||
override val checkpointTargetInterval: Duration,
|
||||
override val maxSnapshotReadDuration: Duration? = null,
|
||||
) : JdbcSourceConfiguration {
|
||||
override val global: Boolean = cursor is CdcCursor
|
||||
override val jdbcProperties: Map<String, String> = mapOf()
|
||||
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
|
||||
dockerImageTag: 0.0.20
|
||||
dockerImageTag: 0.0.21
|
||||
dockerRepository: airbyte/source-mysql-v2
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
|
||||
githubIssueLabel: source-mysql-v2
|
||||
|
||||
@@ -34,6 +34,7 @@ data class MysqlSourceConfiguration(
|
||||
override val checkPrivileges: Boolean,
|
||||
override val debeziumHeartbeatInterval: Duration = Duration.ofSeconds(10),
|
||||
val debeziumKeepAliveInterval: Duration = Duration.ofMinutes(1),
|
||||
override val maxSnapshotReadDuration: Duration?
|
||||
) : JdbcSourceConfiguration, CdcSourceConfiguration {
|
||||
override val global = incrementalConfiguration is CdcIncrementalConfiguration
|
||||
|
||||
@@ -120,6 +121,12 @@ class MysqlSourceConfigurationFactory :
|
||||
val sslJdbcParameters = jdbcEncryption.parseSSLConfig()
|
||||
jdbcProperties.putAll(sslJdbcParameters)
|
||||
|
||||
val cursorConfig = pojo.getCursorMethodConfigurationValue()
|
||||
val maxSnapshotReadTime: Duration? =
|
||||
when (cursorConfig is CdcCursor) {
|
||||
true -> cursorConfig.initialLoadTimeoutHours?.let { Duration.ofHours(it.toLong()) }
|
||||
else -> null
|
||||
}
|
||||
// Build JDBC URL
|
||||
val address = "%s:%d"
|
||||
val jdbcUrlFmt = "jdbc:mysql://${address}"
|
||||
@@ -165,6 +172,7 @@ class MysqlSourceConfigurationFactory :
|
||||
checkpointTargetInterval = checkpointTargetInterval,
|
||||
maxConcurrency = maxConcurrency,
|
||||
checkPrivileges = pojo.checkPrivileges ?: true,
|
||||
maxSnapshotReadDuration = maxSnapshotReadTime
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user