Handle empty table states (#48629)
This commit is contained in:
committed by
GitHub
parent
536742b4bd
commit
5f9d290800
@@ -93,8 +93,12 @@ class StateManager(
|
||||
* Updates the internal state of the [StateManager] to ensure idempotency (no redundant messages
|
||||
* are emitted).
|
||||
*/
|
||||
fun checkpoint(): List<AirbyteStateMessage> =
|
||||
listOfNotNull(global?.checkpoint()) + nonGlobal.mapNotNull { it.value.checkpoint() }
|
||||
fun checkpoint(): List<AirbyteStateMessage> {
|
||||
return listOfNotNull(global?.checkpoint()) +
|
||||
nonGlobal
|
||||
.mapNotNull { it.value.checkpoint() }
|
||||
.filter { it.stream.streamState.isNull.not() }
|
||||
}
|
||||
|
||||
private sealed class BaseStateManager<K : Feed>(
|
||||
override val feed: K,
|
||||
@@ -205,7 +209,12 @@ class StateManager(
|
||||
AirbyteStreamState()
|
||||
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
|
||||
.withStreamState(
|
||||
streamStateForCheckpoint.opaqueStateValue ?: Jsons.objectNode()
|
||||
when (streamStateForCheckpoint.opaqueStateValue?.isNull) {
|
||||
null,
|
||||
true -> Jsons.objectNode()
|
||||
false -> streamStateForCheckpoint.opaqueStateValue
|
||||
?: Jsons.objectNode()
|
||||
}
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -140,7 +140,9 @@ class JdbcSequentialPartitionsCreator<
|
||||
// Ensure that the cursor upper bound is known, if required.
|
||||
if (partition is JdbcCursorPartition<*>) {
|
||||
ensureCursorUpperBound()
|
||||
if (streamState.cursorUpperBound?.isNull == true) {
|
||||
if (
|
||||
streamState.cursorUpperBound == null || streamState.cursorUpperBound?.isNull == true
|
||||
) {
|
||||
log.info { "Maximum cursor column value query found that the table was empty." }
|
||||
return listOf(CheckpointOnlyPartitionReader())
|
||||
}
|
||||
@@ -192,7 +194,9 @@ class JdbcConcurrentPartitionsCreator<
|
||||
// Ensure that the cursor upper bound is known, if required.
|
||||
if (partition is JdbcCursorPartition<*>) {
|
||||
ensureCursorUpperBound()
|
||||
if (streamState.cursorUpperBound?.isNull == true) {
|
||||
if (
|
||||
streamState.cursorUpperBound == null || streamState.cursorUpperBound?.isNull == true
|
||||
) {
|
||||
log.info { "Maximum cursor column value query found that the table was empty." }
|
||||
return listOf(CheckpointOnlyPartitionReader())
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
|
||||
dockerImageTag: 3.9.0-rc.22
|
||||
dockerImageTag: 3.9.0-rc.23
|
||||
dockerRepository: airbyte/source-mysql
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
|
||||
githubIssueLabel: source-mysql
|
||||
|
||||
@@ -38,13 +38,17 @@ data class MysqlCdcInitialSnapshotStateValue(
|
||||
primaryKeyCheckpoint: List<JsonNode>,
|
||||
): OpaqueStateValue {
|
||||
val primaryKeyField = primaryKey.first()
|
||||
return Jsons.valueToTree(
|
||||
MysqlCdcInitialSnapshotStateValue(
|
||||
pkName = primaryKeyField.id,
|
||||
pkVal = primaryKeyCheckpoint.first().asText(),
|
||||
stateType = "primary_key",
|
||||
)
|
||||
)
|
||||
return when (primaryKeyCheckpoint.first().isNull) {
|
||||
true -> Jsons.nullNode()
|
||||
false ->
|
||||
Jsons.valueToTree(
|
||||
MysqlCdcInitialSnapshotStateValue(
|
||||
pkName = primaryKeyField.id,
|
||||
pkVal = primaryKeyCheckpoint.first().asText(),
|
||||
stateType = "primary_key",
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,7 +212,6 @@ class MysqlJdbcCdcRfrSnapshotPartition(
|
||||
override val lowerBound: List<JsonNode>?,
|
||||
override val upperBound: List<JsonNode>?,
|
||||
) : MysqlJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
|
||||
|
||||
override val completeState: OpaqueStateValue
|
||||
get() =
|
||||
MysqlCdcInitialSnapshotStateValue.snapshotCheckpoint(
|
||||
@@ -263,7 +262,7 @@ sealed class MysqlJdbcCursorPartition(
|
||||
JdbcCursorPartition<DefaultJdbcStreamState> {
|
||||
|
||||
val cursorUpperBound: JsonNode
|
||||
get() = explicitCursorUpperBound ?: streamState.cursorUpperBound!!
|
||||
get() = explicitCursorUpperBound ?: streamState.cursorUpperBound ?: Jsons.nullNode()
|
||||
|
||||
override val cursorUpperBoundQuery: SelectQuery
|
||||
get() = selectQueryGenerator.generate(cursorUpperBoundQuerySpec.optimize())
|
||||
|
||||
@@ -158,11 +158,22 @@ class MysqlJdbcPartitionFactory(
|
||||
override fun create(streamFeedBootstrap: StreamFeedBootstrap): MysqlJdbcPartition? {
|
||||
val stream: Stream = streamFeedBootstrap.feed
|
||||
val streamState: DefaultJdbcStreamState = streamState(streamFeedBootstrap)
|
||||
val opaqueStateValue: OpaqueStateValue =
|
||||
when (streamFeedBootstrap.currentState?.isEmpty) {
|
||||
false -> streamFeedBootstrap.currentState!!
|
||||
else -> return coldStart(streamState)
|
||||
}
|
||||
|
||||
// An empty table stream state will be marked as a nullNode. This prevents repeated attempt
|
||||
// to read it
|
||||
if (streamFeedBootstrap.currentState?.isNull == true) {
|
||||
return null
|
||||
}
|
||||
|
||||
// A legacy saved state may be null for an empty table. We will attempt to read it again
|
||||
if (
|
||||
streamFeedBootstrap.currentState == null ||
|
||||
streamFeedBootstrap.currentState?.isEmpty == true
|
||||
) {
|
||||
return coldStart(streamState)
|
||||
}
|
||||
|
||||
val opaqueStateValue: OpaqueStateValue = streamFeedBootstrap.currentState!!
|
||||
|
||||
val isCursorBased: Boolean = !sharedState.configuration.global
|
||||
|
||||
@@ -281,6 +292,10 @@ class MysqlJdbcPartitionFactory(
|
||||
|
||||
// Compose a jsonnode of cursor label to cursor value to fit in
|
||||
// DefaultJdbcCursorIncrementalPartition
|
||||
if (cursorCheckpoint.isNull) {
|
||||
return coldStart(streamState)
|
||||
}
|
||||
|
||||
if (cursorCheckpoint.toString() == streamState.cursorUpperBound?.toString()) {
|
||||
// Incremental complete.
|
||||
return null
|
||||
|
||||
@@ -12,7 +12,7 @@ import io.airbyte.cdk.read.Stream
|
||||
import io.airbyte.cdk.util.Jsons
|
||||
|
||||
data class MysqlJdbcStreamStateValue(
|
||||
@JsonProperty("cursor") val cursors: String = "",
|
||||
@JsonProperty("cursor") val cursors: String? = null,
|
||||
@JsonProperty("version") val version: Int = 2,
|
||||
@JsonProperty("state_type") val stateType: String = StateType.CURSOR_BASED.stateType,
|
||||
@JsonProperty("stream_name") val streamName: String = "",
|
||||
@@ -34,14 +34,18 @@ data class MysqlJdbcStreamStateValue(
|
||||
cursorCheckpoint: JsonNode,
|
||||
stream: Stream,
|
||||
): OpaqueStateValue {
|
||||
return Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
cursorField = listOf(cursor.id),
|
||||
cursors = cursorCheckpoint.asText(),
|
||||
streamName = stream.name,
|
||||
streamNamespace = stream.namespace!!
|
||||
)
|
||||
)
|
||||
return when (cursorCheckpoint.isNull) {
|
||||
true -> Jsons.nullNode()
|
||||
false ->
|
||||
Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
cursorField = listOf(cursor.id),
|
||||
cursors = cursorCheckpoint.asText(),
|
||||
streamName = stream.name,
|
||||
streamNamespace = stream.namespace!!
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/** Value representing the progress of an ongoing snapshot not involving cursor columns. */
|
||||
@@ -50,13 +54,17 @@ data class MysqlJdbcStreamStateValue(
|
||||
primaryKeyCheckpoint: List<JsonNode>,
|
||||
): OpaqueStateValue {
|
||||
val primaryKeyField = primaryKey.first()
|
||||
return Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
pkName = primaryKeyField.id,
|
||||
pkValue = primaryKeyCheckpoint.first().asText(),
|
||||
stateType = StateType.PRIMARY_KEY.stateType,
|
||||
)
|
||||
)
|
||||
return when (primaryKeyCheckpoint.first().isNull) {
|
||||
true -> Jsons.nullNode()
|
||||
false ->
|
||||
Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
pkName = primaryKeyField.id,
|
||||
pkValue = primaryKeyCheckpoint.first().asText(),
|
||||
stateType = StateType.PRIMARY_KEY.stateType,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/** Value representing the progress of an ongoing snapshot involving cursor columns. */
|
||||
@@ -67,21 +75,25 @@ data class MysqlJdbcStreamStateValue(
|
||||
stream: Stream
|
||||
): OpaqueStateValue {
|
||||
val primaryKeyField = primaryKey.first()
|
||||
return Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
pkName = primaryKeyField.id,
|
||||
pkValue = primaryKeyCheckpoint.first().asText(),
|
||||
stateType = StateType.PRIMARY_KEY.stateType,
|
||||
incrementalState =
|
||||
Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
cursorField = listOf(cursor.id),
|
||||
streamName = stream.name,
|
||||
streamNamespace = stream.namespace!!
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
return when (primaryKeyCheckpoint.first().isNull) {
|
||||
true -> Jsons.nullNode()
|
||||
false ->
|
||||
Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
pkName = primaryKeyField.id,
|
||||
pkValue = primaryKeyCheckpoint.first().asText(),
|
||||
stateType = StateType.PRIMARY_KEY.stateType,
|
||||
incrementalState =
|
||||
Jsons.valueToTree(
|
||||
MysqlJdbcStreamStateValue(
|
||||
cursorField = listOf(cursor.id),
|
||||
streamName = stream.name,
|
||||
streamNamespace = stream.namespace!!
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,13 +25,28 @@ import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import java.sql.Connection
|
||||
import java.sql.Statement
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.Assertions.assertTrue
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.Timeout
|
||||
import org.testcontainers.containers.MySQLContainer
|
||||
|
||||
class MysqlCursorBasedIntegrationTest {
|
||||
|
||||
@BeforeEach
|
||||
fun resetTable() {
|
||||
connectionFactory.get().use { connection: Connection ->
|
||||
connection.isReadOnly = false
|
||||
connection.createStatement().use { stmt: Statement ->
|
||||
stmt.execute("DELETE FROM test.$tableName")
|
||||
}
|
||||
connection.createStatement().use { stmt: Statement ->
|
||||
stmt.execute("INSERT INTO test.$tableName (k, v) VALUES (10, 'foo'), (20, 'bar')")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testCursorBasedRead() {
|
||||
val run1: BufferingOutputConsumer =
|
||||
@@ -73,6 +88,16 @@ class MysqlCursorBasedIntegrationTest {
|
||||
assertEquals(recordMessageFromRun1.size, 1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testWithV1StateEmptyCursor() {
|
||||
var state: AirbyteStateMessage =
|
||||
Jsons.readValue(V1_STATE_EMPTY_CURSOR, AirbyteStateMessage::class.java)
|
||||
val run1: BufferingOutputConsumer =
|
||||
CliRunner.source("read", config, getConfiguredCatalog(), listOf(state)).run()
|
||||
val recordMessageFromRun1: List<AirbyteRecordMessage> = run1.records()
|
||||
assertEquals(recordMessageFromRun1.size, 2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testWithFullRefresh() {
|
||||
val fullRefreshCatalog =
|
||||
@@ -90,6 +115,50 @@ class MysqlCursorBasedIntegrationTest {
|
||||
assertEquals(recordMessageFromRun2.size, 0)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testWithFullRefreshWithEmptyTable() {
|
||||
connectionFactory.get().use { connection: Connection ->
|
||||
connection.isReadOnly = false
|
||||
connection.createStatement().use { stmt: Statement ->
|
||||
stmt.execute("DELETE FROM test.$tableName")
|
||||
}
|
||||
}
|
||||
|
||||
val fullRefreshCatalog =
|
||||
getConfiguredCatalog().apply { streams[0].syncMode = SyncMode.FULL_REFRESH }
|
||||
val run1: BufferingOutputConsumer =
|
||||
CliRunner.source("read", config, fullRefreshCatalog).run()
|
||||
|
||||
assertTrue(run1.states().isEmpty())
|
||||
assertTrue(run1.records().isEmpty())
|
||||
|
||||
val run2: BufferingOutputConsumer =
|
||||
CliRunner.source("read", config, fullRefreshCatalog).run()
|
||||
assertTrue(run2.states().isEmpty())
|
||||
assertTrue(run2.records().isEmpty())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testCursorBasedReadWithEmptyTable() {
|
||||
connectionFactory.get().use { connection: Connection ->
|
||||
connection.isReadOnly = false
|
||||
connection.createStatement().use { stmt: Statement ->
|
||||
stmt.execute("DELETE FROM test.$tableName")
|
||||
}
|
||||
}
|
||||
|
||||
val run1: BufferingOutputConsumer =
|
||||
CliRunner.source("read", config, getConfiguredCatalog()).run()
|
||||
|
||||
assertTrue(run1.states().isEmpty())
|
||||
assertTrue(run1.records().isEmpty())
|
||||
|
||||
val run2: BufferingOutputConsumer =
|
||||
CliRunner.source("read", config, getConfiguredCatalog()).run()
|
||||
assertTrue(run2.states().isEmpty())
|
||||
assertTrue(run2.records().isEmpty())
|
||||
}
|
||||
|
||||
companion object {
|
||||
val log = KotlinLogging.logger {}
|
||||
val dbContainer: MySQLContainer<*> = MysqlContainerFactory.shared(imageName = "mysql:8.0")
|
||||
@@ -135,11 +204,6 @@ class MysqlCursorBasedIntegrationTest {
|
||||
connection.createStatement().use { stmt: Statement ->
|
||||
stmt.execute("CREATE TABLE test.$tableName(k INT PRIMARY KEY, v VARCHAR(80))")
|
||||
}
|
||||
connection.createStatement().use { stmt: Statement ->
|
||||
stmt.execute(
|
||||
"INSERT INTO test.$tableName (k, v) VALUES (10, 'foo'), (20, 'bar')"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -166,4 +230,28 @@ class MysqlCursorBasedIntegrationTest {
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
// Legacy mysql connector saved the following state for an empty table in user cursor mode
|
||||
val V1_STATE_EMPTY_CURSOR: String =
|
||||
"""
|
||||
{
|
||||
"type": "STREAM",
|
||||
"stream": {
|
||||
"stream_descriptor": {
|
||||
"name": "${tableName}",
|
||||
"namespace": "test"
|
||||
},
|
||||
"stream_state": {
|
||||
"version": 2,
|
||||
"state_type": "cursor_based",
|
||||
"stream_name": "${tableName}",
|
||||
"cursor_field": [
|
||||
"k"
|
||||
],
|
||||
"stream_namespace": "test",
|
||||
"cursor_record_count": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user