1
0
mirror of synced 2025-12-25 02:09:19 -05:00

source-mssql: Fix primary key discovery and clustered index handling (#69194)

This commit is contained in:
Wenqi Hu
2025-11-07 12:36:10 -08:00
committed by GitHub
parent 4b367de700
commit f88fce736b
9 changed files with 832 additions and 144 deletions

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.3.0-rc.3
dockerImageTag: 4.3.0-rc.4
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql

View File

@@ -152,7 +152,7 @@ fun stateValueToJsonNode(field: Field, stateValue: String?): JsonNode {
sealed class MsSqlServerJdbcPartition(
val selectQueryGenerator: SelectQueryGenerator,
streamState: DefaultJdbcStreamState,
override val streamState: DefaultJdbcStreamState,
) : JdbcPartition<DefaultJdbcStreamState> {
val stream: Stream = streamState.stream
val from = From(stream.name, stream.namespace)
@@ -176,7 +176,7 @@ sealed class MsSqlServerJdbcPartition(
class MsSqlServerJdbcNonResumableSnapshotPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
) : MsSqlServerJdbcPartition(selectQueryGenerator, streamState) {
override val completeState: OpaqueStateValue = MsSqlServerJdbcStreamStateValue.snapshotCompleted
@@ -184,7 +184,7 @@ class MsSqlServerJdbcNonResumableSnapshotPartition(
class MsSqlServerJdbcNonResumableSnapshotWithCursorPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
val cursor: Field,
val cursorCutoffTime: JsonNode? = null,
) :
@@ -322,7 +322,7 @@ sealed class MsSqlServerJdbcResumablePartition(
/** RFR for cursor based read. */
class MsSqlServerJdbcRfrSnapshotPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
primaryKey: List<Field>,
override val lowerBound: List<JsonNode>?,
override val upperBound: List<JsonNode>?,
@@ -351,7 +351,7 @@ class MsSqlServerJdbcRfrSnapshotPartition(
/** RFR for CDC. */
class MsSqlServerJdbcCdcRfrSnapshotPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
primaryKey: List<Field>,
override val lowerBound: List<JsonNode>?,
override val upperBound: List<JsonNode>?,
@@ -380,9 +380,9 @@ class MsSqlServerJdbcCdcRfrSnapshotPartition(
*/
class MsSqlServerJdbcCdcSnapshotPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
primaryKey: List<Field>,
override val lowerBound: List<JsonNode>?
override val lowerBound: List<JsonNode>?,
) : MsSqlServerJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
override val upperBound: List<JsonNode>? = null
override val completeState: OpaqueStateValue
@@ -414,7 +414,7 @@ sealed class MsSqlServerJdbcCursorPartition(
val cursorUpperBoundQuerySpec: SelectQuerySpec
get() =
if (cursorCutoffTime != null && checkpointColumns.contains(cursor)) {
if (cursorCutoffTime != null) {
// When excluding today's data, apply cutoff constraint to upper bound query too
SelectQuerySpec(
SelectColumnMaxValue(cursor),
@@ -425,9 +425,25 @@ sealed class MsSqlServerJdbcCursorPartition(
SelectQuerySpec(SelectColumnMaxValue(cursor), from)
}
// Override samplingQuery to avoid TABLESAMPLE for cursor-based operations
// TABLESAMPLE fails on views and isn't needed for cursor-based incremental reads
// which are typically small (only new/changed data)
override fun samplingQuery(sampleRateInvPow2: Int): SelectQuery {
val sampleSize: Int = streamState.sharedState.maxSampleSize
val querySpec =
SelectQuerySpec(
SelectColumns(stream.fields + checkpointColumns),
from,
NoWhere,
OrderBy(checkpointColumns),
Limit(sampleSize.toLong())
)
return selectQueryGenerator.generate(querySpec.optimize())
}
override val additionalWhereClause: WhereClauseNode?
get() =
if (cursorCutoffTime != null && checkpointColumns.contains(cursor)) {
if (cursorCutoffTime != null) {
// Add an additional constraint for the cutoff time
Lesser(cursor, cursorCutoffTime)
} else {
@@ -437,7 +453,7 @@ sealed class MsSqlServerJdbcCursorPartition(
class MsSqlServerJdbcSnapshotWithCursorPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
primaryKey: List<Field>,
override val lowerBound: List<JsonNode>?,
cursor: Field,
@@ -472,7 +488,7 @@ class MsSqlServerJdbcSnapshotWithCursorPartition(
class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
primaryKey: List<Field>,
override val lowerBound: List<JsonNode>?,
override val upperBound: List<JsonNode>?,
@@ -522,7 +538,7 @@ class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
*/
class MsSqlServerJdbcCursorIncrementalPartition(
selectQueryGenerator: SelectQueryGenerator,
override val streamState: DefaultJdbcStreamState,
streamState: DefaultJdbcStreamState,
cursor: Field,
val cursorLowerBound: JsonNode,
override val isLowerBoundIncluded: Boolean,

View File

@@ -35,6 +35,7 @@ class MsSqlServerJdbcPartitionFactory(
override val sharedState: DefaultJdbcSharedState,
val selectQueryGenerator: MsSqlSourceOperations,
val config: MsSqlServerSourceConfiguration,
val metadataQuerierFactory: MsSqlSourceMetadataQuerier.Factory,
) :
JdbcPartitionFactory<
DefaultJdbcSharedState,
@@ -43,6 +44,10 @@ class MsSqlServerJdbcPartitionFactory(
> {
private val log = KotlinLogging.logger {}
private val metadataQuerier: MsSqlSourceMetadataQuerier by lazy {
metadataQuerierFactory.session(config) as MsSqlSourceMetadataQuerier
}
private val streamStates = ConcurrentHashMap<StreamIdentifier, DefaultJdbcStreamState>()
override fun streamState(streamFeedBootstrap: StreamFeedBootstrap): DefaultJdbcStreamState =
@@ -50,18 +55,38 @@ class MsSqlServerJdbcPartitionFactory(
DefaultJdbcStreamState(sharedState, streamFeedBootstrap)
}
private fun findPkUpperBound(stream: Stream, pkChosenFromCatalog: List<Field>): JsonNode {
/** Detects if a stream corresponds to a SQL Server VIEW (vs a TABLE) */
private fun isView(stream: Stream): Boolean {
val tableName = metadataQuerier.findTableName(stream.id) ?: return false
return tableName.type.equals("VIEW", ignoreCase = true)
}
/**
* Returns the ordered column (from clustered index or PK) as a single-element list, or null if
* no ordered column is available. This is used for resumable partitioning.
*/
private fun getOrderedColumnAsList(stream: Stream): List<Field>? {
val orderedColumnName = metadataQuerier.getOrderedColumnForSync(stream.id) ?: return null
val orderedColumn = stream.fields.find { it.id == orderedColumnName } ?: return null
return listOf(orderedColumn)
}
private fun findPkUpperBound(stream: Stream): JsonNode {
// find upper bound using maxPk query
// Use the ordered column for sync (prefers clustered index for SQL Server performance)
val orderedColumnName = metadataQuerier.getOrderedColumnForSync(stream.id)!!
val orderedColumnForSync = stream.fields.find { it.id == orderedColumnName }!!
val jdbcConnectionFactory = JdbcConnectionFactory(config)
val from = From(stream.name, stream.namespace)
val maxPkQuery = SelectQuerySpec(SelectColumnMaxValue(pkChosenFromCatalog[0]), from)
val maxPkQuery = SelectQuerySpec(SelectColumnMaxValue(orderedColumnForSync), from)
jdbcConnectionFactory.get().use { connection ->
val stmt = connection.prepareStatement(selectQueryGenerator.generate(maxPkQuery).sql)
val rs = stmt.executeQuery()
if (rs.next()) {
val jdbcFieldType = pkChosenFromCatalog[0].type as JdbcFieldType<*>
val jdbcFieldType = orderedColumnForSync.type as JdbcFieldType<*>
val pkUpperBound: JsonNode = jdbcFieldType.get(rs, 1)
return pkUpperBound
} else {
@@ -73,22 +98,23 @@ class MsSqlServerJdbcPartitionFactory(
private fun coldStart(streamState: DefaultJdbcStreamState): MsSqlServerJdbcPartition {
val stream: Stream = streamState.stream
val pkChosenFromCatalog: List<Field> = stream.configuredPrimaryKey ?: listOf()
val isView = isView(stream)
val orderedColumns = getOrderedColumnAsList(stream)
if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) {
if (pkChosenFromCatalog.isEmpty()) {
if (orderedColumns == null) {
return MsSqlServerJdbcNonResumableSnapshotPartition(
selectQueryGenerator,
streamState,
)
}
val upperBound = findPkUpperBound(stream, pkChosenFromCatalog)
val upperBound = findPkUpperBound(stream)
return if (sharedState.configuration.global) {
MsSqlServerJdbcCdcRfrSnapshotPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = null,
upperBound = listOf(upperBound),
)
@@ -96,7 +122,7 @@ class MsSqlServerJdbcPartitionFactory(
MsSqlServerJdbcRfrSnapshotPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = null,
upperBound = listOf(upperBound),
)
@@ -104,10 +130,17 @@ class MsSqlServerJdbcPartitionFactory(
}
if (sharedState.configuration.global) {
// CDC mode: try to use ordered column for resumable snapshots
if (orderedColumns == null) {
return MsSqlServerJdbcNonResumableSnapshotPartition(
selectQueryGenerator,
streamState,
)
}
return MsSqlServerJdbcCdcSnapshotPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = null,
)
}
@@ -118,7 +151,9 @@ class MsSqlServerJdbcPartitionFactory(
// Calculate cutoff time for cursor if exclude today's data is enabled
val cursorCutoffTime = getCursorCutoffTime(cursorChosenFromCatalog)
if (pkChosenFromCatalog.isEmpty()) {
// Views can't be sampled with TABLESAMPLE, so use non-resumable partitions
// which skip the sampling step entirely
if (isView || orderedColumns == null) {
return MsSqlServerJdbcNonResumableSnapshotWithCursorPartition(
selectQueryGenerator,
streamState,
@@ -126,10 +161,11 @@ class MsSqlServerJdbcPartitionFactory(
cursorCutoffTime = cursorCutoffTime,
)
}
return MsSqlServerJdbcSnapshotWithCursorPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = null,
cursorChosenFromCatalog,
cursorUpperBound = null,
@@ -141,18 +177,20 @@ class MsSqlServerJdbcPartitionFactory(
* Flowchart:
* 1. If the input state is null - using coldstart.
* ```
* a. If it's global but without PK, use non-resumable snapshot.
* b. If it's global with PK, use snapshot.
* a. If it's global but without PK, use non-resumable snapshot.
* b. If it's global with PK, use CDC snapshot.
* c. If it's not global, use snapshot with cursor.
* ```
* 2. If the input state is not null -
* ```
* a. If it's in global mode, JdbcPartitionFactory will not handle this. (TODO)
* a. If it's in global mode:
* i. JdbcPartitionFactory handles the initial CDC snapshot phase (resuming incomplete snapshots)
* ii. CdcPartitionsCreator (in CDK) handles CDC incremental reads after snapshot completes
* b. If it's cursor based, it could be either in PK read phase (initial read) or
* cursor read phase (incremental read). This is differentiated by the stateType.
* i. In PK read phase, use snapshot with cursor. If no PKs were found,
* cursor read phase (incremental read). This is determined by checking if pk_name is set.
* i. In PK read phase (pk_name != null), use snapshot with cursor. If no PKs were found,
* use non-resumable snapshot with cursor.
* ii. In cursor read phase, use cursor incremental.
* ii. In cursor read phase (pk_name == null), use cursor incremental.
* ```
*/
override fun create(streamFeedBootstrap: StreamFeedBootstrap): MsSqlServerJdbcPartition? {
@@ -171,25 +209,15 @@ class MsSqlServerJdbcPartitionFactory(
}
val isCursorBased: Boolean = !sharedState.configuration.global
val orderedColumns = getOrderedColumnAsList(stream)
val pkChosenFromCatalog: List<Field> = stream.configuredPrimaryKey ?: listOf()
if (
pkChosenFromCatalog.isEmpty() &&
stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH
) {
if (
streamState.streamFeedBootstrap.currentState ==
MsSqlServerJdbcStreamStateValue.snapshotCompleted
) {
return null
if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) {
if (orderedColumns == null) {
return handleFullRefreshWithoutPk(streamState)
}
return MsSqlServerJdbcNonResumableSnapshotPartition(
selectQueryGenerator,
streamState,
)
}
// CDC sync
if (!isCursorBased) {
val sv: MsSqlServerCdcInitialSnapshotStateValue =
Jsons.treeToValue(
@@ -198,18 +226,22 @@ class MsSqlServerJdbcPartitionFactory(
)
if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) {
val upperBound = findPkUpperBound(stream, pkChosenFromCatalog)
if (orderedColumns == null) {
return handleFullRefreshWithoutPk(streamState)
}
val upperBound = findPkUpperBound(stream)
if (sv.pkVal == upperBound.asText()) {
return null
}
val pkLowerBound: JsonNode = stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkVal)
val pkLowerBound: JsonNode = stateValueToJsonNode(orderedColumns.first(), sv.pkVal)
return MsSqlServerJdbcRfrSnapshotPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = if (pkLowerBound.isNull) null else listOf(pkLowerBound),
upperBound = listOf(upperBound)
upperBound = listOf(upperBound),
)
}
@@ -221,30 +253,35 @@ class MsSqlServerJdbcPartitionFactory(
} else {
// This branch indicates snapshot is incomplete. We need to resume based on previous
// snapshot state.
val pkField = pkChosenFromCatalog.first()
val pkLowerBound: JsonNode = stateValueToJsonNode(pkField, sv.pkVal)
if (orderedColumns == null) {
log.warn {
"Table ${stream.name} has no PK or clustered index. Cannot resume CDC snapshot."
}
return MsSqlServerJdbcNonResumableSnapshotPartition(
selectQueryGenerator,
streamState,
)
}
val pkLowerBound: JsonNode = stateValueToJsonNode(orderedColumns.first(), sv.pkVal)
return MsSqlServerJdbcCdcSnapshotPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
listOf(pkLowerBound),
)
}
} else {
} else { // Cursor-based sync
val sv: MsSqlServerJdbcStreamStateValue =
MsSqlServerStateMigration.parseStateValue(opaqueStateValue)
if (stream.configuredSyncMode == ConfiguredSyncMode.FULL_REFRESH) {
val upperBound = findPkUpperBound(stream, pkChosenFromCatalog)
val pkLowerBound: JsonNode =
if (sv.pkValue == null || sv.pkValue.isNull) {
Jsons.nullNode()
} else if (sv.pkValue.isTextual) {
stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkValue.asText())
} else {
sv.pkValue
}
if (orderedColumns == null) {
return handleFullRefreshWithoutPk(streamState)
}
val upperBound = findPkUpperBound(stream)
val pkLowerBound: JsonNode = extractPkLowerBound(sv.pkValue, orderedColumns.first())
if (!pkLowerBound.isNull && areValuesEqual(pkLowerBound, upperBound)) {
return null
@@ -253,32 +290,35 @@ class MsSqlServerJdbcPartitionFactory(
return MsSqlServerJdbcRfrSnapshotPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = if (pkLowerBound.isNull) null else listOf(pkLowerBound),
upperBound = listOf(upperBound)
upperBound = listOf(upperBound),
)
}
if (sv.stateType != StateType.CURSOR_BASED.stateType) {
// Loading value from catalog. Note there could be unexpected behaviors if user
// updates their schema but did not reset their state.
val pkLowerBound: JsonNode =
if (sv.pkValue == null || sv.pkValue.isNull) {
Jsons.nullNode()
} else if (sv.pkValue.isTextual) {
stateValueToJsonNode(pkChosenFromCatalog[0], sv.pkValue.asText())
} else {
sv.pkValue
}
// Resume from full refresh
if (sv.pkName != null) {
// Still in snapshot phase (PK read)
val cursorChosenFromCatalog: Field =
stream.configuredCursor as? Field ?: throw ConfigErrorException("no cursor")
// in a state where it's still in primary_key read part.
// Views can't be sampled with TABLESAMPLE, so use non-resumable partitions
if (isView(stream) || orderedColumns == null) {
return MsSqlServerJdbcNonResumableSnapshotWithCursorPartition(
selectQueryGenerator,
streamState,
cursorChosenFromCatalog,
cursorCutoffTime = getCursorCutoffTime(cursorChosenFromCatalog),
)
}
val pkLowerBound: JsonNode = extractPkLowerBound(sv.pkValue, orderedColumns.first())
// Still in primary_key read phase (snapshot with cursor)
return MsSqlServerJdbcSnapshotWithCursorPartition(
selectQueryGenerator,
streamState,
pkChosenFromCatalog,
orderedColumns,
lowerBound = listOf(pkLowerBound),
cursorChosenFromCatalog,
cursorUpperBound = null,
@@ -295,6 +335,7 @@ class MsSqlServerJdbcPartitionFactory(
return null
}
// Cursor read phase (incremental read)
val cursor: Field? = stream.fields.find { it.id == sv.cursorField.first() }
if (cursor == null) {
log.warn {
@@ -331,6 +372,29 @@ class MsSqlServerJdbcPartitionFactory(
}
}
private fun handleFullRefreshWithoutPk(
streamState: DefaultJdbcStreamState
): MsSqlServerJdbcPartition? {
if (
streamState.streamFeedBootstrap.currentState ==
MsSqlServerJdbcStreamStateValue.snapshotCompleted
) {
return null
}
return MsSqlServerJdbcNonResumableSnapshotPartition(
selectQueryGenerator,
streamState,
)
}
private fun extractPkLowerBound(pkValue: JsonNode?, orderedColumnForSync: Field): JsonNode {
return when {
pkValue == null || pkValue.isNull -> Jsons.nullNode()
pkValue.isTextual -> stateValueToJsonNode(orderedColumnForSync, pkValue.asText())
else -> pkValue
}
}
private fun getCursorCutoffTime(cursorField: Field): JsonNode? {
val incrementalConfig = config.incrementalReplicationConfiguration
return if (

View File

@@ -204,48 +204,31 @@ class MsSqlSourceMetadataQuerier(
}
/**
* Returns the primary key for discovery/catalog purposes.
*
* This returns the actual PRIMARY KEY constraint defined on the table, which represents the
* logical uniqueness constraint. This is used for catalog discovery and schema definition.
*
* Note: This is separate from the sync strategy (which column to use for ordered column
* loading), which is determined by [getOrderedColumnForSync].
*
* The logic flow:
* 1. Check for clustered index
* 2. If single-column clustered index exists → Use it
* 3. If composite clustered index exists → Use primary key
* 4. If no clustered index exists → Use primary key
* 5. If no primary key exists → Check configured catalog for user-defined logical PK
* 6. If no logical PK exists → Return empty list
* 1. Check for primary key constraint
* 2. If primary key exists → Use it
* 3. If no primary key exists → Check configured catalog for user-defined logical PK
* 4. If no logical PK exists → Return empty list
*/
override fun primaryKey(
streamID: StreamIdentifier,
): List<List<String>> {
val table: TableName = findTableName(streamID) ?: return listOf()
// First try to get clustered index keys
val clusteredIndexKeys = memoizedClusteredIndexKeys[table]
// Use clustered index if it exists and is a single column
// For composite clustered indexes, fall back to primary key
val databasePK =
when {
clusteredIndexKeys != null && clusteredIndexKeys.size == 1 -> {
log.info {
"Using single-column clustered index for table ${table.schema}.${table.name}"
}
clusteredIndexKeys
}
clusteredIndexKeys != null && clusteredIndexKeys.size > 1 -> {
log.info {
"Clustered index is composite for table ${table.schema}.${table.name}. Falling back to primary key."
}
memoizedPrimaryKeys[table]
}
else -> {
log.info {
"No clustered index found for table ${table.schema}.${table.name}. Using primary key."
}
memoizedPrimaryKeys[table]
}
}
// If we found a database PK, use it
// First try to get the actual primary key constraint
val databasePK = memoizedPrimaryKeys[table]
if (!databasePK.isNullOrEmpty()) {
log.info {
"Found primary key for table ${table.schema}.${table.name}: ${databasePK.flatten()}"
}
return databasePK
}
@@ -261,9 +244,63 @@ class MsSqlSourceMetadataQuerier(
return logicalPK
}
log.info { "No primary key or logical PK found for table ${table.schema}.${table.name}" }
return listOf()
}
/**
* Returns the column to use for ordered column (OC) syncing strategy.
*
* This determines which column should be used for incremental sync with ordered column loading,
* prioritizing SQL Server performance characteristics.
*
* The logic flow:
* 1. If single-column clustered index exists → Use it (best performance for SQL Server)
* 2. If composite clustered index or no clustered index → Use first column of primary key
* 3. If no primary key → Use first column of logical PK from configured catalog
* 4. If nothing available → Return null
*
* Note: This is separate from [primaryKey] which returns the full PK for discovery purposes.
*/
fun getOrderedColumnForSync(streamID: StreamIdentifier): String? {
val table: TableName = findTableName(streamID) ?: return null
// Prefer single-column clustered index for best SQL Server performance
val clusteredIndexKeys = memoizedClusteredIndexKeys[table]
if (clusteredIndexKeys != null && clusteredIndexKeys.size == 1) {
val column = clusteredIndexKeys[0][0]
log.info {
"Using single-column clustered index for sync: ${table.schema}.${table.name} -> $column"
}
return column
}
// Fall back to first column of primary key
val databasePK = memoizedPrimaryKeys[table]
if (!databasePK.isNullOrEmpty()) {
val column = databasePK[0][0]
log.info {
"Clustered index is composite or not found. Using first PK column for sync: ${table.schema}.${table.name} -> $column"
}
return column
}
// Fall back to first column of logical PK
val logicalPK = getUserDefinedPrimaryKey(streamID)
if (logicalPK.isNotEmpty()) {
val column = logicalPK[0][0]
log.info {
"No physical primary key. Using first logical PK column for sync: ${table.schema}.${table.name} -> $column"
}
return column
}
log.warn {
"No suitable column found for ordered column sync: ${table.schema}.${table.name}"
}
return null
}
/**
* Gets the user-defined logical primary key from the configured catalog. This is used for
* backward compatibility with the old connector where users could configure logical PKs for
@@ -357,27 +394,28 @@ class MsSqlSourceMetadataQuerier(
const val CLUSTERED_INDEX_QUERY_FMTSTR =
"""
SELECT
SELECT
s.name as table_schema,
t.name as table_name,
i.name as index_name,
ic.key_ordinal,
c.name as column_name
FROM
FROM
sys.tables t
INNER JOIN
INNER JOIN
sys.schemas s ON t.schema_id = s.schema_id
INNER JOIN
INNER JOIN
sys.indexes i ON t.object_id = i.object_id
INNER JOIN
INNER JOIN
sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
INNER JOIN
INNER JOIN
sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
WHERE
WHERE
s.name IN (%s)
AND i.type = 1 -- Clustered index
AND i.is_unique = 1 -- Only unique indexes
AND ic.is_included_column = 0 -- Only key columns, not included columns
ORDER BY
ORDER BY
s.name, t.name, ic.key_ordinal;
"""

View File

@@ -12,6 +12,7 @@ import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.data.FloatCodec
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.LocalDateTimeCodec
import io.airbyte.cdk.data.TextCodec
import io.airbyte.cdk.discover.CdcIntegerMetaFieldType
import io.airbyte.cdk.discover.CdcOffsetDateTimeMetaFieldType
@@ -37,6 +38,8 @@ import jakarta.inject.Singleton
import java.sql.JDBCType
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.OffsetDateTime
private val log = KotlinLogging.logger {}
@@ -89,7 +92,7 @@ class MsSqlSourceOperations :
JDBCType.LONGNVARCHAR -> StringFieldType
JDBCType.DATE -> LocalDateFieldType
JDBCType.TIME -> LocalTimeFieldType
JDBCType.TIMESTAMP -> LocalDateTimeFieldType
JDBCType.TIMESTAMP -> MsSqlServerLocalDateTimeFieldType
JDBCType.BINARY,
JDBCType.VARBINARY,
JDBCType.LONGVARBINARY -> BytesFieldType
@@ -114,6 +117,38 @@ class MsSqlSourceOperations :
return retVal
}
// Custom LocalDateTime accessor that truncates to 6 decimal places (microseconds)
// SQL Server datetime2 can have up to 7 decimal places, but destination may only support 6
// decimal places
data object MsSqlServerLocalDateTimeAccessor : JdbcAccessor<LocalDateTime> {
override fun get(
rs: ResultSet,
colIdx: Int,
): LocalDateTime? {
val timestamp = rs.getTimestamp(colIdx)?.takeUnless { rs.wasNull() } ?: return null
val localDateTime = timestamp.toLocalDateTime()
// Truncate to microseconds (6 decimal places) by zeroing out the nanoseconds beyond
// microseconds
val truncatedNanos = (localDateTime.nano / 1000) * 1000
return localDateTime.withNano(truncatedNanos)
}
override fun set(
stmt: PreparedStatement,
paramIdx: Int,
value: LocalDateTime,
) {
stmt.setTimestamp(paramIdx, Timestamp.valueOf(value))
}
}
data object MsSqlServerLocalDateTimeFieldType :
SymmetricJdbcFieldType<LocalDateTime>(
LeafAirbyteSchemaType.TIMESTAMP_WITHOUT_TIMEZONE,
MsSqlServerLocalDateTimeAccessor,
LocalDateTimeCodec,
)
data object MsSqlServerFloatAccessor : JdbcAccessor<Float> {
override fun get(
rs: ResultSet,
@@ -203,7 +238,7 @@ class MsSqlSourceOperations :
val jdbcType: JdbcFieldType<*>,
) {
BINARY_FIELD(BinaryStreamFieldType, "VARBINARY", "BINARY"),
DATETIME_TYPES(LocalDateTimeFieldType, "DATETIME", "DATETIME2", "SMALLDATETIME"),
DATETIME_TYPES(MsSqlServerLocalDateTimeFieldType, "DATETIME", "DATETIME2", "SMALLDATETIME"),
DATE(LocalDateFieldType, "DATE"),
DATETIMEOFFSET(OffsetDateTimeFieldType, "DATETIMEOFFSET"),
TIME_TYPE(LocalTimeFieldType, "TIME"),

View File

@@ -12,6 +12,7 @@ import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.MetaField
import io.airbyte.cdk.discover.MetaFieldDecorator
import io.airbyte.cdk.discover.TableName
import io.airbyte.cdk.jdbc.BinaryStreamFieldType
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
import io.airbyte.cdk.jdbc.IntFieldType
@@ -31,6 +32,7 @@ import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.StreamFeedBootstrap
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.mockk.every
import io.mockk.mockk
import java.time.OffsetDateTime
import java.util.Base64
@@ -48,12 +50,46 @@ class MsSqlServerJdbcPartitionFactoryTest {
private val cdcSharedState = sharedState(global = true)
private val config = mockk<MsSqlServerSourceConfiguration>(relaxed = true)
val metadataQuerier =
mockk<MsSqlSourceMetadataQuerier>(relaxed = true) {
// Mock getOrderedColumnForSync to return the first PK column by default
every { getOrderedColumnForSync(any()) } answers
{
val streamId = firstArg<StreamIdentifier>()
// For full_refresh tests without PK/CI, return null
if (
streamId.name == "full_refresh_table" ||
streamId.name == "cdc_full_refresh_table"
) {
null
} else {
// For most tests, return "id" (the first PK column or clustered index)
fieldId.id
}
}
}
val metadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>(relaxed = true) {
every { session(any()) } returns metadataQuerier
}
val msSqlServerJdbcPartitionFactory =
MsSqlServerJdbcPartitionFactory(sharedState, selectQueryGenerator, config)
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
metadataQuerierFactory
)
val msSqlServerCdcJdbcPartitionFactory =
MsSqlServerJdbcPartitionFactory(cdcSharedState, selectQueryGenerator, config)
MsSqlServerJdbcPartitionFactory(
cdcSharedState,
selectQueryGenerator,
config,
metadataQuerierFactory
)
val fieldId = Field("id", IntFieldType)
val fieldName = Field("name", io.airbyte.cdk.jdbc.StringFieldType)
val stream =
Stream(
id =
@@ -202,7 +238,9 @@ class MsSqlServerJdbcPartitionFactoryTest {
)
val jdbcPartition =
msSqlServerJdbcPartitionFactory.create(streamFeedBootstrap(streamWithoutPk))
assertTrue(jdbcPartition is MsSqlServerJdbcNonResumableSnapshotWithCursorPartition)
// With our changes, if there's a clustered index (which the mock returns "id"),
// we use resumable snapshots even without a configured PK
assertTrue(jdbcPartition is MsSqlServerJdbcSnapshotWithCursorPartition)
}
@Test
@@ -516,4 +554,270 @@ class MsSqlServerJdbcPartitionFactoryTest {
// Verify it's BigDecimal, not Double
assertTrue(cursorLowerBound.isBigDecimal)
}
@Test
fun testViewDetectionForTable() {
// Test that regular tables are correctly identified as non-views
val tableStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("dbo").withName("regular_table")
),
schema = setOf(fieldId),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = listOf(fieldId),
configuredCursor = fieldId,
)
// Mock the metadata querier to return a TABLE type
val mockMetadataQuerier = mockk<MsSqlSourceMetadataQuerier>()
every { mockMetadataQuerier.findTableName(tableStream.id) } returns
TableName(name = "regular_table", schema = "dbo", type = "TABLE")
every { mockMetadataQuerier.getOrderedColumnForSync(any()) } returns fieldId.id
val mockMetadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>() {
every { session(any()) } returns mockMetadataQuerier
}
val factoryWithMockedQuerier =
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
mockMetadataQuerierFactory
)
val partition = factoryWithMockedQuerier.create(streamFeedBootstrap(tableStream))
// Regular tables with primary keys should use resumable partitions which support sampling
assertTrue(partition is MsSqlServerJdbcSnapshotWithCursorPartition)
}
@Test
fun testViewDetectionForView() {
// Test that views use non-resumable partitions to avoid sampling entirely
val viewStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("dbo").withName("vw_test_view")
),
schema = setOf(fieldId),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = listOf(fieldId),
configuredCursor = fieldId,
)
// Mock the metadata querier to return a VIEW type
val mockMetadataQuerier = mockk<MsSqlSourceMetadataQuerier>()
every { mockMetadataQuerier.findTableName(viewStream.id) } returns
TableName(name = "vw_test_view", schema = "dbo", type = "VIEW")
every { mockMetadataQuerier.getOrderedColumnForSync(any()) } returns fieldId.id
val mockMetadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>() {
every { session(any()) } returns mockMetadataQuerier
}
val factoryWithMockedQuerier =
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
mockMetadataQuerierFactory
)
val partition = factoryWithMockedQuerier.create(streamFeedBootstrap(viewStream))
// Views should use non-resumable partitions to skip sampling entirely
assertTrue(
partition is MsSqlServerJdbcNonResumableSnapshotWithCursorPartition,
"Views should use non-resumable partitions to avoid sampling"
)
}
@Test
fun testViewDetectionWithNullTableName() {
// Test that when metadata querier returns null, we default to treating it as a table
val unknownStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("dbo").withName("unknown_object")
),
schema = setOf(fieldId),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = listOf(fieldId),
configuredCursor = fieldId,
)
// Mock the metadata querier to return null (object not found)
val mockMetadataQuerier = mockk<MsSqlSourceMetadataQuerier>()
every { mockMetadataQuerier.findTableName(unknownStream.id) } returns null
every { mockMetadataQuerier.getOrderedColumnForSync(any()) } returns fieldId.id
val mockMetadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>() {
every { session(any()) } returns mockMetadataQuerier
}
val factoryWithMockedQuerier =
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
mockMetadataQuerierFactory
)
val partition = factoryWithMockedQuerier.create(streamFeedBootstrap(unknownStream))
// Unknown objects (when metadata returns null) should default to resumable partitions
assertTrue(partition is MsSqlServerJdbcSnapshotWithCursorPartition)
}
@Test
fun testViewWithConfiguredPkResumingFromState() {
// This test covers the critical bug: a view with a configured PK resuming from state
// should use non-resumable partitions to avoid TABLESAMPLE
val viewStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("dbo").withName("VW_FICHES_PRODUITS")
),
schema = setOf(fieldId, fieldName),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = listOf(fieldId),
configuredCursor = fieldName,
)
// State indicating we're in the middle of a snapshot (pk_name is set)
val stateValue: OpaqueStateValue =
Jsons.readTree(
"""
{
"version": 3,
"state_type": "cursor_based",
"pk_name": "id",
"pk_val": "100",
"cursor_field": ["name"],
"cursor": null
}
"""
)
// Mock the metadata querier to return a VIEW type
val mockMetadataQuerier = mockk<MsSqlSourceMetadataQuerier>()
every { mockMetadataQuerier.findTableName(viewStream.id) } returns
TableName(name = "VW_FICHES_PRODUITS", schema = "dbo", type = "VIEW")
every { mockMetadataQuerier.getOrderedColumnForSync(any()) } returns fieldId.id
val mockMetadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>() {
every { session(any()) } returns mockMetadataQuerier
}
val factoryWithMockedQuerier =
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
mockMetadataQuerierFactory
)
val partition = factoryWithMockedQuerier.create(streamFeedBootstrap(viewStream, stateValue))
// Views with configured PK resuming from state MUST use non-resumable partitions
// to avoid TABLESAMPLE which fails on views
assertTrue(
partition is MsSqlServerJdbcNonResumableSnapshotWithCursorPartition,
"Views with configured PK resuming from state should use non-resumable partitions to avoid TABLESAMPLE"
)
}
@Test
fun testViewWithoutConfiguredPk() {
// Test that views without a configured PK also use non-resumable partitions
val viewStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("dbo").withName("vw_simple_view")
),
schema = setOf(fieldId, fieldName),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = null,
configuredCursor = fieldName,
)
// Mock the metadata querier to return a VIEW type
val mockMetadataQuerier = mockk<MsSqlSourceMetadataQuerier>()
every { mockMetadataQuerier.findTableName(viewStream.id) } returns
TableName(name = "vw_simple_view", schema = "dbo", type = "VIEW")
every { mockMetadataQuerier.getOrderedColumnForSync(any()) } returns null
val mockMetadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>() {
every { session(any()) } returns mockMetadataQuerier
}
val factoryWithMockedQuerier =
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
mockMetadataQuerierFactory
)
val partition = factoryWithMockedQuerier.create(streamFeedBootstrap(viewStream))
// Views without configured PK should also use non-resumable partitions
assertTrue(
partition is MsSqlServerJdbcNonResumableSnapshotWithCursorPartition,
"Views without configured PK should use non-resumable partitions"
)
}
@Test
fun testViewInFullRefreshMode() {
// Test that views in full refresh mode also avoid resumable partitions
val viewStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("dbo").withName("vw_fullrefresh_view")
),
schema = setOf(fieldId, fieldName),
configuredSyncMode = ConfiguredSyncMode.FULL_REFRESH,
configuredPrimaryKey = null,
configuredCursor = null,
)
// Mock the metadata querier to return a VIEW type
val mockMetadataQuerier = mockk<MsSqlSourceMetadataQuerier>()
every { mockMetadataQuerier.findTableName(viewStream.id) } returns
TableName(name = "vw_fullrefresh_view", schema = "dbo", type = "VIEW")
every { mockMetadataQuerier.getOrderedColumnForSync(any()) } returns null
val mockMetadataQuerierFactory =
mockk<MsSqlSourceMetadataQuerier.Factory>() {
every { session(any()) } returns mockMetadataQuerier
}
val factoryWithMockedQuerier =
MsSqlServerJdbcPartitionFactory(
sharedState,
selectQueryGenerator,
config,
mockMetadataQuerierFactory
)
val partition = factoryWithMockedQuerier.create(streamFeedBootstrap(viewStream))
// Views in full refresh without PK should use non-resumable partitions
assertTrue(
partition is MsSqlServerJdbcNonResumableSnapshotPartition,
"Views in full refresh mode should use non-resumable partitions"
)
}
}

View File

@@ -0,0 +1,126 @@
/*
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mssql
import io.airbyte.integrations.source.mssql.MsSqlSourceOperations.MsSqlServerLocalDateTimeAccessor
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.LocalDateTime
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.mockito.Mockito.`when`
class MsSqlServerLocalDateTimeAccessorTest {
@Test
fun `test truncates datetime with 7 decimal places to 6 decimal places`() {
// Create a LocalDateTime with 7 decimal places (123456789 nanoseconds)
val dateTimeWith7Decimals = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 123456789)
val timestamp = Timestamp.valueOf(dateTimeWith7Decimals)
val resultSet = mock(ResultSet::class.java)
`when`(resultSet.getTimestamp(1)).thenReturn(timestamp)
`when`(resultSet.wasNull()).thenReturn(false)
val result = MsSqlServerLocalDateTimeAccessor.get(resultSet, 1)
// Expected: truncated to microseconds (6 decimal places = 123456000 nanoseconds)
val expected = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 123456000)
assertEquals(expected, result)
assertEquals(123456000, result?.nano)
}
@Test
fun `test handles datetime with 6 decimal places correctly`() {
// Create a LocalDateTime with exactly 6 decimal places (123456000 nanoseconds)
val dateTimeWith6Decimals = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 123456000)
val timestamp = Timestamp.valueOf(dateTimeWith6Decimals)
val resultSet = mock(ResultSet::class.java)
`when`(resultSet.getTimestamp(1)).thenReturn(timestamp)
`when`(resultSet.wasNull()).thenReturn(false)
val result = MsSqlServerLocalDateTimeAccessor.get(resultSet, 1)
// Should remain unchanged
assertEquals(dateTimeWith6Decimals, result)
assertEquals(123456000, result?.nano)
}
@Test
fun `test handles datetime with less than 6 decimal places`() {
// Create a LocalDateTime with 3 decimal places (123000000 nanoseconds)
val dateTimeWith3Decimals = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 123000000)
val timestamp = Timestamp.valueOf(dateTimeWith3Decimals)
val resultSet = mock(ResultSet::class.java)
`when`(resultSet.getTimestamp(1)).thenReturn(timestamp)
`when`(resultSet.wasNull()).thenReturn(false)
val result = MsSqlServerLocalDateTimeAccessor.get(resultSet, 1)
// Should remain unchanged
assertEquals(dateTimeWith3Decimals, result)
assertEquals(123000000, result?.nano)
}
@Test
fun `test handles null timestamp`() {
val resultSet = mock(ResultSet::class.java)
`when`(resultSet.getTimestamp(1)).thenReturn(null)
`when`(resultSet.wasNull()).thenReturn(true)
val result = MsSqlServerLocalDateTimeAccessor.get(resultSet, 1)
assertEquals(null, result)
}
@Test
fun `test datetime2 with maximum precision 7 digits`() {
// SQL Server datetime2(7) can have up to 9999999 nanoseconds (7 decimal places)
val dateTimeMaxPrecision = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 9999999)
val timestamp = Timestamp.valueOf(dateTimeMaxPrecision)
val resultSet = mock(ResultSet::class.java)
`when`(resultSet.getTimestamp(1)).thenReturn(timestamp)
`when`(resultSet.wasNull()).thenReturn(false)
val result = MsSqlServerLocalDateTimeAccessor.get(resultSet, 1)
// Expected: truncated to microseconds (999999 * 1000 = 999999000 nanoseconds)
val expected = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 9999000)
assertEquals(expected, result)
assertEquals(9999000, result?.nano)
}
@Test
fun `test formats truncated datetime correctly for BigQuery compatibility`() {
// This is the actual failing case from the log:
// "2025-11-06T22:30:46.0033333" (7 decimals) should become "2025-11-06T22:30:46.003333" (6
// decimals)
val dateTimeWith7Decimals = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 3333333)
val timestamp = Timestamp.valueOf(dateTimeWith7Decimals)
val resultSet = mock(ResultSet::class.java)
`when`(resultSet.getTimestamp(1)).thenReturn(timestamp)
`when`(resultSet.wasNull()).thenReturn(false)
val result = MsSqlServerLocalDateTimeAccessor.get(resultSet, 1)
// Expected: truncated to 3333000 nanoseconds (003333 microseconds)
val expected = LocalDateTime.of(2025, 11, 6, 22, 30, 46, 3333000)
assertEquals(expected, result)
assertEquals(3333000, result?.nano)
// Verify it formats correctly with the standard codec pattern
val formatter = io.airbyte.cdk.data.LocalDateTimeCodec.formatter
val formattedResult = result?.format(formatter)
assertEquals("2025-11-06T22:30:46.003333", formattedResult)
}
}

View File

@@ -73,7 +73,9 @@ class MsSqlSourceMetadataQuerierTest {
"DROP TABLE IF EXISTS dbo.table_with_pk_no_clustered",
"DROP TABLE IF EXISTS dbo.table_with_pk_and_single_clustered",
"DROP TABLE IF EXISTS dbo.table_with_pk_and_composite_clustered",
"DROP TABLE IF EXISTS dbo.table_no_pk_no_clustered"
"DROP TABLE IF EXISTS dbo.table_no_pk_no_clustered",
"DROP TABLE IF EXISTS dbo.table_with_non_unique_clustered",
"DROP TABLE IF EXISTS dbo.table_with_composite_pk"
)
for (ddl in dropStatements) {
@@ -86,8 +88,9 @@ class MsSqlSourceMetadataQuerierTest {
}
}
// Test Case 1: Table with clustered index but no primary key
// Expected: Should use the clustered index column as primary key
// Test Case 1: Table with UNIQUE clustered index but no primary key
// Expected PK: Empty (no PK constraint)
// Expected OC: Use the unique clustered index column
connection.createStatement().use { stmt ->
stmt.execute(
"""
@@ -100,7 +103,7 @@ class MsSqlSourceMetadataQuerierTest {
)
stmt.execute(
"""
CREATE CLUSTERED INDEX idx_clustered_id
CREATE UNIQUE CLUSTERED INDEX idx_clustered_id
ON dbo.table_with_clustered_no_pk (id)
"""
)
@@ -121,9 +124,10 @@ class MsSqlSourceMetadataQuerierTest {
)
}
// Test Case 3: Table with both primary key and single-column clustered index on
// Test Case 3: Table with both primary key and single-column UNIQUE clustered index on
// different columns
// Expected: Should use the single-column clustered index
// Expected PK: Should return actual PK (id)
// Expected OC: Should use single-column unique clustered index (code)
connection.createStatement().use { stmt ->
stmt.execute(
"""
@@ -138,14 +142,15 @@ class MsSqlSourceMetadataQuerierTest {
)
stmt.execute(
"""
CREATE CLUSTERED INDEX idx_clustered_code
CREATE UNIQUE CLUSTERED INDEX idx_clustered_code
ON dbo.table_with_pk_and_single_clustered (code)
"""
)
}
// Test Case 4: Table with primary key and composite clustered index
// Expected: Should use the primary key (not the composite clustered index)
// Test Case 4: Table with primary key and UNIQUE composite clustered index
// Expected PK: Should return actual PK (id)
// Expected OC: Should use first PK column since CI is composite
connection.createStatement().use { stmt ->
stmt.execute(
"""
@@ -161,7 +166,7 @@ class MsSqlSourceMetadataQuerierTest {
)
stmt.execute(
"""
CREATE CLUSTERED INDEX idx_clustered_composite
CREATE UNIQUE CLUSTERED INDEX idx_clustered_composite
ON dbo.table_with_pk_and_composite_clustered (code, category)
"""
)
@@ -180,21 +185,64 @@ class MsSqlSourceMetadataQuerierTest {
"""
)
}
// Test Case 6: Table with PK and NON-UNIQUE clustered index
// Expected PK: Should return actual PK (id)
// Expected OC: Should fall back to first PK column (not CI, since CI is non-unique)
connection.createStatement().use { stmt ->
stmt.execute(
"""
CREATE TABLE dbo.table_with_non_unique_clustered (
id INT NOT NULL,
category NVARCHAR(50) NOT NULL,
name NVARCHAR(100),
created_at DATETIME2,
CONSTRAINT pk_table6 PRIMARY KEY NONCLUSTERED (id)
)
"""
)
stmt.execute(
"""
CREATE CLUSTERED INDEX idx_non_unique_category
ON dbo.table_with_non_unique_clustered (category)
"""
)
}
// Test Case 7: Table with composite PK (3 columns)
// Expected PK: Should return all 3 columns
// Expected OC: Should use first PK column
connection.createStatement().use { stmt ->
stmt.execute(
"""
CREATE TABLE dbo.table_with_composite_pk (
reg_id INT NOT NULL,
agent_id INT NOT NULL,
assigned_date DATE NOT NULL,
name NVARCHAR(100),
CONSTRAINT pk_table7 PRIMARY KEY NONCLUSTERED (reg_id, agent_id, assigned_date)
)
"""
)
}
}
}
@Test
@DisplayName("Should use single-column clustered index when no primary key exists")
@DisplayName("Should return empty PK when only clustered index exists (no PK constraint)")
fun testClusteredIndexNoPrimaryKey() {
val streamId =
StreamIdentifier.from(
StreamDescriptor().withName("table_with_clustered_no_pk").withNamespace("dbo")
)
// Test discovery: should return empty (no PK constraint)
val primaryKey = metadataQuerier.primaryKey(streamId)
assertTrue(primaryKey.isEmpty(), "Should return empty list when no PK constraint exists")
assertEquals(1, primaryKey.size, "Should have one primary key column")
assertEquals(listOf("id"), primaryKey[0], "Should use clustered index column 'id'")
// Test sync strategy: should use clustered index column
val orderedColumn = metadataQuerier.getOrderedColumnForSync(streamId)
assertEquals("id", orderedColumn, "Should use clustered index column 'id' for syncing")
}
@Test
@@ -212,7 +260,7 @@ class MsSqlSourceMetadataQuerierTest {
}
@Test
@DisplayName("Should prefer single-column clustered index over primary key")
@DisplayName("Discovery returns PK, sync strategy prefers clustered index")
fun testSingleClusteredIndexOverPrimaryKey() {
val streamId =
StreamIdentifier.from(
@@ -221,13 +269,17 @@ class MsSqlSourceMetadataQuerierTest {
.withNamespace("dbo")
)
// Test discovery: should return actual PK constraint
val primaryKey = metadataQuerier.primaryKey(streamId)
assertEquals(1, primaryKey.size, "Should have one primary key column")
assertEquals(listOf("id"), primaryKey[0], "Discovery should return actual primary key 'id'")
// Test sync strategy: should prefer clustered index over PK
val orderedColumn = metadataQuerier.getOrderedColumnForSync(streamId)
assertEquals(
listOf("code"),
primaryKey[0],
"Should use single-column clustered index 'code' instead of primary key 'id'"
"code",
orderedColumn,
"Sync strategy should prefer clustered index 'code' over PK"
)
}
@@ -381,6 +433,56 @@ class MsSqlSourceMetadataQuerierTest {
)
}
@Test
@DisplayName("Should filter out non-unique clustered index and fall back to PK")
fun testNonUniqueClusteredIndexFiltered() {
val streamId =
StreamIdentifier.from(
StreamDescriptor().withName("table_with_non_unique_clustered").withNamespace("dbo")
)
// Test discovery: should return actual PK constraint
val primaryKey = metadataQuerier.primaryKey(streamId)
assertEquals(1, primaryKey.size, "Should have one primary key column")
assertEquals(listOf("id"), primaryKey[0], "Discovery should return actual primary key 'id'")
// Verify that non-unique CI is NOT in memoizedClusteredIndexKeys
val tables = metadataQuerier.memoizedTableNames
val table =
tables.find { it.name == "table_with_non_unique_clustered" && it.schema == "dbo" }
assertNotNull(table, "Should find table_with_non_unique_clustered")
val clusteredKeys = metadataQuerier.memoizedClusteredIndexKeys[table]
assertNull(clusteredKeys, "Non-unique clustered index should be filtered out")
// Test sync strategy: should fall back to first PK column (not CI)
val orderedColumn = metadataQuerier.getOrderedColumnForSync(streamId)
assertEquals("id", orderedColumn, "Should fall back to PK 'id' since CI is non-unique")
}
@Test
@DisplayName("Should discover all columns in composite primary key")
fun testCompositePrimaryKey() {
val streamId =
StreamIdentifier.from(
StreamDescriptor().withName("table_with_composite_pk").withNamespace("dbo")
)
// Test discovery: should return all 3 PK columns
val primaryKey = metadataQuerier.primaryKey(streamId)
assertEquals(3, primaryKey.size, "Should have three primary key columns")
assertEquals(listOf("reg_id"), primaryKey[0], "First PK column should be 'reg_id'")
assertEquals(listOf("agent_id"), primaryKey[1], "Second PK column should be 'agent_id'")
assertEquals(
listOf("assigned_date"),
primaryKey[2],
"Third PK column should be 'assigned_date'"
)
// Test sync strategy: should use first PK column
val orderedColumn = metadataQuerier.getOrderedColumnForSync(streamId)
assertEquals("reg_id", orderedColumn, "Should use first PK column 'reg_id' for syncing")
}
@Test
@DisplayName("Should prefer physical PK over user-defined logical PK")
fun testPhysicalPrimaryKeyPreferredOverLogical() {
@@ -443,7 +545,9 @@ class MsSqlSourceMetadataQuerierTest {
"DROP TABLE IF EXISTS dbo.table_with_pk_no_clustered",
"DROP TABLE IF EXISTS dbo.table_with_pk_and_single_clustered",
"DROP TABLE IF EXISTS dbo.table_with_pk_and_composite_clustered",
"DROP TABLE IF EXISTS dbo.table_no_pk_no_clustered"
"DROP TABLE IF EXISTS dbo.table_no_pk_no_clustered",
"DROP TABLE IF EXISTS dbo.table_with_non_unique_clustered",
"DROP TABLE IF EXISTS dbo.table_with_composite_pk"
)
for (ddl in dropStatements) {

View File

@@ -454,6 +454,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.3.0-rc.4 | 2025-11-05 | [69194](https://github.com/airbytehq/airbyte/pull/69194) | Fix composite primary key discovery to show all PK columns; separate PK discovery from sync strategy |
| 4.3.0-rc.3 | 2025-10-31 | [69097](https://github.com/airbytehq/airbyte/pull/69097) | Fix connector state value type from string to json object |
| 4.3.0-rc.2 | 2025-10-29 | [69093](https://github.com/airbytehq/airbyte/pull/69093) | Fix state parsing error for integer and number |
| 4.3.0-rc.1 | 2025-10-28 | [63731](https://github.com/airbytehq/airbyte/pull/63731) | Migrate source mssql from old CDK to new CDK |