
Bulk load CDK: Even more tests (#47377)

Edward Gao
2024-10-28 15:59:35 -07:00
committed by GitHub
parent 9106d245d8
commit 9dcb7e7ac2
10 changed files with 322 additions and 60 deletions

View File

@@ -19,6 +19,7 @@ class MockBasicFunctionalityIntegrationTest :
NoopExpectedRecordMapper,
NoopNameMapper,
isStreamSchemaRetroactive = false,
supportsDedup = true,
) {
@Test
override fun testBasicWrite() {
@@ -36,8 +37,8 @@ class MockBasicFunctionalityIntegrationTest :
}
@Test
override fun testFunkyStreamAndColumnNames() {
super.testFunkyStreamAndColumnNames()
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}
@Test
@@ -54,4 +55,9 @@ class MockBasicFunctionalityIntegrationTest :
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
}
@Test
override fun testDedup() {
super.testDedup()
}
}

View File

@@ -6,8 +6,12 @@ package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.RecordDiffer
import java.util.concurrent.ConcurrentHashMap
object MockDestinationBackend {
@@ -17,6 +21,63 @@ object MockDestinationBackend {
getFile(filename).addAll(records)
}
fun upsert(
filename: String,
primaryKey: List<List<String>>,
cursor: List<String>,
vararg records: OutputRecord
) {
fun getField(path: List<String>, record: OutputRecord): AirbyteValue? {
var currentValue: ObjectValue = record.data
// Iterate over the path, except the final element
for (pathElement in path.subList(0, (path.size - 2).coerceAtLeast(0))) {
when (val next = currentValue.values[pathElement]) {
null,
is NullValue -> return null
!is ObjectValue -> {
throw IllegalStateException(
"Attempted to traverse field list in ${record.data} but found non-object value at $pathElement: $next"
)
}
else -> currentValue = next
}
}
return currentValue.values[path.last()]
}
fun getPk(record: OutputRecord): List<AirbyteValue?> =
primaryKey.map { pkField -> getField(pkField, record) }
fun getCursor(record: OutputRecord): AirbyteValue? = getField(cursor, record)
val file = getFile(filename)
records.forEach { incomingRecord ->
val incomingPk = getPk(incomingRecord)
// Assume that in dedup mode, we don't have duplicates - so we can just find the first
// record with the same PK as the incoming record
val existingRecord =
file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 }
if (existingRecord == null) {
file.add(incomingRecord)
} else {
val incomingCursor = getCursor(incomingRecord)
val existingCursor = getCursor(existingRecord)
val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor)
// If the incoming record has a later cursor,
// or the same cursor but a later extractedAt,
// then upsert. (otherwise discard the incoming record.)
if (
compare > 0 ||
(compare == 0 && incomingRecord.extractedAt > existingRecord.extractedAt)
) {
file.remove(existingRecord)
val deletion = getField(listOf("_ab_cdc_deleted_at"), incomingRecord)
if (deletion == null || deletion is NullValue) {
file.add(incomingRecord)
}
}
}
}
}
fun readFile(filename: String): List<OutputRecord> {
return getFile(filename)
}
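
The upsert above implements last-write-wins dedup: within a primary key, the record with the later cursor survives (ties broken by extractedAt), and a non-null _ab_cdc_deleted_at retires the row instead of replacing it. A minimal standalone sketch of that rule, using simplified hypothetical types in place of OutputRecord/AirbyteValue:

// Illustrative only: simplified stand-ins for the CDK's OutputRecord/AirbyteValue.
data class Row(
    val pk: List<Int>,          // extracted primary-key values
    val cursor: String,         // ISO-8601 cursor; lexicographic compare is fine for a fixed offset
    val extractedAt: Long,
    val deleted: Boolean,       // stands in for a non-null _ab_cdc_deleted_at
    val name: String,
)

fun upsert(table: MutableList<Row>, incoming: Row) {
    val existing = table.firstOrNull { it.pk == incoming.pk }
    if (existing == null) {
        table.add(incoming)
        return
    }
    val cmp = incoming.cursor.compareTo(existing.cursor)
    // Later cursor wins; equal cursors fall back to extractedAt. Otherwise drop the incoming record.
    if (cmp > 0 || (cmp == 0 && incoming.extractedAt > existing.extractedAt)) {
        table.remove(existing)
        // A deletion marker retires the row rather than replacing it.
        if (!incoming.deleted) table.add(incoming)
    }
}

fun main() {
    val table = mutableListOf<Row>()
    upsert(table, Row(listOf(1, 200), "2000-01-01T00:00:00Z", 1000, false, "Alice1"))
    upsert(table, Row(listOf(1, 200), "2000-01-01T00:01:00Z", 1000, false, "Alice2")) // replaces Alice1
    upsert(table, Row(listOf(1, 200), "2000-01-02T00:00:00Z", 2000, true, "Alice3"))  // deletes the row
    println(table) // []
}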

View File

@@ -4,6 +4,7 @@
package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
@@ -50,8 +51,8 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
return when (batch) {
is LocalBatch -> {
batch.records.forEach {
MockDestinationBackend.insert(
getFilename(it.stream),
val filename = getFilename(it.stream)
val record =
OutputRecord(
UUID.randomUUID(),
Instant.ofEpochMilli(it.emittedAtMs),
@@ -63,7 +64,17 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
syncId = stream.syncId
),
)
)
val importType = stream.importType
if (importType is Dedupe) {
MockDestinationBackend.upsert(
filename,
importType.primaryKey,
importType.cursor,
record
)
} else {
MockDestinationBackend.insert(filename, record)
}
}
PersistedBatch(batch.records)
}

View File

@@ -5,9 +5,17 @@
package io.airbyte.cdk.load.test.util
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import kotlin.reflect.jvm.jvmName
class RecordDiffer(
@@ -62,12 +70,7 @@ class RecordDiffer(
)
}
// Compare each PK field in order, until we find a field that the two records differ in.
// If all the fields are equal, then these two records have the same PK.
pk1.zip(pk2)
.map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) }
.firstOrNull { it != 0 }
?: 0
comparePks(pk1, pk2)
}
/**
@@ -235,7 +238,7 @@ class RecordDiffer(
// with it explicitly in the condition)
val expectedValue = expectedRecord.data.values[key]
val actualValue = actualRecord.data.values[key]
if (expectedValue != actualValue) {
if (valueComparator.compare(expectedValue, actualValue) != 0) {
diff.append("$key: Expected $expectedValue, but was $actualValue\n")
}
}
@@ -248,6 +251,16 @@ class RecordDiffer(
val valueComparator: Comparator<AirbyteValue> =
Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) }
/**
* Compare each PK field in order, until we find a field that the two records differ in. If
* all the fields are equal, then these two records have the same PK.
*/
fun comparePks(pk1: List<AirbyteValue?>, pk2: List<AirbyteValue?>) =
(pk1.zip(pk2)
.map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) }
.firstOrNull { it != 0 }
?: 0)
private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int {
// when comparing values of different types, just sort by their class name.
// in theory, we could check for numeric types and handle them smartly...
@@ -255,9 +268,38 @@ class RecordDiffer(
return if (v1::class != v2::class) {
v1::class.jvmName.compareTo(v2::class.jvmName)
} else {
// otherwise, just be a terrible person.
// we know these are the same type, so this is safe to do.
@Suppress("UNCHECKED_CAST") (v1 as Comparable<AirbyteValue>).compareTo(v2)
// Handle temporal types specifically, because they require explicit parsing
return when (v1) {
is DateValue ->
LocalDate.parse(v1.value)
.compareTo(LocalDate.parse((v2 as DateValue).value))
is TimeValue -> {
try {
val time1 = LocalTime.parse(v1.value)
val time2 = LocalTime.parse((v2 as TimeValue).value)
time1.compareTo(time2)
} catch (e: Exception) {
val time1 = OffsetTime.parse(v1.value)
val time2 = OffsetTime.parse((v2 as TimeValue).value)
time1.compareTo(time2)
}
}
is TimestampValue -> {
try {
val ts1 = LocalDateTime.parse(v1.value)
val ts2 = LocalDateTime.parse((v2 as TimestampValue).value)
ts1.compareTo(ts2)
} catch (e: Exception) {
val ts1 = OffsetDateTime.parse(v1.value)
val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value)
ts1.compareTo(ts2)
}
}
// otherwise, just be a terrible person.
// we know these are the same type, so this is safe to do.
else ->
@Suppress("UNCHECKED_CAST") (v1 as Comparable<AirbyteValue>).compareTo(v2)
}
}
}
}
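
The temporal branches above attempt a local parse first and fall back to an offset-aware parse, because a TimeValue/TimestampValue string may or may not carry a zone offset. A small standalone sketch of the parsing behavior the try/catch relies on:

import java.time.LocalDateTime
import java.time.OffsetDateTime
import java.time.format.DateTimeParseException

fun main() {
    val withoutOffset = "2000-01-01T00:01:00"
    val withOffset = "2000-01-01T00:01:00Z"

    // A plain local timestamp parses directly.
    println(LocalDateTime.parse(withoutOffset))

    // The same string with a trailing "Z" is rejected by LocalDateTime...
    try {
        LocalDateTime.parse(withOffset)
    } catch (e: DateTimeParseException) {
        // ...so the comparator retries with OffsetDateTime, which understands offsets.
        println(OffsetDateTime.parse(withOffset))
    }
}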

View File

@@ -13,8 +13,6 @@ import io.micronaut.context.env.yaml.YamlPropertySourceLoader
import java.nio.file.Files
import java.nio.file.Path
const val DOCKERIZED_TEST_ENV = "DOCKERIZED_INTEGRATION_TEST"
/**
* Represents a destination process, whether running in-JVM via micronaut, or as a separate Docker
* container. The general lifecycle is:

View File

@@ -11,8 +11,6 @@ import io.airbyte.protocol.models.v0.AirbyteLogMessage
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.nio.file.Files
@@ -20,7 +18,6 @@ import java.nio.file.Path
import java.time.Clock
import java.util.Locale
import java.util.Scanner
import javax.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
@@ -238,20 +235,9 @@ class DockerizedDestination(
}
}
@Singleton
@Requires(env = [DOCKERIZED_TEST_ENV])
class DockerizedDestinationFactory(
// Note that this is not the same property as in MetadataYamlPropertySource.
// We get this because IntegrationTest manually sets "classpath:metadata.yaml"
// as a property source.
// MetadataYamlPropertySource has nothing to do with this property.
@Value("\${data.docker-repository}") val imageName: String,
// Most tests will just use micronaut to inject this.
// But some tests will want to manually instantiate an instance,
// e.g. to run an older version of the connector.
// So we just hardcode 'dev' here; manual callers can pass in
// whatever they want.
@Value("dev") val imageVersion: String,
private val imageName: String,
private val imageVersion: String,
) : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,

View File

@@ -11,12 +11,10 @@ import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Requires
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.io.PrintWriter
import java.util.concurrent.Executors
import javax.inject.Singleton
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
@@ -93,11 +91,6 @@ class NonDockerizedDestination(
}
}
// Notably, not actually a Micronaut factory. We want to inject the actual
// factory into our tests, not a pre-instantiated destination, because we want
// to run multiple destination processes per test.
@Singleton
@Requires(notEnv = [DOCKERIZED_TEST_ENV])
class NonDockerizedDestinationFactory : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,

View File

@@ -7,14 +7,18 @@ package io.airbyte.cdk.load.write
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.load.test.util.DestinationCleaner
@@ -30,6 +34,7 @@ import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import java.time.OffsetDateTime
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -63,6 +68,7 @@ abstract class BasicFunctionalityIntegrationTest(
* retroactive schemas: writing a new file without a column has no effect on older files.
*/
val isStreamSchemaRetroactive: Boolean,
val supportsDedup: Boolean,
) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper) {
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents)
@@ -413,7 +419,7 @@ abstract class BasicFunctionalityIntegrationTest(
@Test
@Disabled
open fun testFunkyStreamAndColumnNames() {
open fun testFunkyCharacters() {
assumeTrue(verifyDataWriting)
fun makeStream(
name: String,
@@ -427,7 +433,8 @@ abstract class BasicFunctionalityIntegrationTest(
minimumGenerationId = 0,
syncId = 42,
)
// Catalog with some weird schemas
// Catalog with some weird schemas.
// Every stream has an int `id`, and maybe some string fields.
val catalog =
DestinationCatalog(
listOf(
@@ -436,34 +443,41 @@ abstract class BasicFunctionalityIntegrationTest(
makeStream("STREAM_WITH_ALL_CAPS"),
makeStream("CapitalCase"),
makeStream(
"stream_with_edge_case_field_names",
"stream_with_edge_case_field_names_and_values",
linkedMapOf(
"id" to intType,
"fieldWithCamelCase" to intType,
"field_with_underscore" to intType,
"FIELD_WITH_ALL_CAPS" to intType,
"field_with_spécial_character" to intType,
"fieldWithCamelCase" to stringType,
"field_with_underscore" to stringType,
"FIELD_WITH_ALL_CAPS" to stringType,
"field_with_spécial_character" to stringType,
// "order" is a reserved word in many sql engines
"order" to intType,
"ProperCase" to intType,
"order" to stringType,
"ProperCase" to stringType,
)
),
// this is apparently trying to test for reserved words?
// https://github.com/airbytehq/airbyte/pull/1753
makeStream("groups", linkedMapOf("id" to intType, "authorization" to intType)),
makeStream(
"groups",
linkedMapOf("id" to intType, "authorization" to stringType)
),
)
)
// For each stream, generate a record containing every field in the schema
// For each stream, generate a record containing every field in the schema.
// The id field is always 42, and the string fields are always "foo\nbar".
val messages =
catalog.streams.map {
catalog.streams.map { stream ->
DestinationRecord(
it.descriptor,
stream.descriptor,
ObjectValue(
(it.schema as ObjectType).properties.mapValuesTo(linkedMapOf()) {
IntegerValue(42)
}
(stream.schema as ObjectType)
.properties
.mapValuesTo(linkedMapOf<String, AirbyteValue>()) {
StringValue("foo\nbar")
}
.also { it["id"] = IntegerValue(42) }
),
1234,
emittedAtMs = 1234,
meta = null,
serialized = "",
)
@@ -479,9 +493,10 @@ abstract class BasicFunctionalityIntegrationTest(
extractedAt = 1234,
generationId = 0,
data =
(stream.schema as ObjectType).properties.mapValuesTo(
linkedMapOf()
) { 42 },
(stream.schema as ObjectType)
.properties
.mapValuesTo(linkedMapOf<String, Any>()) { "foo\nbar" }
.also { it["id"] = 42 },
airbyteMeta = OutputRecord.Meta(syncId = 42)
)
),
@@ -684,8 +699,151 @@ abstract class BasicFunctionalityIntegrationTest(
)
}
@Test
open fun testDedup() {
assumeTrue(supportsDedup)
fun makeStream(syncId: Long) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
importType =
Dedupe(
primaryKey = listOf(listOf("id1"), listOf("id2")),
cursor = listOf("updated_at"),
),
schema =
ObjectType(
properties =
linkedMapOf(
"id1" to intType,
"id2" to intType,
"updated_at" to timestamptzType,
"name" to stringType,
"_ab_cdc_deleted_at" to timestamptzType,
)
),
generationId = 42,
minimumGenerationId = 0,
syncId = syncId,
)
fun makeRecord(data: String, extractedAt: Long) =
DestinationRecord(
randomizedNamespace,
"test_stream",
data,
emittedAtMs = extractedAt,
)
val sync1Stream = makeStream(syncId = 42)
runSync(
configContents,
sync1Stream,
listOf(
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z.
// This obviously makes no sense in relation to updated_at being in the year 2000,
// but that's OK because (from destinations POV) updated_at has no relation to
// extractedAt.
makeRecord(
"""{"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "name": "Alice1", "_ab_cdc_deleted_at": null}""",
extractedAt = 1000,
),
// Emit a second record for id=(1,200) with a different updated_at.
makeRecord(
"""{"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "name": "Alice2", "_ab_cdc_deleted_at": null}""",
extractedAt = 1000,
),
// Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an
// explicit null, but we should handle both cases.
makeRecord(
"""{"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob1"}""",
extractedAt = 1000,
),
),
)
dumpAndDiffRecords(
parsedConfig,
listOf(
// Alice has only the newer record, and Bob also exists
OutputRecord(
extractedAt = 1000,
generationId = 42,
data =
mapOf(
"id1" to 1,
"id2" to 200,
"updated_at" to OffsetDateTime.parse("2000-01-01T00:01:00Z"),
"name" to "Alice2",
"_ab_cdc_deleted_at" to null
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
OutputRecord(
extractedAt = 1000,
generationId = 42,
data =
mapOf(
"id1" to 1,
"id2" to 201,
"updated_at" to OffsetDateTime.parse("2000-01-01T00:02:00Z"),
"name" to "Bob1"
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
),
sync1Stream,
primaryKey = listOf(listOf("id1"), listOf("id2")),
cursor = listOf("updated_at"),
)
val sync2Stream = makeStream(syncId = 43)
runSync(
configContents,
sync2Stream,
listOf(
// Update both Alice and Bob
makeRecord(
"""{"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "name": "Alice3", "_ab_cdc_deleted_at": null}""",
extractedAt = 2000,
),
makeRecord(
"""{"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "name": "Bob2"}""",
extractedAt = 2000,
),
// And delete Bob. Again, T+D doesn't check the actual _value_ of deleted_at (i.e.
// the fact that it's in the past is irrelevant). It only cares whether deleted_at
// is non-null. So the destination should delete Bob.
makeRecord(
"""{"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}""",
extractedAt = 2000,
),
),
)
dumpAndDiffRecords(
parsedConfig,
listOf(
// Alice still exists (and has been updated to the latest version), but Bob is gone
OutputRecord(
extractedAt = 2000,
generationId = 42,
data =
mapOf(
"id1" to 1,
"id2" to 200,
"updated_at" to OffsetDateTime.parse("2000-01-02T00:00:00Z"),
"name" to "Alice3",
"_ab_cdc_deleted_at" to null
),
airbyteMeta = OutputRecord.Meta(syncId = 43),
)
),
sync2Stream,
primaryKey = listOf(listOf("id1"), listOf("id2")),
cursor = listOf("updated_at"),
)
}
companion object {
private val intType = FieldType(IntegerType, nullable = true)
private val stringType = FieldType(StringType, nullable = true)
private val timestamptzType = FieldType(TimestampTypeWithTimezone, nullable = true)
}
}
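
testDedup declares its primary key as listOf(listOf("id1"), listOf("id2")): each component of the key is a path of field names rather than a single name, which is what lets a key component (or the cursor) point at a nested field. A standalone sketch of that path lookup against a plain map, with hypothetical field names; the real code resolves paths against AirbyteValue objects instead:

// Illustrative only: resolve a field path against a nested map.
@Suppress("UNCHECKED_CAST")
fun getField(path: List<String>, record: Map<String, Any?>): Any? {
    var current: Any? = record
    for (element in path) {
        // A non-object partway through the path means the field is absent.
        val obj = current as? Map<String, Any?> ?: return null
        current = obj[element]
    }
    return current
}

fun main() {
    val record = mapOf(
        "id1" to 1,
        "id2" to 200,
        "meta" to mapOf("tenant" to "acme"),
    )
    // The composite key used by testDedup, plus a hypothetical nested component.
    val primaryKey = listOf(listOf("id1"), listOf("id2"), listOf("meta", "tenant"))
    println(primaryKey.map { getField(it, record) }) // [1, 200, acme]
}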

View File

@@ -18,6 +18,7 @@ class DevNullBasicFunctionalityIntegrationTest :
NoopExpectedRecordMapper,
verifyDataWriting = false,
isStreamSchemaRetroactive = false,
supportsDedup = false,
) {
@Test
override fun testBasicWrite() {

View File

@@ -18,12 +18,18 @@ abstract class S3V2WriteTest(path: String) :
NoopDestinationCleaner,
NoopExpectedRecordMapper,
isStreamSchemaRetroactive = false,
supportsDedup = false,
) {
@Test
override fun testBasicWrite() {
super.testBasicWrite()
}
@Test
override fun testFunkyCharacters() {
super.testFunkyCharacters()
}
@Disabled
@Test
override fun testMidSyncCheckpointingStreamState() {