trigger-cdc: fix empty changes case
This commit is contained in:
@@ -100,7 +100,7 @@ class TriggerUnsplittableSnapshotWithCursorPartition(
|
||||
get() =
|
||||
TriggerStreamStateValue.cursorIncrementalCheckpoint(
|
||||
cursor,
|
||||
cursorCheckpoint = streamState.cursorUpperBound!!,
|
||||
cursorCheckpoint = streamState.cursorUpperBound ?: Jsons.nullNode(),
|
||||
)
|
||||
|
||||
override val cursorUpperBoundQuery: SelectQuery
|
||||
@@ -276,7 +276,7 @@ sealed class TriggerCursorPartition(
|
||||
JdbcCursorPartition<TriggerStreamState> {
|
||||
|
||||
val cursorUpperBound: JsonNode
|
||||
get() = explicitCursorUpperBound ?: streamState.cursorUpperBound!!
|
||||
get() = explicitCursorUpperBound ?: streamState.cursorUpperBound ?: Jsons.nullNode()
|
||||
val cursorUpperBoundFrom: From =
|
||||
if (triggerCdcPartitionState == null) from
|
||||
else
|
||||
|
||||
@@ -63,6 +63,13 @@ class TriggerPartitionFactory(
|
||||
val stream: Stream = streamFeedBootstrap.feed
|
||||
val streamState: TriggerStreamState = streamState(streamFeedBootstrap)
|
||||
val opaqueStateValue: OpaqueStateValue? = streamFeedBootstrap.currentState
|
||||
|
||||
// An empty table stream state will be marked as a nullNode. This prevents repeated attempt
|
||||
// to read it
|
||||
if (opaqueStateValue?.isNull == true) {
|
||||
return null
|
||||
}
|
||||
|
||||
if (opaqueStateValue == null) {
|
||||
return coldStart(streamState)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user