Bulk load CDK: test runner not micronaut, fix concurrent execution (#47006)
This commit is contained in:
@@ -15,13 +15,11 @@ import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
|
|||||||
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
|
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
|
||||||
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
|
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
|
||||||
import io.airbyte.protocol.models.v0.StreamDescriptor
|
import io.airbyte.protocol.models.v0.StreamDescriptor
|
||||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.time.LocalDateTime
|
import java.time.LocalDateTime
|
||||||
import java.time.ZoneOffset
|
import java.time.ZoneOffset
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import javax.inject.Inject
|
|
||||||
import kotlin.test.fail
|
import kotlin.test.fail
|
||||||
import kotlinx.coroutines.async
|
import kotlinx.coroutines.async
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
@@ -36,13 +34,6 @@ import uk.org.webcompere.systemstubs.environment.EnvironmentVariables
|
|||||||
import uk.org.webcompere.systemstubs.jupiter.SystemStub
|
import uk.org.webcompere.systemstubs.jupiter.SystemStub
|
||||||
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension
|
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension
|
||||||
|
|
||||||
@MicronautTest(
|
|
||||||
// Manually add metadata.yaml as a property source so that we can use its
|
|
||||||
// values as injectable properties.
|
|
||||||
// This is _infinitely_ easier than trying to wire up
|
|
||||||
// MetadataYamlPropertySource to be available at test time.
|
|
||||||
propertySources = ["classpath:metadata.yaml"]
|
|
||||||
)
|
|
||||||
@Execution(ExecutionMode.CONCURRENT)
|
@Execution(ExecutionMode.CONCURRENT)
|
||||||
// Spotbugs doesn't let you suppress the actual lateinit property,
|
// Spotbugs doesn't let you suppress the actual lateinit property,
|
||||||
// so we have to suppress the entire class.
|
// so we have to suppress the entire class.
|
||||||
@@ -70,7 +61,7 @@ abstract class IntegrationTest(
|
|||||||
// Intentionally don't inject the actual destination process - we need a full factory
|
// Intentionally don't inject the actual destination process - we need a full factory
|
||||||
// because some tests want to run multiple syncs, so we need to run the destination
|
// because some tests want to run multiple syncs, so we need to run the destination
|
||||||
// multiple times.
|
// multiple times.
|
||||||
@Inject lateinit var destinationProcessFactory: DestinationProcessFactory
|
val destinationProcessFactory = DestinationProcessFactory.get()
|
||||||
|
|
||||||
@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
|
@Suppress("DEPRECATION") private val randomSuffix = RandomStringUtils.randomAlphabetic(4)
|
||||||
private val timestampString =
|
private val timestampString =
|
||||||
|
|||||||
@@ -10,6 +10,9 @@ import io.airbyte.cdk.command.FeatureFlag
|
|||||||
import io.airbyte.cdk.load.test.util.IntegrationTest
|
import io.airbyte.cdk.load.test.util.IntegrationTest
|
||||||
import io.airbyte.protocol.models.v0.AirbyteMessage
|
import io.airbyte.protocol.models.v0.AirbyteMessage
|
||||||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
|
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
|
||||||
|
import io.micronaut.context.env.yaml.YamlPropertySourceLoader
|
||||||
|
import java.nio.file.Files
|
||||||
|
import java.nio.file.Path
|
||||||
|
|
||||||
const val DOCKERIZED_TEST_ENV = "DOCKERIZED_INTEGRATION_TEST"
|
const val DOCKERIZED_TEST_ENV = "DOCKERIZED_INTEGRATION_TEST"
|
||||||
|
|
||||||
@@ -56,4 +59,28 @@ abstract class DestinationProcessFactory {
|
|||||||
catalog: ConfiguredAirbyteCatalog? = null,
|
catalog: ConfiguredAirbyteCatalog? = null,
|
||||||
vararg featureFlags: FeatureFlag,
|
vararg featureFlags: FeatureFlag,
|
||||||
): DestinationProcess
|
): DestinationProcess
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
fun get(): DestinationProcessFactory =
|
||||||
|
when (val runner = System.getenv("AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER")) {
|
||||||
|
null,
|
||||||
|
"non-docker" -> NonDockerizedDestinationFactory()
|
||||||
|
"docker" -> {
|
||||||
|
val rawProperties: Map<String, Any?> =
|
||||||
|
YamlPropertySourceLoader()
|
||||||
|
.read(
|
||||||
|
"irrelevant",
|
||||||
|
Files.readAllBytes(Path.of("metadata.yaml")),
|
||||||
|
)
|
||||||
|
DockerizedDestinationFactory(
|
||||||
|
rawProperties["data.dockerRepository"] as String,
|
||||||
|
"dev"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
else ->
|
||||||
|
throw IllegalArgumentException(
|
||||||
|
"Unknown AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER environment variable: $runner"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -152,10 +152,6 @@ class AirbyteBulkConnectorPlugin implements Plugin<Project> {
|
|||||||
resources {
|
resources {
|
||||||
srcDir 'src/test-integration/resources'
|
srcDir 'src/test-integration/resources'
|
||||||
}
|
}
|
||||||
resources {
|
|
||||||
srcDir '.'
|
|
||||||
include 'metadata.yaml'
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// This source set should only be used for tests based on the old CDK's test classes,
|
// This source set should only be used for tests based on the old CDK's test classes,
|
||||||
// in particular DestinationAcceptanceTest / BaseTypingDedupingTest.
|
// in particular DestinationAcceptanceTest / BaseTypingDedupingTest.
|
||||||
@@ -178,6 +174,11 @@ class AirbyteBulkConnectorPlugin implements Plugin<Project> {
|
|||||||
testClassesDirs = project.sourceSets.integrationTest.output.classesDirs
|
testClassesDirs = project.sourceSets.integrationTest.output.classesDirs
|
||||||
classpath = project.sourceSets.integrationTest.runtimeClasspath
|
classpath = project.sourceSets.integrationTest.runtimeClasspath
|
||||||
useJUnitPlatform()
|
useJUnitPlatform()
|
||||||
|
|
||||||
|
jvmArgs = project.test.jvmArgs
|
||||||
|
systemProperties = project.test.systemProperties
|
||||||
|
maxParallelForks = project.test.maxParallelForks
|
||||||
|
maxHeapSize = project.test.maxHeapSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// For historical reasons (i.e. airbyte-ci), this task is called integrationTestJava.
|
// For historical reasons (i.e. airbyte-ci), this task is called integrationTestJava.
|
||||||
@@ -193,7 +194,12 @@ class AirbyteBulkConnectorPlugin implements Plugin<Project> {
|
|||||||
useJUnitPlatform()
|
useJUnitPlatform()
|
||||||
// We need a docker image to run this task, so depend on assemble
|
// We need a docker image to run this task, so depend on assemble
|
||||||
dependsOn project.tasks.assemble
|
dependsOn project.tasks.assemble
|
||||||
environment "MICRONAUT_ENVIRONMENTS", "DOCKERIZED_INTEGRATION_TEST"
|
environment "AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER", "docker"
|
||||||
|
|
||||||
|
jvmArgs = project.test.jvmArgs
|
||||||
|
systemProperties = project.test.systemProperties
|
||||||
|
maxParallelForks = project.test.maxParallelForks
|
||||||
|
maxHeapSize = project.test.maxHeapSize
|
||||||
}
|
}
|
||||||
|
|
||||||
project.dependencies {
|
project.dependencies {
|
||||||
|
|||||||
Reference in New Issue
Block a user