Enable file perf test in CI so we catch files regressions. (#60331)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -15,6 +15,7 @@ import io.airbyte.cdk.load.util.deserializeToNode
|
||||
import io.airbyte.protocol.models.v0.AirbyteGlobalState
|
||||
import io.airbyte.protocol.models.v0.AirbyteMessage
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessageFileReference
|
||||
import io.airbyte.protocol.models.v0.AirbyteStateMessage
|
||||
|
||||
sealed interface InputMessage {
|
||||
@@ -27,6 +28,7 @@ data class InputRecord(
|
||||
val emittedAtMs: Long,
|
||||
val meta: Meta?,
|
||||
val serialized: String,
|
||||
val fileReference: AirbyteRecordMessageFileReference? = null,
|
||||
) : InputMessage {
|
||||
/** Convenience constructor, primarily intended for use in tests. */
|
||||
constructor(
|
||||
@@ -35,12 +37,14 @@ data class InputRecord(
|
||||
data: String,
|
||||
emittedAtMs: Long,
|
||||
changes: MutableList<Meta.Change> = mutableListOf(),
|
||||
fileReference: AirbyteRecordMessageFileReference? = null,
|
||||
) : this(
|
||||
stream = DestinationStream.Descriptor(namespace, name),
|
||||
data = JsonToAirbyteValue().convert(data.deserializeToNode()),
|
||||
emittedAtMs = emittedAtMs,
|
||||
meta = Meta(changes),
|
||||
serialized = "",
|
||||
fileReference,
|
||||
)
|
||||
|
||||
override fun asProtocolMessage(): AirbyteMessage =
|
||||
@@ -56,6 +60,9 @@ data class InputRecord(
|
||||
if (meta != null) {
|
||||
it.withMeta(meta.asProtocolObject())
|
||||
}
|
||||
if (fileReference != null) {
|
||||
it.withFileReference(fileReference)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -223,19 +223,14 @@ abstract class IntegrationTest(
|
||||
destinationProcessFactory: DestinationProcessFactory = this.destinationProcessFactory,
|
||||
): List<AirbyteMessage> {
|
||||
destinationProcessFactory.testName = testPrettyName
|
||||
val fileTransferProperty =
|
||||
if (useFileTransfer) {
|
||||
mapOf(EnvVarConstants.FILE_TRANSFER_ENABLED to "true")
|
||||
} else {
|
||||
emptyMap()
|
||||
}
|
||||
|
||||
val destination =
|
||||
destinationProcessFactory.createDestinationProcess(
|
||||
"write",
|
||||
configContents,
|
||||
catalog.asProtocolObject(),
|
||||
useFileTransfer = useFileTransfer,
|
||||
micronautProperties = micronautProperties + fileTransferProperty,
|
||||
micronautProperties = micronautProperties,
|
||||
)
|
||||
return runBlocking(Dispatchers.IO) {
|
||||
launch { destination.run() }
|
||||
|
||||
@@ -35,8 +35,6 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
|
||||
import io.airbyte.cdk.load.data.UnionType
|
||||
import io.airbyte.cdk.load.data.UnknownType
|
||||
import io.airbyte.cdk.load.data.json.toAirbyteValue
|
||||
import io.airbyte.cdk.load.message.DestinationFile
|
||||
import io.airbyte.cdk.load.message.InputFile
|
||||
import io.airbyte.cdk.load.message.InputGlobalCheckpoint
|
||||
import io.airbyte.cdk.load.message.InputRecord
|
||||
import io.airbyte.cdk.load.message.InputStreamCheckpoint
|
||||
@@ -56,6 +54,7 @@ import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitE
|
||||
import io.airbyte.cdk.load.util.deserializeToNode
|
||||
import io.airbyte.cdk.load.util.serializeToString
|
||||
import io.airbyte.protocol.models.v0.AirbyteMessage
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessageFileReference
|
||||
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
|
||||
import java.math.BigDecimal
|
||||
import java.math.BigInteger
|
||||
@@ -382,14 +381,30 @@ abstract class BasicFunctionalityIntegrationTest(
|
||||
generationId = 0,
|
||||
minimumGenerationId = 0,
|
||||
syncId = 42,
|
||||
isFileBased = true,
|
||||
includeFiles = true,
|
||||
)
|
||||
val fileMessage =
|
||||
DestinationFile.AirbyteRecordMessageFile(
|
||||
fileUrl = "/tmp/test_file",
|
||||
bytes = 1234L,
|
||||
fileRelativePath = "path/to/file",
|
||||
modified = 4321L,
|
||||
sourceFileUrl = "file://path/to/source",
|
||||
|
||||
val sourcePath = "path/to/file"
|
||||
// these must match the values hard-coded in DockerizedDestination
|
||||
val stagingDir = "tmp"
|
||||
val fileName = "test_file"
|
||||
val fileContents = "123"
|
||||
|
||||
val fileReference =
|
||||
AirbyteRecordMessageFileReference()
|
||||
.withSourceFileRelativePath(sourcePath)
|
||||
.withStagingFileUrl("/$stagingDir/$fileName")
|
||||
.withFileSizeBytes(1234L)
|
||||
|
||||
val input =
|
||||
InputRecord(
|
||||
namespace = randomizedNamespace,
|
||||
name = "test_stream_file",
|
||||
data = """{"id": 5678}""",
|
||||
emittedAtMs = 1234,
|
||||
changes = mutableListOf(),
|
||||
fileReference = fileReference,
|
||||
)
|
||||
|
||||
val messages =
|
||||
@@ -397,11 +412,7 @@ abstract class BasicFunctionalityIntegrationTest(
|
||||
updatedConfig,
|
||||
stream,
|
||||
listOf(
|
||||
InputFile(
|
||||
stream = stream,
|
||||
emittedAtMs = 1234,
|
||||
fileMessage = fileMessage,
|
||||
),
|
||||
input,
|
||||
InputStreamCheckpoint(
|
||||
streamName = stream.descriptor.name,
|
||||
streamNamespace = stream.descriptor.namespace,
|
||||
@@ -435,7 +446,7 @@ abstract class BasicFunctionalityIntegrationTest(
|
||||
val config = ValidatedJsonUtils.parseOne(configSpecClass, updatedConfig)
|
||||
val fileContent = dataDumper.dumpFile(config, stream)
|
||||
|
||||
assertEquals(mapOf("path/to/file" to "123"), fileContent)
|
||||
assertEquals(fileContents, fileContent[sourcePath])
|
||||
}
|
||||
|
||||
@Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/10413")
|
||||
|
||||
Reference in New Issue
Block a user