1
0
mirror of synced 2025-12-19 18:14:56 -05:00

Bulk CDK: Rename stuff to prevent conflicts (#46650)

This commit is contained in:
Edward Gao
2024-10-08 14:12:35 -07:00
committed by GitHub
parent c2923bd095
commit 39b32b33a6
119 changed files with 813 additions and 690 deletions

View File

@@ -12,7 +12,7 @@ import io.airbyte.protocol.models.JsonSchemaType
* This maps to the subset of [JsonSchemaType] which is used in practice. Its main reason for * This maps to the subset of [JsonSchemaType] which is used in practice. Its main reason for
* existing is to provide type-safety and convenient comparisons and string representations. * existing is to provide type-safety and convenient comparisons and string representations.
*/ */
sealed interface AirbyteType { sealed interface AirbyteSchemaType {
/** Unwraps the underlying Airbyte protocol type object. */ /** Unwraps the underlying Airbyte protocol type object. */
fun asJsonSchemaType(): JsonSchemaType fun asJsonSchemaType(): JsonSchemaType
@@ -20,18 +20,18 @@ sealed interface AirbyteType {
fun asJsonSchema(): JsonNode = Jsons.valueToTree(asJsonSchemaType().jsonSchemaTypeMap) fun asJsonSchema(): JsonNode = Jsons.valueToTree(asJsonSchemaType().jsonSchemaTypeMap)
} }
data class ArrayAirbyteType( data class ArrayAirbyteSchemaType(
val item: AirbyteType, val item: AirbyteSchemaType,
) : AirbyteType { ) : AirbyteSchemaType {
override fun asJsonSchemaType(): JsonSchemaType = override fun asJsonSchemaType(): JsonSchemaType =
JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY) JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
.withItems(item.asJsonSchemaType()) .withItems(item.asJsonSchemaType())
.build() .build()
} }
enum class LeafAirbyteType( enum class LeafAirbyteSchemaType(
private val jsonSchemaType: JsonSchemaType, private val jsonSchemaType: JsonSchemaType,
) : AirbyteType { ) : AirbyteSchemaType {
BOOLEAN(JsonSchemaType.BOOLEAN), BOOLEAN(JsonSchemaType.BOOLEAN),
STRING(JsonSchemaType.STRING), STRING(JsonSchemaType.STRING),
BINARY(JsonSchemaType.STRING_BASE_64), BINARY(JsonSchemaType.STRING_BASE_64),

View File

@@ -20,7 +20,7 @@ interface AirbyteStreamFactory {
discoveredStream.id.name, discoveredStream.id.name,
discoveredStream.id.namespace, discoveredStream.id.namespace,
discoveredStream.columns.map { discoveredStream.columns.map {
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType()) AirbyteField.of(it.id, it.type.airbyteSchemaType.asJsonSchemaType())
}, },
) )
} }

View File

@@ -1,12 +1,12 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.discover package io.airbyte.cdk.discover
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.IntCodec import io.airbyte.cdk.data.IntCodec
import io.airbyte.cdk.data.JsonDecoder import io.airbyte.cdk.data.JsonDecoder
import io.airbyte.cdk.data.JsonEncoder import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.JsonStringCodec import io.airbyte.cdk.data.JsonStringCodec
import io.airbyte.cdk.data.LeafAirbyteType import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.OffsetDateTimeCodec import io.airbyte.cdk.data.OffsetDateTimeCodec
import java.time.OffsetDateTime import java.time.OffsetDateTime
@@ -23,7 +23,7 @@ sealed interface FieldOrMetaField {
*/ */
interface FieldType { interface FieldType {
/** maps to [io.airbyte.protocol.models.Field.type] */ /** maps to [io.airbyte.protocol.models.Field.type] */
val airbyteType: AirbyteType val airbyteSchemaType: AirbyteSchemaType
val jsonEncoder: JsonEncoder<*> val jsonEncoder: JsonEncoder<*>
} }
@@ -73,19 +73,20 @@ enum class CommonMetaField(
} }
data object CdcStringMetaFieldType : LosslessFieldType { data object CdcStringMetaFieldType : LosslessFieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.STRING override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.STRING
override val jsonEncoder: JsonEncoder<String> = JsonStringCodec override val jsonEncoder: JsonEncoder<String> = JsonStringCodec
override val jsonDecoder: JsonDecoder<String> = JsonStringCodec override val jsonDecoder: JsonDecoder<String> = JsonStringCodec
} }
data object CdcIntegerMetaFieldType : LosslessFieldType { data object CdcIntegerMetaFieldType : LosslessFieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.INTEGER override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.INTEGER
override val jsonEncoder: JsonEncoder<Int> = IntCodec override val jsonEncoder: JsonEncoder<Int> = IntCodec
override val jsonDecoder: JsonDecoder<Int> = IntCodec override val jsonDecoder: JsonDecoder<Int> = IntCodec
} }
data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType { data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE override val airbyteSchemaType: AirbyteSchemaType =
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE
override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec
override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec
} }

View File

@@ -2,7 +2,7 @@
package io.airbyte.cdk.output package io.airbyte.cdk.output
import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.data.AirbyteSchemaType
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.DefaultImplementation import io.micronaut.context.annotation.DefaultImplementation
import jakarta.inject.Singleton import jakarta.inject.Singleton
@@ -43,8 +43,8 @@ data class FieldNotFound(
data class FieldTypeMismatch( data class FieldTypeMismatch(
override val streamID: StreamIdentifier, override val streamID: StreamIdentifier,
val fieldName: String, val fieldName: String,
val expected: AirbyteType, val expected: AirbyteSchemaType,
val actual: AirbyteType, val actual: AirbyteSchemaType,
) : CatalogValidationFailure ) : CatalogValidationFailure
data class InvalidPrimaryKey( data class InvalidPrimaryKey(

View File

@@ -10,9 +10,9 @@ import io.airbyte.cdk.command.GlobalInputState
import io.airbyte.cdk.command.InputState import io.airbyte.cdk.command.InputState
import io.airbyte.cdk.command.SourceConfiguration import io.airbyte.cdk.command.SourceConfiguration
import io.airbyte.cdk.command.StreamInputState import io.airbyte.cdk.command.StreamInputState
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.ArrayAirbyteType import io.airbyte.cdk.data.ArrayAirbyteSchemaType
import io.airbyte.cdk.data.LeafAirbyteType import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.discover.CommonMetaField import io.airbyte.cdk.discover.CommonMetaField
import io.airbyte.cdk.discover.Field import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField import io.airbyte.cdk.discover.FieldOrMetaField
@@ -132,7 +132,7 @@ class StateManagerFactory(
} }
} }
val expectedSchema: Map<String, AirbyteType> = val expectedSchema: Map<String, AirbyteSchemaType> =
jsonSchemaProperties.properties().associate { (id: String, schema: JsonNode) -> jsonSchemaProperties.properties().associate { (id: String, schema: JsonNode) ->
id to airbyteTypeFromJsonSchema(schema) id to airbyteTypeFromJsonSchema(schema)
} }
@@ -150,15 +150,15 @@ class StateManagerFactory(
handler.accept(FieldNotFound(streamID, id)) handler.accept(FieldNotFound(streamID, id))
return null return null
} }
val expectedAirbyteType: AirbyteType = expectedSchema[id] ?: return null val expectedAirbyteSchemaType: AirbyteSchemaType = expectedSchema[id] ?: return null
val actualAirbyteType: AirbyteType = actualColumn.type.airbyteType val actualAirbyteSchemaType: AirbyteSchemaType = actualColumn.type.airbyteSchemaType
if (expectedAirbyteType != actualAirbyteType) { if (expectedAirbyteSchemaType != actualAirbyteSchemaType) {
handler.accept( handler.accept(
FieldTypeMismatch( FieldTypeMismatch(
streamID, streamID,
id, id,
expectedAirbyteType, expectedAirbyteSchemaType,
actualAirbyteType, actualAirbyteSchemaType,
), ),
) )
return null return null
@@ -229,44 +229,44 @@ class StateManagerFactory(
} }
/** /**
* Recursively re-generates the original [AirbyteType] from a catalog stream field's JSON * Recursively re-generates the original [AirbyteSchemaType] from a catalog stream field's JSON
* schema. * schema.
*/ */
private fun airbyteTypeFromJsonSchema(jsonSchema: JsonNode): AirbyteType { private fun airbyteTypeFromJsonSchema(jsonSchema: JsonNode): AirbyteSchemaType {
fun value(key: String): String = jsonSchema[key]?.asText() ?: "" fun value(key: String): String = jsonSchema[key]?.asText() ?: ""
return when (value("type")) { return when (value("type")) {
"array" -> ArrayAirbyteType(airbyteTypeFromJsonSchema(jsonSchema["items"])) "array" -> ArrayAirbyteSchemaType(airbyteTypeFromJsonSchema(jsonSchema["items"]))
"null" -> LeafAirbyteType.NULL "null" -> LeafAirbyteSchemaType.NULL
"boolean" -> LeafAirbyteType.BOOLEAN "boolean" -> LeafAirbyteSchemaType.BOOLEAN
"number" -> "number" ->
when (value("airbyte_type")) { when (value("airbyte_type")) {
"integer", "integer",
"big_integer", -> LeafAirbyteType.INTEGER "big_integer", -> LeafAirbyteSchemaType.INTEGER
else -> LeafAirbyteType.NUMBER else -> LeafAirbyteSchemaType.NUMBER
} }
"string" -> "string" ->
when (value("format")) { when (value("format")) {
"date" -> LeafAirbyteType.DATE "date" -> LeafAirbyteSchemaType.DATE
"date-time" -> "date-time" ->
if (value("airbyte_type") == "timestamp_with_timezone") { if (value("airbyte_type") == "timestamp_with_timezone") {
LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE
} else { } else {
LeafAirbyteType.TIMESTAMP_WITHOUT_TIMEZONE LeafAirbyteSchemaType.TIMESTAMP_WITHOUT_TIMEZONE
} }
"time" -> "time" ->
if (value("airbyte_type") == "time_with_timezone") { if (value("airbyte_type") == "time_with_timezone") {
LeafAirbyteType.TIME_WITH_TIMEZONE LeafAirbyteSchemaType.TIME_WITH_TIMEZONE
} else { } else {
LeafAirbyteType.TIME_WITHOUT_TIMEZONE LeafAirbyteSchemaType.TIME_WITHOUT_TIMEZONE
} }
else -> else ->
if (value("contentEncoding") == "base64") { if (value("contentEncoding") == "base64") {
LeafAirbyteType.BINARY LeafAirbyteSchemaType.BINARY
} else { } else {
LeafAirbyteType.STRING LeafAirbyteSchemaType.STRING
} }
} }
else -> LeafAirbyteType.JSONB else -> LeafAirbyteSchemaType.JSONB
} }
} }
} }

View File

@@ -4,12 +4,12 @@
package io.airbyte.cdk.discover package io.airbyte.cdk.discover
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.IntCodec import io.airbyte.cdk.data.IntCodec
import io.airbyte.cdk.data.JsonEncoder import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.LeafAirbyteType import io.airbyte.cdk.data.LeafAirbyteSchemaType
data object IntFieldType : FieldType { data object IntFieldType : FieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.INTEGER override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.INTEGER
override val jsonEncoder: JsonEncoder<*> = IntCodec override val jsonEncoder: JsonEncoder<*> = IntCodec
} }

View File

@@ -4,12 +4,13 @@
package io.airbyte.cdk.discover package io.airbyte.cdk.discover
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.JsonEncoder import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.LeafAirbyteType import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.OffsetDateTimeCodec import io.airbyte.cdk.data.OffsetDateTimeCodec
data object OffsetDateTimeFieldType : FieldType { data object OffsetDateTimeFieldType : FieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE override val airbyteSchemaType: AirbyteSchemaType =
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE
override val jsonEncoder: JsonEncoder<*> = OffsetDateTimeCodec override val jsonEncoder: JsonEncoder<*> = OffsetDateTimeCodec
} }

View File

@@ -4,12 +4,12 @@
package io.airbyte.cdk.discover package io.airbyte.cdk.discover
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.JsonEncoder import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.LeafAirbyteType import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.TextCodec import io.airbyte.cdk.data.TextCodec
data object StringFieldType : FieldType { data object StringFieldType : FieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.STRING override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.STRING
override val jsonEncoder: JsonEncoder<*> = TextCodec override val jsonEncoder: JsonEncoder<*> = TextCodec
} }

View File

@@ -21,7 +21,7 @@ class TestAirbyteStreamFactory : AirbyteStreamFactory {
supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) supportedSyncModes = listOf(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
(jsonSchema["properties"] as ObjectNode).apply { (jsonSchema["properties"] as ObjectNode).apply {
for (metaField in CommonMetaField.entries) { for (metaField in CommonMetaField.entries) {
set<ObjectNode>(metaField.id, metaField.type.airbyteType.asJsonSchema()) set<ObjectNode>(metaField.id, metaField.type.airbyteSchemaType.asJsonSchema())
} }
} }
defaultCursorField = listOf(CommonMetaField.CDC_LSN.id) defaultCursorField = listOf(CommonMetaField.CDC_LSN.id)

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.mock_integration_test package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.NoopNameMapper import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.test.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
class MockBasicFunctionalityIntegrationTest : class MockBasicFunctionalityIntegrationTest :
BasicFunctionalityIntegrationTest( BasicFunctionalityIntegrationTest(

View File

@@ -2,10 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.mock_integration_test package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.test.util.DestinationDataDumper import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.test.util.OutputRecord import io.airbyte.cdk.load.test.util.OutputRecord
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
object MockDestinationBackend { object MockDestinationBackend {

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.mock_integration_test package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.check.DestinationChecker import io.airbyte.cdk.load.check.DestinationChecker
import javax.inject.Singleton import javax.inject.Singleton
@Singleton @Singleton

View File

@@ -2,11 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.mock_integration_test package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationConfigurationFactory import io.airbyte.cdk.load.command.DestinationConfigurationFactory
import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,16 +2,16 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.mock_integration_test package io.airbyte.cdk.load.mock_integration_test
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.data.ObjectValue import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.test.util.OutputRecord import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.write.DestinationWriter import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID
import javax.inject.Singleton import javax.inject.Singleton

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.check package io.airbyte.cdk.load.check
import io.airbyte.cdk.Operation import io.airbyte.cdk.Operation
import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationConfigurationFactory import io.airbyte.cdk.load.command.DestinationConfigurationFactory
import io.airbyte.cdk.output.ExceptionHandler import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteConnectionStatus

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.check package io.airbyte.cdk.load.check
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
/** /**
* A check operation that is run before the destination is used. * A check operation that is run before the destination is used.

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Factory

View File

@@ -2,8 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.airbyte.cdk.command.Configuration
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton import jakarta.inject.Singleton
import java.nio.file.Path import java.nio.file.Path

View File

@@ -2,9 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.airbyte.cdk.ConfigErrorException import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.command.ConfigurationSpecification
interface DestinationConfigurationFactory< interface DestinationConfigurationFactory<
I : ConfigurationSpecification, O : DestinationConfiguration> { I : ConfigurationSpecification, O : DestinationConfiguration> {

View File

@@ -2,11 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.airbyte.cdk.data.AirbyteType import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.data.AirbyteTypeToJsonSchema import io.airbyte.cdk.load.data.AirbyteTypeToJsonSchema
import io.airbyte.cdk.data.JsonSchemaToAirbyteType import io.airbyte.cdk.load.data.JsonSchemaToAirbyteType
import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode import io.airbyte.protocol.models.v0.DestinationSyncMode

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
sealed interface AirbyteType sealed interface AirbyteType

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import java.math.BigDecimal import java.math.BigDecimal
import java.time.LocalDate import java.time.LocalDate

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonNode
import java.math.BigDecimal import java.math.BigDecimal

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.file package io.airbyte.cdk.load.file
import io.micronaut.context.annotation.DefaultImplementation import io.micronaut.context.annotation.DefaultImplementation
import java.io.Closeable import java.io.Closeable

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.file package io.airbyte.cdk.load.file
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.file package io.airbyte.cdk.load.file
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.message package io.airbyte.cdk.load.message
import com.google.common.collect.Range import com.google.common.collect.Range
import com.google.common.collect.RangeSet import com.google.common.collect.RangeSet
import com.google.common.collect.TreeRangeSet import com.google.common.collect.TreeRangeSet
import io.airbyte.cdk.file.LocalFile import io.airbyte.cdk.load.file.LocalFile
/** /**
* Represents an accumulated batch of records in some stage of processing. * Represents an accumulated batch of records in some stage of processing.

View File

@@ -2,17 +2,17 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.message package io.airbyte.cdk.load.message
import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.data.AirbyteValue import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.data.AirbyteValueToJson import io.airbyte.cdk.load.data.AirbyteValueToJson
import io.airbyte.cdk.data.JsonToAirbyteValue import io.airbyte.cdk.load.data.JsonToAirbyteValue
import io.airbyte.cdk.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.message.CheckpointMessage.Checkpoint import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint
import io.airbyte.cdk.message.CheckpointMessage.Stats import io.airbyte.cdk.load.message.CheckpointMessage.Stats
import io.airbyte.protocol.models.Jsons import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteGlobalState import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.message package io.airbyte.cdk.load.message
import io.airbyte.cdk.util.Jsons import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.message package io.airbyte.cdk.load.message
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.state.MemoryManager import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.state.Reserved import io.airbyte.cdk.load.state.Reserved
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.message package io.airbyte.cdk.load.message
import io.airbyte.cdk.util.CloseableCoroutine import io.airbyte.cdk.load.util.CloseableCoroutine
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow

View File

@@ -2,8 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.spec package io.airbyte.cdk.load.spec
import io.airbyte.cdk.spec.IdentitySpecificationExtender
import io.airbyte.cdk.spec.SpecificationExtender
import io.airbyte.protocol.models.v0.ConnectorSpecification import io.airbyte.protocol.models.v0.ConnectorSpecification
import io.airbyte.protocol.models.v0.DestinationSyncMode import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.micronaut.context.annotation.Replaces import io.micronaut.context.annotation.Replaces

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.file.TimeProvider import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.message.CheckpointMessage import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.util.use import io.airbyte.cdk.load.util.use
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import com.google.common.collect.Range import com.google.common.collect.Range
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.QueueReader import io.airbyte.cdk.load.message.QueueReader
import io.airbyte.cdk.task.internal.ForceFlushEvent import io.airbyte.cdk.load.task.internal.ForceFlushEvent
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import io.airbyte.cdk.util.CloseableCoroutine import io.airbyte.cdk.load.util.CloseableCoroutine
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import com.google.common.collect.Range import com.google.common.collect.Range
import com.google.common.collect.RangeSet import com.google.common.collect.RangeSet
import com.google.common.collect.TreeRangeSet import com.google.common.collect.TreeRangeSet
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,16 +2,16 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.state.StreamSucceeded import io.airbyte.cdk.load.state.StreamSucceeded
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.state.SyncSuccess import io.airbyte.cdk.load.state.SyncSuccess
import io.airbyte.cdk.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory
import io.airbyte.cdk.task.implementor.FailSyncTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,26 +2,26 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.message.SpilledRawMessagesLocalFile
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.implementor.CloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory
import io.airbyte.cdk.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory
import io.airbyte.cdk.task.implementor.ProcessBatchTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory
import io.airbyte.cdk.task.implementor.ProcessRecordsTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessRecordsTaskFactory
import io.airbyte.cdk.task.implementor.SetupTaskFactory import io.airbyte.cdk.load.task.implementor.SetupTaskFactory
import io.airbyte.cdk.task.implementor.TeardownTaskFactory import io.airbyte.cdk.load.task.implementor.TeardownTaskFactory
import io.airbyte.cdk.task.internal.FlushCheckpointsTaskFactory import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory
import io.airbyte.cdk.task.internal.InputConsumerTask import io.airbyte.cdk.load.task.internal.InputConsumerTask
import io.airbyte.cdk.task.internal.SpillToDiskTaskFactory import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
import io.airbyte.cdk.task.internal.TimedForcedCheckpointFlushTask import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask
import io.airbyte.cdk.task.internal.UpdateCheckpointsTask import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,9 +2,9 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import io.airbyte.cdk.util.CloseableCoroutine import io.airbyte.cdk.load.util.CloseableCoroutine
interface Task { interface Task {
suspend fun execute() suspend fun execute()

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.task.StreamTask import io.airbyte.cdk.load.task.StreamTask
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.state.StreamIncompleteResult import io.airbyte.cdk.load.state.StreamIncompleteResult
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskExceptionHandler import io.airbyte.cdk.load.task.DestinationTaskExceptionHandler
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskExceptionHandler import io.airbyte.cdk.load.task.DestinationTaskExceptionHandler
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.util.setOnce import io.airbyte.cdk.load.util.setOnce
import io.airbyte.cdk.write.DestinationWriter import io.airbyte.cdk.load.write.DestinationWriter
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,15 +2,15 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.task.StreamTask import io.airbyte.cdk.load.task.StreamTask
import io.airbyte.cdk.write.DestinationWriter import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,15 +2,15 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.task.StreamTask import io.airbyte.cdk.load.task.StreamTask
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,22 +2,22 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.Deserializer import io.airbyte.cdk.load.message.Deserializer
import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.DestinationStreamAffinedMessage import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
import io.airbyte.cdk.message.DestinationStreamComplete import io.airbyte.cdk.load.message.DestinationStreamComplete
import io.airbyte.cdk.message.DestinationStreamIncomplete import io.airbyte.cdk.load.message.DestinationStreamIncomplete
import io.airbyte.cdk.message.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.message.SpilledRawMessagesLocalFile
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.task.StreamTask import io.airbyte.cdk.load.task.StreamTask
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.ImplementorTask import io.airbyte.cdk.load.task.ImplementorTask
import io.airbyte.cdk.task.SyncTask import io.airbyte.cdk.load.task.SyncTask
import io.airbyte.cdk.write.DestinationWriter import io.airbyte.cdk.load.write.DestinationWriter
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import io.airbyte.cdk.state.CheckpointManager import io.airbyte.cdk.load.state.CheckpointManager
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.SyncTask import io.airbyte.cdk.load.task.SyncTask
import io.airbyte.cdk.write.DestinationWriter import io.airbyte.cdk.load.write.DestinationWriter
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,11 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import io.airbyte.cdk.state.CheckpointManager import io.airbyte.cdk.load.state.CheckpointManager
import io.airbyte.cdk.task.InternalTask import io.airbyte.cdk.load.task.InternalTask
import io.airbyte.cdk.task.SyncTask import io.airbyte.cdk.load.task.SyncTask
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,36 +2,36 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.CheckpointMessage import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.CheckpointMessageWrapped
import io.airbyte.cdk.message.Deserializer import io.airbyte.cdk.load.message.Deserializer
import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.DestinationRecordWrapped import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.message.DestinationStreamAffinedMessage import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
import io.airbyte.cdk.message.DestinationStreamComplete import io.airbyte.cdk.load.message.DestinationStreamComplete
import io.airbyte.cdk.message.DestinationStreamIncomplete import io.airbyte.cdk.load.message.DestinationStreamIncomplete
import io.airbyte.cdk.message.GlobalCheckpoint import io.airbyte.cdk.load.message.GlobalCheckpoint
import io.airbyte.cdk.message.GlobalCheckpointWrapped import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
import io.airbyte.cdk.message.MessageQueueSupplier import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.message.QueueWriter import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.message.StreamCheckpoint import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.message.StreamCheckpointWrapped import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.message.StreamCompleteWrapped import io.airbyte.cdk.load.message.StreamCompleteWrapped
import io.airbyte.cdk.message.StreamRecordWrapped import io.airbyte.cdk.load.message.StreamRecordWrapped
import io.airbyte.cdk.message.Undefined import io.airbyte.cdk.load.message.Undefined
import io.airbyte.cdk.state.MemoryManager import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.state.Reserved import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.InternalTask import io.airbyte.cdk.load.task.InternalTask
import io.airbyte.cdk.task.SyncTask import io.airbyte.cdk.load.task.SyncTask
import io.airbyte.cdk.util.use import io.airbyte.cdk.load.util.use
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,27 +2,27 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import com.google.common.collect.Range import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.file.TempFileProvider import io.airbyte.cdk.load.file.TempFileProvider
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.DestinationRecordWrapped import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.message.MessageQueueSupplier import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.message.QueueReader import io.airbyte.cdk.load.message.QueueReader
import io.airbyte.cdk.message.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.message.SpilledRawMessagesLocalFile
import io.airbyte.cdk.message.StreamCompleteWrapped import io.airbyte.cdk.load.message.StreamCompleteWrapped
import io.airbyte.cdk.message.StreamRecordWrapped import io.airbyte.cdk.load.message.StreamRecordWrapped
import io.airbyte.cdk.state.FlushStrategy import io.airbyte.cdk.load.state.FlushStrategy
import io.airbyte.cdk.state.Reserved import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.task.DestinationTaskLauncher import io.airbyte.cdk.load.task.DestinationTaskLauncher
import io.airbyte.cdk.task.InternalTask import io.airbyte.cdk.load.task.InternalTask
import io.airbyte.cdk.task.StreamTask import io.airbyte.cdk.load.task.StreamTask
import io.airbyte.cdk.util.takeUntilInclusive import io.airbyte.cdk.load.util.takeUntilInclusive
import io.airbyte.cdk.util.use import io.airbyte.cdk.load.util.use
import io.airbyte.cdk.util.withNextAdjacentValue import io.airbyte.cdk.load.util.withNextAdjacentValue
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton import jakarta.inject.Singleton
import kotlinx.coroutines.flow.last import kotlinx.coroutines.flow.last

View File

@@ -2,16 +2,16 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.file.TimeProvider import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.message.ChannelMessageQueue import io.airbyte.cdk.load.message.ChannelMessageQueue
import io.airbyte.cdk.message.QueueWriter import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.state.CheckpointManager import io.airbyte.cdk.load.state.CheckpointManager
import io.airbyte.cdk.task.SyncTask import io.airbyte.cdk.load.task.SyncTask
import io.airbyte.cdk.util.use import io.airbyte.cdk.load.util.use
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,17 +2,17 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.CheckpointMessage import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.CheckpointMessageWrapped
import io.airbyte.cdk.message.GlobalCheckpointWrapped import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
import io.airbyte.cdk.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueue
import io.airbyte.cdk.message.StreamCheckpointWrapped import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.state.CheckpointManager import io.airbyte.cdk.load.state.CheckpointManager
import io.airbyte.cdk.state.Reserved import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.task.SyncTask import io.airbyte.cdk.load.task.SyncTask
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.util package io.airbyte.cdk.load.util
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.util package io.airbyte.cdk.load.util
import com.google.common.collect.Range import com.google.common.collect.Range

View File

@@ -2,10 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.write package io.airbyte.cdk.load.write
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.state.SyncFailure import io.airbyte.cdk.load.state.SyncFailure
import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.write package io.airbyte.cdk.load.write
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.state.StreamIncompleteResult import io.airbyte.cdk.load.state.StreamIncompleteResult
/** /**
* Implementor interface. The framework calls open and close once per stream at the beginning and * Implementor interface. The framework calls open and close once per stream at the beginning and

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.write package io.airbyte.cdk.load.write
import io.airbyte.cdk.Operation import io.airbyte.cdk.Operation
import io.airbyte.cdk.state.SyncFailure import io.airbyte.cdk.load.state.SyncFailure
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.state.SyncSuccess import io.airbyte.cdk.load.state.SyncSuccess
import io.airbyte.cdk.task.TaskLauncher import io.airbyte.cdk.load.task.TaskLauncher
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.airbyte.protocol.models.Jsons import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.AirbyteStream

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.airbyte.cdk.data.FieldType import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.data.IntegerType import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.data.ObjectType import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.data.StringType import io.airbyte.cdk.load.data.StringType
import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.command package io.airbyte.cdk.load.command
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory
@@ -10,7 +10,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
class AirbyteTypeToJsonSchemaTest { class AirbyteSchemaTypeToJsonSchemaTest {
@Test @Test
fun testRoundTrip() { fun testRoundTrip() {
val schema = JsonNodeFactory.instance.objectNode() val schema = JsonNodeFactory.instance.objectNode()

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.databind.node.ObjectNode
@@ -10,7 +10,7 @@ import io.airbyte.cdk.util.Jsons
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
class JsonSchemaToAirbyteTypeTest { class JsonSchemaToAirbyteSchemaTypeTest {
private fun ofType(type: String): ObjectNode { private fun ofType(type: String): ObjectNode {
return JsonNodeFactory.instance.objectNode().put("type", type) return JsonNodeFactory.instance.objectNode().put("type", type)
} }

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.data package io.airbyte.cdk.load.data
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory
import java.math.BigDecimal import java.math.BigDecimal

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.file package io.airbyte.cdk.load.file
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.file package io.airbyte.cdk.load.file
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.message package io.airbyte.cdk.load.message
import io.airbyte.cdk.command.Append import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.data.ObjectTypeWithEmptySchema import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.protocol.models.Jsons import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteGlobalState import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -19,7 +19,7 @@ import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStreamState import io.airbyte.protocol.models.v0.AirbyteStreamState
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments
@@ -47,7 +47,7 @@ class DestinationMessageTest {
fun testRoundTrip(message: AirbyteMessage) { fun testRoundTrip(message: AirbyteMessage) {
val roundTripped = val roundTripped =
factory.fromAirbyteMessage(message, Jsons.serialize(message)).asProtocolMessage() factory.fromAirbyteMessage(message, Jsons.serialize(message)).asProtocolMessage()
assertEquals(message, roundTripped) Assertions.assertEquals(message, roundTripped)
} }
// Checkpoint messages aren't round-trippable. // Checkpoint messages aren't round-trippable.
@@ -73,7 +73,7 @@ class DestinationMessageTest {
factory.fromAirbyteMessage(inputMessage, Jsons.serialize(inputMessage)) factory.fromAirbyteMessage(inputMessage, Jsons.serialize(inputMessage))
as StreamCheckpoint as StreamCheckpoint
assertEquals( Assertions.assertEquals(
inputMessage.also { inputMessage.also {
it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0)
}, },
@@ -110,7 +110,7 @@ class DestinationMessageTest {
Jsons.serialize(inputMessage), Jsons.serialize(inputMessage),
) as GlobalCheckpoint ) as GlobalCheckpoint
assertEquals( Assertions.assertEquals(
inputMessage.also { inputMessage.also {
it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0)
}, },

View File

@@ -2,18 +2,18 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import com.google.common.collect.Range import com.google.common.collect.Range
import com.google.common.collect.TreeRangeSet import com.google.common.collect.TreeRangeSet
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream1
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream2 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
import io.airbyte.cdk.file.TimeProvider import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.SimpleBatch import io.airbyte.cdk.load.message.SimpleBatch
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject import jakarta.inject.Inject

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import com.google.common.collect.Range import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.MockDestinationCatalogFactory import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream2 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
import io.airbyte.cdk.message.ChannelMessageQueue import io.airbyte.cdk.load.message.ChannelMessageQueue
import io.airbyte.cdk.task.internal.ForceFlushEvent import io.airbyte.cdk.load.task.internal.ForceFlushEvent
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import io.micronaut.context.annotation.Replaces import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires

View File

@@ -2,11 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.file.TimeProvider import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.message.CheckpointMessage import io.airbyte.cdk.load.message.CheckpointMessage
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import jakarta.inject.Inject import jakarta.inject.Inject
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,15 +2,15 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import com.google.common.collect.Range import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream1
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream2 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.SimpleBatch import io.airbyte.cdk.load.message.SimpleBatch
import java.util.stream.Stream import java.util.stream.Stream
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream1
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream2 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
import io.airbyte.cdk.test.util.CoroutineTestUtils import io.airbyte.cdk.load.test.util.CoroutineTestUtils
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject import jakarta.inject.Inject
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.state package io.airbyte.cdk.load.state
import com.google.common.collect.Range import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.SimpleBatch import io.airbyte.cdk.load.message.SimpleBatch
/** /**
* Because [SyncManager] and [StreamManager] have thin interfaces with no side effects, mocking them * Because [SyncManager] and [StreamManager] have thin interfaces with no side effects, mocking them

View File

@@ -2,17 +2,17 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.implementor.FailStreamTask import io.airbyte.cdk.load.task.implementor.FailStreamTask
import io.airbyte.cdk.task.implementor.FailStreamTaskFactory import io.airbyte.cdk.load.task.implementor.FailStreamTaskFactory
import io.airbyte.cdk.task.implementor.FailSyncTask import io.airbyte.cdk.load.task.implementor.FailSyncTask
import io.airbyte.cdk.task.implementor.FailSyncTaskFactory import io.airbyte.cdk.load.task.implementor.FailSyncTaskFactory
import io.airbyte.cdk.test.util.CoroutineTestUtils import io.airbyte.cdk.load.test.util.CoroutineTestUtils
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
@@ -87,7 +87,7 @@ class DestinationTaskExceptionHandlerTest {
runTest { runTest {
val mockTask = val mockTask =
object : StreamTask { object : StreamTask {
override val stream: DestinationStream = stream1 override val stream: DestinationStream = MockDestinationCatalogFactory.stream1
override suspend fun execute() { override suspend fun execute() {
throw RuntimeException("StreamTask failure") throw RuntimeException("StreamTask failure")
@@ -97,7 +97,7 @@ class DestinationTaskExceptionHandlerTest {
val wrappedTask = exceptionHandler.withExceptionHandling(mockTask) val wrappedTask = exceptionHandler.withExceptionHandling(mockTask)
wrappedTask.execute() wrappedTask.execute()
val (stream, exception) = mockFailStreamTaskFactory.didRunFor.receive() val (stream, exception) = mockFailStreamTaskFactory.didRunFor.receive()
Assertions.assertEquals(stream1, stream) Assertions.assertEquals(MockDestinationCatalogFactory.stream1, stream)
Assertions.assertTrue(exception is RuntimeException) Assertions.assertTrue(exception is RuntimeException)
} }
@@ -182,7 +182,7 @@ class DestinationTaskExceptionHandlerTest {
val innerTaskRan = Channel<Boolean>(Channel.UNLIMITED) val innerTaskRan = Channel<Boolean>(Channel.UNLIMITED)
val mockTask = val mockTask =
object : StreamTask { object : StreamTask {
override val stream: DestinationStream = stream1 override val stream: DestinationStream = MockDestinationCatalogFactory.stream1
override suspend fun execute() { override suspend fun execute() {
innerTaskRan.send(true) innerTaskRan.send(true)
@@ -190,7 +190,7 @@ class DestinationTaskExceptionHandlerTest {
} }
val wrappedTask = exceptionHandler.withExceptionHandling(mockTask) val wrappedTask = exceptionHandler.withExceptionHandling(mockTask)
val manager = syncManager.getStreamManager(stream1.descriptor) val manager = syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
manager.markEndOfStream() manager.markEndOfStream()
manager.markFailed(RuntimeException("dummy failure")) manager.markFailed(RuntimeException("dummy failure"))
launch { wrappedTask.execute() } launch { wrappedTask.execute() }
@@ -207,7 +207,7 @@ class DestinationTaskExceptionHandlerTest {
) = runTest { ) = runTest {
val mockTask = val mockTask =
object : StreamTask { object : StreamTask {
override val stream: DestinationStream = stream1 override val stream: DestinationStream = MockDestinationCatalogFactory.stream1
override suspend fun execute() { override suspend fun execute() {
// do nothing // do nothing
@@ -216,7 +216,7 @@ class DestinationTaskExceptionHandlerTest {
val wrappedTask = exceptionHandler.withExceptionHandling(mockTask) val wrappedTask = exceptionHandler.withExceptionHandling(mockTask)
val manager = syncManager.getStreamManager(stream1.descriptor) val manager = syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
manager.markEndOfStream() manager.markEndOfStream()
manager.markSucceeded() manager.markSucceeded()

View File

@@ -2,44 +2,44 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import com.google.common.collect.Range import com.google.common.collect.Range
import com.google.common.collect.TreeRangeSet import com.google.common.collect.TreeRangeSet
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.file.DefaultLocalFile import io.airbyte.cdk.load.file.DefaultLocalFile
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.message.SpilledRawMessagesLocalFile
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.implementor.CloseStreamTask import io.airbyte.cdk.load.task.implementor.CloseStreamTask
import io.airbyte.cdk.task.implementor.CloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.CloseStreamTaskFactory
import io.airbyte.cdk.task.implementor.DefaultCloseStreamTaskFactory import io.airbyte.cdk.load.task.implementor.DefaultCloseStreamTaskFactory
import io.airbyte.cdk.task.implementor.DefaultOpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.DefaultOpenStreamTaskFactory
import io.airbyte.cdk.task.implementor.DefaultProcessBatchTaskFactory import io.airbyte.cdk.load.task.implementor.DefaultProcessBatchTaskFactory
import io.airbyte.cdk.task.implementor.DefaultProcessRecordsTaskFactory import io.airbyte.cdk.load.task.implementor.DefaultProcessRecordsTaskFactory
import io.airbyte.cdk.task.implementor.DefaultSetupTaskFactory import io.airbyte.cdk.load.task.implementor.DefaultSetupTaskFactory
import io.airbyte.cdk.task.implementor.DefaultTeardownTaskFactory import io.airbyte.cdk.load.task.implementor.DefaultTeardownTaskFactory
import io.airbyte.cdk.task.implementor.OpenStreamTask import io.airbyte.cdk.load.task.implementor.OpenStreamTask
import io.airbyte.cdk.task.implementor.OpenStreamTaskFactory import io.airbyte.cdk.load.task.implementor.OpenStreamTaskFactory
import io.airbyte.cdk.task.implementor.ProcessBatchTask import io.airbyte.cdk.load.task.implementor.ProcessBatchTask
import io.airbyte.cdk.task.implementor.ProcessBatchTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessBatchTaskFactory
import io.airbyte.cdk.task.implementor.ProcessRecordsTask import io.airbyte.cdk.load.task.implementor.ProcessRecordsTask
import io.airbyte.cdk.task.implementor.ProcessRecordsTaskFactory import io.airbyte.cdk.load.task.implementor.ProcessRecordsTaskFactory
import io.airbyte.cdk.task.implementor.SetupTask import io.airbyte.cdk.load.task.implementor.SetupTask
import io.airbyte.cdk.task.implementor.SetupTaskFactory import io.airbyte.cdk.load.task.implementor.SetupTaskFactory
import io.airbyte.cdk.task.implementor.TeardownTask import io.airbyte.cdk.load.task.implementor.TeardownTask
import io.airbyte.cdk.task.implementor.TeardownTaskFactory import io.airbyte.cdk.load.task.implementor.TeardownTaskFactory
import io.airbyte.cdk.task.internal.DefaultSpillToDiskTaskFactory import io.airbyte.cdk.load.task.internal.DefaultSpillToDiskTaskFactory
import io.airbyte.cdk.task.internal.FlushCheckpointsTask import io.airbyte.cdk.load.task.internal.FlushCheckpointsTask
import io.airbyte.cdk.task.internal.FlushCheckpointsTaskFactory import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory
import io.airbyte.cdk.task.internal.InputConsumerTask import io.airbyte.cdk.load.task.internal.InputConsumerTask
import io.airbyte.cdk.task.internal.SpillToDiskTask import io.airbyte.cdk.load.task.internal.SpillToDiskTask
import io.airbyte.cdk.task.internal.SpillToDiskTaskFactory import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
import io.airbyte.cdk.task.internal.TimedForcedCheckpointFlushTask import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask
import io.airbyte.cdk.task.internal.UpdateCheckpointsTask import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Replaces import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
@@ -337,7 +337,7 @@ class DestinationTaskLauncherTest {
@Test @Test
fun testHandleSpilledFileCompleteNotEndOfStream() = runTest { fun testHandleSpilledFileCompleteNotEndOfStream() = runTest {
taskLauncher.handleNewSpilledFile( taskLauncher.handleNewSpilledFile(
stream1, MockDestinationCatalogFactory.stream1,
BatchEnvelope( BatchEnvelope(
SpilledRawMessagesLocalFile(DefaultLocalFile(Path("not/a/real/file")), 100L) SpilledRawMessagesLocalFile(DefaultLocalFile(Path("not/a/real/file")), 100L)
), ),
@@ -345,7 +345,8 @@ class DestinationTaskLauncherTest {
) )
processRecordsTaskFactory.hasRun.receive() processRecordsTaskFactory.hasRun.receive()
mockSpillToDiskTaskFactory.streamHasRun[stream1.descriptor]?.receive() mockSpillToDiskTaskFactory.streamHasRun[MockDestinationCatalogFactory.stream1.descriptor]
?.receive()
?: Assertions.fail("SpillToDiskTask not run") ?: Assertions.fail("SpillToDiskTask not run")
} }
@@ -353,7 +354,7 @@ class DestinationTaskLauncherTest {
fun testHandleSpilledFileCompleteEndOfStream() = runTest { fun testHandleSpilledFileCompleteEndOfStream() = runTest {
launch { launch {
taskLauncher.handleNewSpilledFile( taskLauncher.handleNewSpilledFile(
stream1, MockDestinationCatalogFactory.stream1,
BatchEnvelope( BatchEnvelope(
SpilledRawMessagesLocalFile(DefaultLocalFile(Path("not/a/real/file")), 100L) SpilledRawMessagesLocalFile(DefaultLocalFile(Path("not/a/real/file")), 100L)
), ),
@@ -364,22 +365,25 @@ class DestinationTaskLauncherTest {
processRecordsTaskFactory.hasRun.receive() processRecordsTaskFactory.hasRun.receive()
delay(500) delay(500)
Assertions.assertTrue( Assertions.assertTrue(
mockSpillToDiskTaskFactory.streamHasRun[stream1.descriptor]?.tryReceive()?.isFailure != mockSpillToDiskTaskFactory.streamHasRun[
false MockDestinationCatalogFactory.stream1.descriptor]
?.tryReceive()
?.isFailure != false
) )
} }
@Test @Test
fun testHandleNewBatch() = runTest { fun testHandleNewBatch() = runTest {
val range = TreeRangeSet.create(listOf(Range.closed(0L, 100L))) val range = TreeRangeSet.create(listOf(Range.closed(0L, 100L)))
val streamManager = syncManager.getStreamManager(stream1.descriptor) val streamManager =
syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
repeat(100) { streamManager.countRecordIn() } repeat(100) { streamManager.countRecordIn() }
streamManager.markEndOfStream() streamManager.markEndOfStream()
// Verify incomplete batch triggers process batch // Verify incomplete batch triggers process batch
val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.LOCAL), range) val incompleteBatch = BatchEnvelope(MockBatch(Batch.State.LOCAL), range)
taskLauncher.handleNewBatch(stream1, incompleteBatch) taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1, incompleteBatch)
Assertions.assertFalse(streamManager.areRecordsPersistedUntil(100L)) Assertions.assertFalse(streamManager.areRecordsPersistedUntil(100L))
val batchReceived = processBatchTaskFactory.hasRun.receive() val batchReceived = processBatchTaskFactory.hasRun.receive()
@@ -388,21 +392,21 @@ class DestinationTaskLauncherTest {
Assertions.assertTrue(flushCheckpointsTaskFactory.hasRun.tryReceive().isFailure) Assertions.assertTrue(flushCheckpointsTaskFactory.hasRun.tryReceive().isFailure)
val persistedBatch = BatchEnvelope(MockBatch(Batch.State.PERSISTED), range) val persistedBatch = BatchEnvelope(MockBatch(Batch.State.PERSISTED), range)
taskLauncher.handleNewBatch(stream1, persistedBatch) taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1, persistedBatch)
Assertions.assertTrue(streamManager.areRecordsPersistedUntil(100L)) Assertions.assertTrue(streamManager.areRecordsPersistedUntil(100L))
Assertions.assertTrue(flushCheckpointsTaskFactory.hasRun.receive()) Assertions.assertTrue(flushCheckpointsTaskFactory.hasRun.receive())
// Verify complete batch w/o batch processing complete does nothing // Verify complete batch w/o batch processing complete does nothing
val halfRange = TreeRangeSet.create(listOf(Range.closed(0L, 50L))) val halfRange = TreeRangeSet.create(listOf(Range.closed(0L, 50L)))
val completeBatchHalf = BatchEnvelope(MockBatch(Batch.State.COMPLETE), halfRange) val completeBatchHalf = BatchEnvelope(MockBatch(Batch.State.COMPLETE), halfRange)
taskLauncher.handleNewBatch(stream1, completeBatchHalf) taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1, completeBatchHalf)
delay(1000) delay(1000)
Assertions.assertTrue(closeStreamTaskFactory.hasRun.tryReceive().isFailure) Assertions.assertTrue(closeStreamTaskFactory.hasRun.tryReceive().isFailure)
// Verify complete batch w/ batch processing complete triggers close stream // Verify complete batch w/ batch processing complete triggers close stream
val secondHalf = TreeRangeSet.create(listOf(Range.closed(51L, 100L))) val secondHalf = TreeRangeSet.create(listOf(Range.closed(51L, 100L)))
val completingBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), secondHalf) val completingBatch = BatchEnvelope(MockBatch(Batch.State.COMPLETE), secondHalf)
taskLauncher.handleNewBatch(stream1, completingBatch) taskLauncher.handleNewBatch(MockDestinationCatalogFactory.stream1, completingBatch)
closeStreamTaskFactory.hasRun.receive() closeStreamTaskFactory.hasRun.receive()
Assertions.assertTrue(true) Assertions.assertTrue(true)
} }
@@ -410,7 +414,7 @@ class DestinationTaskLauncherTest {
@Test @Test
fun testHandleStreamClosed() = runTest { fun testHandleStreamClosed() = runTest {
// This should run teardown unconditionally. // This should run teardown unconditionally.
launch { taskLauncher.handleStreamClosed(stream1) } launch { taskLauncher.handleStreamClosed(MockDestinationCatalogFactory.stream1) }
teardownTaskFactory.hasRun.receive() teardownTaskFactory.hasRun.receive()
} }

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires

View File

@@ -2,11 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task package io.airbyte.cdk.load.task
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.message.SpilledRawMessagesLocalFile
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton import jakarta.inject.Singleton

View File

@@ -2,22 +2,22 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.implementor package io.airbyte.cdk.load.task.implementor
import com.google.common.collect.Range import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.data.IntegerValue import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.file.MockTempFileProvider import io.airbyte.cdk.load.file.MockTempFileProvider
import io.airbyte.cdk.message.Batch import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.message.Deserializer import io.airbyte.cdk.load.message.Deserializer
import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.message.SpilledRawMessagesLocalFile
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.task.MockTaskLauncher import io.airbyte.cdk.load.task.MockTaskLauncher
import io.airbyte.cdk.write.StreamLoader import io.airbyte.cdk.load.write.StreamLoader
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
@@ -49,7 +49,7 @@ class ProcessRecordsTaskTest {
) : Batch ) : Batch
class MockStreamLoader : StreamLoader { class MockStreamLoader : StreamLoader {
override val stream: DestinationStream = stream1 override val stream: DestinationStream = MockDestinationCatalogFactory.stream1
data class SumAndCount(val sum: Long = 0, val count: Long = 0) data class SumAndCount(val sum: Long = 0, val count: Long = 0)
@@ -78,7 +78,7 @@ class ProcessRecordsTaskTest {
class MockDeserializer : Deserializer<DestinationMessage> { class MockDeserializer : Deserializer<DestinationMessage> {
override fun deserialize(serialized: String): DestinationMessage { override fun deserialize(serialized: String): DestinationMessage {
return DestinationRecord( return DestinationRecord(
stream = stream1.descriptor, stream = MockDestinationCatalogFactory.stream1.descriptor,
data = IntegerValue(serialized.toLong()), data = IntegerValue(serialized.toLong()),
emittedAtMs = 0L, emittedAtMs = 0L,
meta = null, meta = null,
@@ -104,7 +104,7 @@ class ProcessRecordsTaskTest {
val task = val task =
processRecordsTaskFactory.make( processRecordsTaskFactory.make(
taskLauncher = launcher, taskLauncher = launcher,
stream = stream1, stream = MockDestinationCatalogFactory.stream1,
fileEnvelope = BatchEnvelope(file, Range.closed(0, 1024)) fileEnvelope = BatchEnvelope(file, Range.closed(0, 1024))
) )
mockFile.linesToRead = (0 until recordCount).map { "$it" }.toMutableList() mockFile.linesToRead = (0 until recordCount).map { "$it" }.toMutableList()

View File

@@ -2,34 +2,33 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.JsonNodeFactory
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream2 import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.data.NullValue import io.airbyte.cdk.load.message.CheckpointMessage
import io.airbyte.cdk.message.CheckpointMessage import io.airbyte.cdk.load.message.CheckpointMessageWrapped
import io.airbyte.cdk.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.message.DestinationRecordWrapped import io.airbyte.cdk.load.message.DestinationStreamComplete
import io.airbyte.cdk.message.DestinationStreamComplete import io.airbyte.cdk.load.message.DestinationStreamIncomplete
import io.airbyte.cdk.message.DestinationStreamIncomplete import io.airbyte.cdk.load.message.GlobalCheckpoint
import io.airbyte.cdk.message.GlobalCheckpoint import io.airbyte.cdk.load.message.GlobalCheckpointWrapped
import io.airbyte.cdk.message.GlobalCheckpointWrapped import io.airbyte.cdk.load.message.MessageQueue
import io.airbyte.cdk.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.message.MessageQueueSupplier import io.airbyte.cdk.load.message.StreamCheckpoint
import io.airbyte.cdk.message.StreamCheckpoint import io.airbyte.cdk.load.message.StreamCheckpointWrapped
import io.airbyte.cdk.message.StreamCheckpointWrapped import io.airbyte.cdk.load.message.StreamCompleteWrapped
import io.airbyte.cdk.message.StreamCompleteWrapped import io.airbyte.cdk.load.message.StreamRecordWrapped
import io.airbyte.cdk.message.StreamRecordWrapped import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.state.MemoryManager import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.state.Reserved import io.airbyte.cdk.load.state.SyncManager
import io.airbyte.cdk.state.SyncManager import io.airbyte.cdk.load.test.util.CoroutineTestUtils
import io.airbyte.cdk.test.util.CoroutineTestUtils import io.airbyte.cdk.load.util.takeUntilInclusive
import io.airbyte.cdk.util.takeUntilInclusive
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
@@ -128,13 +127,20 @@ class InputConsumerTaskTest {
@Test @Test
fun testSendRecords() = runTest { fun testSendRecords() = runTest {
val queue1 = recordQueueSupplier.get(stream1.descriptor) val queue1 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream1.descriptor)
val queue2 = recordQueueSupplier.get(stream2.descriptor) val queue2 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream2.descriptor)
val manager1 = syncManager.getStreamManager(stream1.descriptor) val manager1 =
val manager2 = syncManager.getStreamManager(stream2.descriptor) syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
val manager2 =
syncManager.getStreamManager(MockDestinationCatalogFactory.stream2.descriptor)
(0 until 10).forEach { mockInputFlow.addMessage(makeRecord(stream1, "test${it}"), it * 2L) } (0 until 10).forEach {
mockInputFlow.addMessage(
makeRecord(MockDestinationCatalogFactory.stream1, "test${it}"),
it * 2L
)
}
launch { task.execute() } launch { task.execute() }
val messages1 = val messages1 =
@@ -148,7 +154,11 @@ class InputConsumerTaskTest {
Assertions.assertEquals(10, messages1.size) Assertions.assertEquals(10, messages1.size)
val expectedRecords = val expectedRecords =
(0 until 10).map { (0 until 10).map {
StreamRecordWrapped(it.toLong(), it * 2L, makeRecord(stream1, "test${it}")) StreamRecordWrapped(
it.toLong(),
it * 2L,
makeRecord(MockDestinationCatalogFactory.stream1, "test${it}")
)
} }
Assertions.assertEquals(expectedRecords, messages1.map { it.value }) Assertions.assertEquals(expectedRecords, messages1.map { it.value })
@@ -165,22 +175,35 @@ class InputConsumerTaskTest {
@Test @Test
fun testSendEndOfStream() = runTest { fun testSendEndOfStream() = runTest {
val queue1 = recordQueueSupplier.get(stream1.descriptor) val queue1 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream1.descriptor)
val queue2 = recordQueueSupplier.get(stream2.descriptor) val queue2 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream2.descriptor)
val manager1 = syncManager.getStreamManager(stream1.descriptor) val manager1 =
val manager2 = syncManager.getStreamManager(stream2.descriptor) syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor)
val manager2 =
syncManager.getStreamManager(MockDestinationCatalogFactory.stream2.descriptor)
(0 until 10).forEach { _ -> mockInputFlow.addMessage(makeRecord(stream1, "whatever"), 0L) } (0 until 10).forEach { _ ->
mockInputFlow.addMessage(
makeRecord(MockDestinationCatalogFactory.stream1, "whatever"),
0L
)
}
mockInputFlow.addMessage(makeRecord(stream2, "test"), 1L) mockInputFlow.addMessage(makeRecord(MockDestinationCatalogFactory.stream2, "test"), 1L)
mockInputFlow.addMessage(makeStreamComplete(stream1), 0L) mockInputFlow.addMessage(makeStreamComplete(MockDestinationCatalogFactory.stream1), 0L)
val job = launch { task.execute() } val job = launch { task.execute() }
mockInputFlow.stop() mockInputFlow.stop()
job.join() job.join()
queue2.close() queue2.close()
Assertions.assertEquals( Assertions.assertEquals(
listOf(StreamRecordWrapped(0, 1L, makeRecord(stream2, "test"))), listOf(
StreamRecordWrapped(
0,
1L,
makeRecord(MockDestinationCatalogFactory.stream2, "test")
)
),
queue2.consume().toList().map { it.value } queue2.consume().toList().map { it.value }
) )
Assertions.assertEquals(1L, manager2.recordCount()) Assertions.assertEquals(1L, manager2.recordCount())
@@ -209,10 +232,10 @@ class InputConsumerTaskTest {
val batches = val batches =
listOf( listOf(
TestEvent(stream1, 10, 10), TestEvent(MockDestinationCatalogFactory.stream1, 10, 10),
TestEvent(stream1, 5, 15), TestEvent(MockDestinationCatalogFactory.stream1, 5, 15),
TestEvent(stream2, 4, 4), TestEvent(MockDestinationCatalogFactory.stream2, 4, 4),
TestEvent(stream1, 3, 18), TestEvent(MockDestinationCatalogFactory.stream1, 3, 18),
) )
launch { task.execute() } launch { task.execute() }
@@ -239,12 +262,12 @@ class InputConsumerTaskTest {
val batches = val batches =
listOf( listOf(
AddRecords(stream1, 10), AddRecords(MockDestinationCatalogFactory.stream1, 10),
SendState(10, 0, 10), SendState(10, 0, 10),
AddRecords(stream2, 5), AddRecords(MockDestinationCatalogFactory.stream2, 5),
AddRecords(stream1, 4), AddRecords(MockDestinationCatalogFactory.stream1, 4),
SendState(14, 5, 9), SendState(14, 5, 9),
AddRecords(stream2, 3), AddRecords(MockDestinationCatalogFactory.stream2, 3),
SendState(14, 8, 3), SendState(14, 8, 3),
SendState(14, 8, 0), SendState(14, 8, 0),
) )
@@ -262,8 +285,14 @@ class InputConsumerTaskTest {
val state = val state =
checkpointQueue.consume().take(1).toList().first().value checkpointQueue.consume().take(1).toList().first().value
as GlobalCheckpointWrapped as GlobalCheckpointWrapped
val stream1State = state.streamIndexes.find { it.first == stream1.descriptor }!! val stream1State =
val stream2State = state.streamIndexes.find { it.first == stream2.descriptor }!! state.streamIndexes.find {
it.first == MockDestinationCatalogFactory.stream1.descriptor
}!!
val stream2State =
state.streamIndexes.find {
it.first == MockDestinationCatalogFactory.stream2.descriptor
}!!
Assertions.assertEquals(event.expectedStream1Count, stream1State.second) Assertions.assertEquals(event.expectedStream1Count, stream1State.second)
Assertions.assertEquals(event.expectedStream2Count, stream2State.second) Assertions.assertEquals(event.expectedStream2Count, stream2State.second)
Assertions.assertEquals( Assertions.assertEquals(
@@ -278,8 +307,8 @@ class InputConsumerTaskTest {
@Test @Test
fun testStreamIncompleteThrows() = runTest { fun testStreamIncompleteThrows() = runTest {
mockInputFlow.addMessage(makeRecord(stream1, "test"), 1L) mockInputFlow.addMessage(makeRecord(MockDestinationCatalogFactory.stream1, "test"), 1L)
mockInputFlow.addMessage(makeStreamIncomplete(stream1), 0L) mockInputFlow.addMessage(makeStreamIncomplete(MockDestinationCatalogFactory.stream1), 0L)
CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() } CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() }
mockInputFlow.stop() mockInputFlow.stop()
} }

View File

@@ -2,11 +2,11 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.message.Deserializer import io.airbyte.cdk.load.message.Deserializer
import io.airbyte.cdk.state.MemoryManager import io.airbyte.cdk.load.state.MemoryManager
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest

View File

@@ -2,22 +2,22 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import com.google.common.collect.Range import com.google.common.collect.Range
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.command.MockDestinationCatalogFactory.Companion.stream1 import io.airbyte.cdk.load.command.MockDestinationCatalogFactory
import io.airbyte.cdk.data.NullValue import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.file.MockTempFileProvider import io.airbyte.cdk.load.file.MockTempFileProvider
import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.message.DestinationRecordWrapped import io.airbyte.cdk.load.message.DestinationRecordWrapped
import io.airbyte.cdk.message.MessageQueueSupplier import io.airbyte.cdk.load.message.MessageQueueSupplier
import io.airbyte.cdk.message.StreamCompleteWrapped import io.airbyte.cdk.load.message.StreamCompleteWrapped
import io.airbyte.cdk.message.StreamRecordWrapped import io.airbyte.cdk.load.message.StreamRecordWrapped
import io.airbyte.cdk.state.FlushStrategy import io.airbyte.cdk.load.state.FlushStrategy
import io.airbyte.cdk.state.MemoryManager import io.airbyte.cdk.load.state.MemoryManager
import io.airbyte.cdk.state.Reserved import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.task.MockTaskLauncher import io.airbyte.cdk.load.task.MockTaskLauncher
import io.micronaut.context.annotation.Primary import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
@@ -59,7 +59,7 @@ class SpillToDiskTaskTest {
} }
private suspend fun primeMessageQueue(): Long { private suspend fun primeMessageQueue(): Long {
val queue = queueSupplier.get(stream1.descriptor) val queue = queueSupplier.get(MockDestinationCatalogFactory.stream1.descriptor)
val maxRecords = ((1024 * 1.5) / 8).toLong() val maxRecords = ((1024 * 1.5) / 8).toLong()
var recordsWritten = 0L var recordsWritten = 0L
var bytesReserved = 0L var bytesReserved = 0L
@@ -74,7 +74,7 @@ class SpillToDiskTaskTest {
sizeBytes = 8, sizeBytes = 8,
record = record =
DestinationRecord( DestinationRecord(
stream = stream1.descriptor, stream = MockDestinationCatalogFactory.stream1.descriptor,
data = NullValue, data = NullValue,
emittedAtMs = 0, emittedAtMs = 0,
meta = null, meta = null,
@@ -95,9 +95,13 @@ class SpillToDiskTaskTest {
Assertions.assertEquals(availableMemory - bytesReserved, memoryManager.remainingMemoryBytes) Assertions.assertEquals(availableMemory - bytesReserved, memoryManager.remainingMemoryBytes)
val mockTaskLauncher = MockTaskLauncher() val mockTaskLauncher = MockTaskLauncher()
spillToDiskTaskFactory.make(mockTaskLauncher, stream1).execute() spillToDiskTaskFactory
.make(mockTaskLauncher, MockDestinationCatalogFactory.stream1)
.execute()
Assertions.assertEquals(1, mockTaskLauncher.spilledFiles.size) Assertions.assertEquals(1, mockTaskLauncher.spilledFiles.size)
spillToDiskTaskFactory.make(mockTaskLauncher, stream1).execute() spillToDiskTaskFactory
.make(mockTaskLauncher, MockDestinationCatalogFactory.stream1)
.execute()
Assertions.assertEquals(2, mockTaskLauncher.spilledFiles.size) Assertions.assertEquals(2, mockTaskLauncher.spilledFiles.size)
Assertions.assertEquals(1024, mockTaskLauncher.spilledFiles[0].batch.totalSizeBytes) Assertions.assertEquals(1024, mockTaskLauncher.spilledFiles[0].batch.totalSizeBytes)

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.task.internal package io.airbyte.cdk.load.task.internal
import io.airbyte.cdk.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.file.MockTimeProvider import io.airbyte.cdk.load.file.MockTimeProvider
import io.airbyte.cdk.message.QueueReader import io.airbyte.cdk.load.message.QueueReader
import io.airbyte.cdk.state.MockCheckpointManager import io.airbyte.cdk.load.state.MockCheckpointManager
import io.airbyte.cdk.task.MockTaskLauncher import io.airbyte.cdk.load.task.MockTaskLauncher
import io.micronaut.test.extensions.junit5.annotation.MicronautTest import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject import jakarta.inject.Inject
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
import kotlin.reflect.KClass import kotlin.reflect.KClass

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
import java.time.OffsetDateTime import java.time.OffsetDateTime
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions

View File

@@ -2,15 +2,15 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.check package io.airbyte.cdk.load.check
import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.test.util.FakeDataDumper import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.test.util.IntegrationTest import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.destination_process.TestDeploymentMode import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.spec package io.airbyte.cdk.load.spec
import com.deblock.jsondiff.DiffGenerator import com.deblock.jsondiff.DiffGenerator
import com.deblock.jsondiff.diff.JsonDiff import com.deblock.jsondiff.diff.JsonDiff
@@ -12,17 +12,16 @@ import com.deblock.jsondiff.matcher.LenientJsonObjectPartialMatcher
import com.deblock.jsondiff.matcher.StrictJsonArrayPartialMatcher import com.deblock.jsondiff.matcher.StrictJsonArrayPartialMatcher
import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher
import com.deblock.jsondiff.viewer.OnlyErrorDiffViewer import com.deblock.jsondiff.viewer.OnlyErrorDiffViewer
import io.airbyte.cdk.test.util.FakeDataDumper import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.test.util.IntegrationTest import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.test.util.destination_process.DestinationProcessFactory import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import io.airbyte.cdk.test.util.destination_process.TestDeploymentMode
import io.airbyte.cdk.util.Jsons import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertAll
@@ -67,7 +66,7 @@ abstract class SpecTest :
val messages = process.readMessages() val messages = process.readMessages()
val specMessages = messages.filter { it.type == AirbyteMessage.Type.SPEC } val specMessages = messages.filter { it.type == AirbyteMessage.Type.SPEC }
assertEquals( Assertions.assertEquals(
specMessages.size, specMessages.size,
1, 1,
"Expected to receive exactly one connection status message, but got ${specMessages.size}: $specMessages" "Expected to receive exactly one connection status message, but got ${specMessages.size}: $specMessages"
@@ -88,8 +87,8 @@ abstract class SpecTest :
DiffGenerator.diff(expectedSpec, Jsons.writeValueAsString(spec), jsonMatcher) DiffGenerator.diff(expectedSpec, Jsons.writeValueAsString(spec), jsonMatcher)
assertAll( assertAll(
"Spec snapshot test failed. Run this test locally and then `git diff <...>/$expectedSpecFilename` to see what changed, and commit the diff if that change was intentional.", "Spec snapshot test failed. Run this test locally and then `git diff <...>/$expectedSpecFilename` to see what changed, and commit the diff if that change was intentional.",
{ assertEquals("", OnlyErrorDiffViewer.from(diff).toString()) }, { Assertions.assertEquals("", OnlyErrorDiffViewer.from(diff).toString()) },
{ assertEquals(expectedSpec, actualSpecPrettyPrint) } { Assertions.assertEquals(expectedSpec, actualSpecPrettyPrint) }
) )
} }
} }

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
fun interface DestinationCleaner { fun interface DestinationCleaner {
/** /**

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
fun interface DestinationDataDumper { fun interface DestinationDataDumper {
fun dumpRecords( fun dumpRecords(

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
fun interface ExpectedRecordMapper { fun interface ExpectedRecordMapper {
fun mapRecord(expectedRecord: OutputRecord): OutputRecord fun mapRecord(expectedRecord: OutputRecord): OutputRecord

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationMessage
import io.airbyte.cdk.test.util.destination_process.DestinationProcessFactory import io.airbyte.cdk.load.test.util.destination_process.DestinationProcessFactory
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
fun interface NameMapper { fun interface NameMapper {
/** /**

View File

@@ -2,10 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
import io.airbyte.cdk.data.ObjectValue import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.message.DestinationRecord.Change import io.airbyte.cdk.load.message.DestinationRecord.Change
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util package io.airbyte.cdk.load.test.util
import io.airbyte.cdk.data.AirbyteValue import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.data.IntegerValue import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.data.NullValue import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.data.ObjectValue import io.airbyte.cdk.load.data.ObjectValue
import kotlin.reflect.jvm.jvmName import kotlin.reflect.jvm.jvmName
class RecordDiffer( class RecordDiffer(

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/ */
package io.airbyte.cdk.test.util.destination_process package io.airbyte.cdk.load.test.util.destination_process
import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage

Some files were not shown because too many files have changed in this diff Show More