Bulk Load: Mark Completed States as Persisted. (#48109)
Closes airbytehq/airbyte-internal-issues#9977 When marking COMPLETE, also mark PERSISTED. This also simplifies the PERSISTED check to no longer need to check for COMPLETED states.
This commit is contained in:
@@ -140,9 +140,9 @@ class DefaultStreamManager(
|
||||
}
|
||||
|
||||
override fun <B : Batch> updateBatchState(batch: BatchEnvelope<B>) {
|
||||
val stateRanges =
|
||||
rangesState[batch.batch.state]
|
||||
?: throw IllegalArgumentException("Invalid batch state: ${batch.batch.state}")
|
||||
|
||||
rangesState[batch.batch.state]
|
||||
?: throw IllegalArgumentException("Invalid batch state: ${batch.batch.state}")
|
||||
|
||||
// Force the ranges to overlap at their endpoints, in order to work around
|
||||
// the behavior of `.encloses`, which otherwise would not consider adjacent ranges as
|
||||
@@ -152,8 +152,21 @@ class DefaultStreamManager(
|
||||
val expanded =
|
||||
batch.ranges.asRanges().map { it.span(Range.singleton(it.upperEndpoint() + 1)) }
|
||||
|
||||
stateRanges.addAll(expanded)
|
||||
log.info { "Updated ranges for ${stream.descriptor}[${batch.batch.state}]: $stateRanges" }
|
||||
when (batch.batch.state) {
|
||||
Batch.State.PERSISTED -> {
|
||||
rangesState[Batch.State.PERSISTED]?.addAll(expanded)
|
||||
}
|
||||
Batch.State.COMPLETE -> {
|
||||
// A COMPLETED state implies PERSISTED, so also mark PERSISTED.
|
||||
rangesState[Batch.State.PERSISTED]?.addAll(expanded)
|
||||
rangesState[Batch.State.COMPLETE]?.addAll(expanded)
|
||||
}
|
||||
else -> Unit
|
||||
}
|
||||
|
||||
log.info {
|
||||
"Updated ranges for ${stream.descriptor}[${batch.batch.state}]: $expanded. PERSISTED is also updated on COMPLETE."
|
||||
}
|
||||
}
|
||||
|
||||
/** True if all records in `[0, index)` have reached the given state. */
|
||||
@@ -172,10 +185,8 @@ class DefaultStreamManager(
|
||||
return isProcessingCompleteForState(recordCount.get(), Batch.State.COMPLETE)
|
||||
}
|
||||
|
||||
/** TODO: Handle conflating PERSISTED w/ COMPLETE upstream, to allow for overlap? */
|
||||
override fun areRecordsPersistedUntil(index: Long): Boolean {
|
||||
return isProcessingCompleteForState(index, Batch.State.PERSISTED) ||
|
||||
isProcessingCompleteForState(index, Batch.State.COMPLETE) // complete => persisted
|
||||
return isProcessingCompleteForState(index, Batch.State.PERSISTED)
|
||||
}
|
||||
|
||||
override fun markSucceeded() {
|
||||
|
||||
@@ -154,6 +154,34 @@ class StreamManagerTest {
|
||||
Pair(stream1, ExpectComplete(true)),
|
||||
)
|
||||
),
|
||||
TestCase(
|
||||
"Single stream, multiple batches, complete also persists",
|
||||
listOf(
|
||||
Pair(stream1, SetRecordCount(10)),
|
||||
Pair(stream1, AddComplete(0, 4)),
|
||||
Pair(stream1, ExpectPersistedUntil(5, true)),
|
||||
Pair(stream1, ExpectComplete(false)),
|
||||
Pair(stream1, SetEndOfStream),
|
||||
Pair(stream1, AddComplete(5, 9)),
|
||||
Pair(stream1, ExpectComplete(true)),
|
||||
)
|
||||
),
|
||||
TestCase(
|
||||
"Single stream, multiple batches, persist/complete out of order",
|
||||
listOf(
|
||||
Pair(stream1, SetRecordCount(10)),
|
||||
Pair(
|
||||
stream1,
|
||||
AddComplete(5, 9)
|
||||
), // complete a rangeset before the preceding rangeset is persisted
|
||||
Pair(stream1, AddPersisted(0, 4)),
|
||||
Pair(stream1, ExpectPersistedUntil(10, true)),
|
||||
Pair(stream1, ExpectComplete(false)),
|
||||
Pair(stream1, AddComplete(0, 4)),
|
||||
Pair(stream1, SetEndOfStream),
|
||||
Pair(stream1, ExpectComplete(true)),
|
||||
)
|
||||
),
|
||||
TestCase(
|
||||
"multiple streams",
|
||||
listOf(
|
||||
@@ -177,7 +205,43 @@ class StreamManagerTest {
|
||||
Pair(stream2, ExpectPersistedUntil(20, true)),
|
||||
Pair(stream2, ExpectComplete(true)),
|
||||
)
|
||||
)
|
||||
),
|
||||
TestCase(
|
||||
"mingle streams, multiple batches, complete also persists",
|
||||
listOf(
|
||||
Pair(stream1, SetRecordCount(10)),
|
||||
Pair(stream1, AddComplete(0, 4)),
|
||||
Pair(stream1, ExpectPersistedUntil(5, true)),
|
||||
Pair(stream2, AddComplete(0, 4)),
|
||||
Pair(stream2, ExpectPersistedUntil(5, true)),
|
||||
Pair(stream1, ExpectComplete(false)),
|
||||
Pair(stream2, ExpectComplete(false)),
|
||||
Pair(stream1, SetEndOfStream),
|
||||
Pair(stream1, AddComplete(5, 9)),
|
||||
Pair(stream2, AddComplete(5, 9)),
|
||||
Pair(stream2, SetEndOfStream),
|
||||
Pair(stream1, ExpectComplete(true)),
|
||||
Pair(stream2, ExpectComplete(true)),
|
||||
)
|
||||
),
|
||||
TestCase(
|
||||
"mingle streams, multiple batches, persist/complete out of order",
|
||||
listOf(
|
||||
Pair(stream1, SetRecordCount(10)),
|
||||
Pair(stream1, AddComplete(5, 9)),
|
||||
Pair(stream1, ExpectPersistedUntil(10, false)),
|
||||
Pair(stream2, AddComplete(5, 9)),
|
||||
Pair(stream2, ExpectPersistedUntil(10, false)),
|
||||
Pair(stream1, ExpectComplete(false)),
|
||||
Pair(stream2, ExpectComplete(false)),
|
||||
Pair(stream1, SetEndOfStream),
|
||||
Pair(stream1, AddComplete(0, 4)),
|
||||
Pair(stream2, AddComplete(0, 4)),
|
||||
Pair(stream2, SetEndOfStream),
|
||||
Pair(stream1, ExpectComplete(true)),
|
||||
Pair(stream2, ExpectComplete(true)),
|
||||
)
|
||||
),
|
||||
)
|
||||
.map { Arguments.of(it) }
|
||||
.stream()
|
||||
|
||||
@@ -277,5 +277,3 @@ To help illustrate what is possible, below are a couple examples of how the retr
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user