Fix multiple mssql issues (#69311)
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
testExecutionConcurrency=1
|
||||
JunitMethodExecutionTimeout=5m
|
||||
cdkVersion=0.1.58
|
||||
cdkVersion=0.1.77
|
||||
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
|
||||
dockerImageTag: 4.3.0-rc.5
|
||||
dockerImageTag: 4.3.0-rc.6
|
||||
dockerRepository: airbyte/source-mssql
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
|
||||
githubIssueLabel: source-mssql
|
||||
|
||||
@@ -265,7 +265,6 @@ constructor(
|
||||
Duration.ofMillis(
|
||||
MsSqlServerSourceConfigurationSpecification.DEFAULT_HEARTBEAT_INTERVAL_MS
|
||||
),
|
||||
resourceAcquisitionHeartbeat = Duration.ofSeconds(15),
|
||||
incrementalReplicationConfiguration = incrementalReplicationConfiguration,
|
||||
databaseName = pojo.database
|
||||
)
|
||||
|
||||
@@ -96,8 +96,11 @@ object MsSqlServerStateMigration {
|
||||
"Migrating OrderedColumnLoadStatus state: ordered_col=${legacy.orderedCol}, ordered_col_val=${legacy.orderedColVal}"
|
||||
}
|
||||
|
||||
// Extract incremental state if present
|
||||
val incrementalState = legacy.incrementalState?.let { migrateCursorBasedStatusFromJson(it) }
|
||||
// Extract incremental state if present and not null
|
||||
val incrementalState =
|
||||
legacy.incrementalState
|
||||
?.takeIf { !it.isNull }
|
||||
?.let { migrateCursorBasedStatusFromJson(it) }
|
||||
|
||||
// Convert String to JsonNode, handling null and "null" string
|
||||
val pkValueNode: JsonNode? =
|
||||
|
||||
@@ -202,4 +202,32 @@ class MsSqlServerStateMigrationTest {
|
||||
assertEquals(0, parsed.cursorRecordCount)
|
||||
assertEquals(MsSqlServerJdbcStreamStateValue.CURRENT_VERSION, parsed.version)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `should handle ordered column state with explicit null incremental_state`() {
|
||||
// This test case simulates the exact scenario from the bug report where
|
||||
// incremental_state is explicitly set to null (JSON null, not missing field)
|
||||
val legacyOrderedColumnStateWithNullIncremental =
|
||||
"""
|
||||
{
|
||||
"version": 2,
|
||||
"state_type": "ordered_column",
|
||||
"ordered_col": "id",
|
||||
"ordered_col_val": "23",
|
||||
"incremental_state": null
|
||||
}
|
||||
""".trimIndent()
|
||||
|
||||
val parsed =
|
||||
MsSqlServerStateMigration.parseStateValue(
|
||||
Jsons.readTree(legacyOrderedColumnStateWithNullIncremental)
|
||||
)
|
||||
|
||||
// Should successfully migrate without NPE
|
||||
assertEquals("primary_key", parsed.stateType)
|
||||
assertEquals("id", parsed.pkName)
|
||||
assertEquals("23", parsed.pkValue?.asText())
|
||||
assertNull(parsed.incrementalState)
|
||||
assertEquals(MsSqlServerJdbcStreamStateValue.CURRENT_VERSION, parsed.version)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user