diff --git a/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartition.kt b/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartition.kt index 4d2789bbd7b..def1d836a54 100644 --- a/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartition.kt +++ b/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartition.kt @@ -100,7 +100,7 @@ class TriggerUnsplittableSnapshotWithCursorPartition( get() = TriggerStreamStateValue.cursorIncrementalCheckpoint( cursor, - cursorCheckpoint = streamState.cursorUpperBound!!, + cursorCheckpoint = streamState.cursorUpperBound ?: Jsons.nullNode(), ) override val cursorUpperBoundQuery: SelectQuery @@ -276,7 +276,7 @@ sealed class TriggerCursorPartition( JdbcCursorPartition { val cursorUpperBound: JsonNode - get() = explicitCursorUpperBound ?: streamState.cursorUpperBound!! + get() = explicitCursorUpperBound ?: streamState.cursorUpperBound ?: Jsons.nullNode() val cursorUpperBoundFrom: From = if (triggerCdcPartitionState == null) from else diff --git a/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartitionFactory.kt b/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartitionFactory.kt index 43824883b74..1e78d6d6147 100644 --- a/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartitionFactory.kt +++ b/airbyte-cdk/bulk/toolkits/extract-trigger/src/main/kotlin/io/airbyte/cdk/TriggerPartitionFactory.kt @@ -63,6 +63,13 @@ class TriggerPartitionFactory( val stream: Stream = streamFeedBootstrap.feed val streamState: TriggerStreamState = streamState(streamFeedBootstrap) val opaqueStateValue: OpaqueStateValue? = streamFeedBootstrap.currentState + + // An empty table stream state will be marked as a nullNode. This prevents repeated attempt + // to read it + if (opaqueStateValue?.isNull == true) { + return null + } + if (opaqueStateValue == null) { return coldStart(streamState) }