1
0
mirror of synced 2025-12-25 11:06:55 -05:00

bulk-cdk: add feature flag environments (#46692)

This commit is contained in:
Marius Posta
2024-10-17 19:36:01 -07:00
committed by GitHub
parent 7b12647a42
commit a4847dfe49
19 changed files with 221 additions and 112 deletions

View File

@@ -2,13 +2,13 @@
package io.airbyte.cdk
import io.airbyte.cdk.command.ConnectorCommandLinePropertySource
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.MetadataYamlPropertySource
import io.micronaut.configuration.picocli.MicronautFactory
import io.micronaut.context.ApplicationContext
import io.micronaut.context.RuntimeBeanDefinition
import io.micronaut.context.env.CommandLinePropertySource
import io.micronaut.context.env.Environment
import io.micronaut.context.env.MapPropertySource
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
import java.nio.file.Path
import kotlin.system.exitProcess
@@ -21,9 +21,11 @@ import picocli.CommandLine.Model.UsageMessageSpec
class AirbyteSourceRunner(
/** CLI args. */
args: Array<out String>,
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("source", args, testBeanDefinitions) {
) : AirbyteConnectorRunner("source", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
@@ -36,10 +38,11 @@ class AirbyteSourceRunner(
class AirbyteDestinationRunner(
/** CLI args. */
args: Array<out String>,
testEnvironments: Map<String, String> = emptyMap(),
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("destination", args, testBeanDefinitions, testEnvironments) {
) : AirbyteConnectorRunner("destination", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
@@ -55,21 +58,18 @@ class AirbyteDestinationRunner(
sealed class AirbyteConnectorRunner(
val connectorType: String,
val args: Array<out String>,
systemEnv: Map<String, String>,
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
val testProperties: Map<String, String> = emptyMap(),
) {
// Micronaut's TEST env detection relies on inspecting the stacktrace and checking for
// any junit calls. This doesn't work if we launch the connector from a different thread, e.g.
// `Dispatchers.IO`. Force the test env if needed. (Some tests launch the connector from the IO
// context to avoid blocking themselves.)
private val isTest = testBeanDefinitions.isNotEmpty()
val envs: Array<String> =
arrayOf(Environment.CLI, connectorType) +
if (isTest) {
arrayOf(Environment.TEST)
} else {
emptyArray()
}
// Set feature flag environments.
FeatureFlag.active(systemEnv).map { it.micronautEnvironmentName } +
// Micronaut's TEST env detection relies on inspecting the stacktrace and checking for
// any junit calls. This doesn't work if we launch the connector from a different
// thread, e.g. `Dispatchers.IO`. Force the test env if needed. Some tests launch the
// connector from the IO context to avoid blocking themselves.
listOfNotNull(Environment.TEST.takeIf { testBeanDefinitions.isNotEmpty() })
inline fun <reified R : Runnable> run() {
val picocliCommandLineFactory = PicocliCommandLineFactory(this)
@@ -84,7 +84,6 @@ sealed class AirbyteConnectorRunner(
ApplicationContext.builder(R::class.java, *envs)
.propertySources(
*listOfNotNull(
MapPropertySource("additional_properties", testProperties),
airbytePropertySource,
commandLinePropertySource,
MetadataYamlPropertySource(),
@@ -93,10 +92,6 @@ sealed class AirbyteConnectorRunner(
)
.beanDefinitions(*testBeanDefinitions)
.start()
// We can't rely on the isTest value from our constructor,
// because that won't autodetect junit in our stacktrace.
// So instead we ask micronaut (which will include if we explicitly added
// the TEST env).
val isTest: Boolean = ctx.environment.activeNames.contains(Environment.TEST)
val picocliFactory: CommandLine.IFactory = MicronautFactory(ctx)
val picocliCommandLine: CommandLine =
@@ -105,8 +100,10 @@ sealed class AirbyteConnectorRunner(
if (!isTest) {
// Required by the platform, otherwise syncs may hang.
exitProcess(exitCode)
} else if (exitCode != 0) {
// Otherwise, propagate failure to test callers.
}
// At this point, we're in a test.
if (exitCode != 0) {
// Propagate failure to test callers.
throw ConnectorUncleanExitException(exitCode)
}
}

View File

@@ -4,5 +4,6 @@
package io.airbyte.cdk
/** This is used only in tests. */
class ConnectorUncleanExitException(val exitCode: Int) :
Exception("Destination process exited uncleanly: $exitCode")

View File

@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.command
import io.micronaut.context.annotation.Factory
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.util.EnumSet
/**
* An enum of all feature flags, currently these are set via environment vars.
*
* Micronaut can inject a Set<FeatureFlag> singleton of all active feature flags.
*/
enum class FeatureFlag(
val micronautEnvironmentName: String,
val envVar: EnvVar,
val requiredEnvVarValue: String,
private val transformActualValue: (String) -> String = { it }
) {
/** [AIRBYTE_CLOUD_DEPLOYMENT] is active when the connector is running in Airbyte Cloud. */
AIRBYTE_CLOUD_DEPLOYMENT(
micronautEnvironmentName = AIRBYTE_CLOUD_ENV,
envVar = EnvVar.DEPLOYMENT_MODE,
requiredEnvVarValue = "CLOUD",
transformActualValue = { it.trim().uppercase() },
);
/** Environment variable binding shell declaration which activates the feature flag. */
val envVarBindingDeclaration: String
get() = "${envVar.name}=$requiredEnvVarValue"
enum class EnvVar(val defaultValue: String = "") {
DEPLOYMENT_MODE
}
companion object {
internal fun active(systemEnv: Map<String, String>): List<FeatureFlag> =
entries.filter { featureFlag: FeatureFlag ->
val envVar: EnvVar = featureFlag.envVar
val envVarValue: String = systemEnv[envVar.name] ?: envVar.defaultValue
featureFlag.transformActualValue(envVarValue) == featureFlag.requiredEnvVarValue
}
}
@Factory
private class MicronautFactory {
@Singleton
fun active(environment: Environment): Set<FeatureFlag> =
EnumSet.noneOf(FeatureFlag::class.java).apply {
addAll(
FeatureFlag.entries.filter {
environment.activeNames.contains(it.micronautEnvironmentName)
}
)
}
}
}
const val AIRBYTE_CLOUD_ENV = "airbyte-cloud"

View File

@@ -36,11 +36,12 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
vararg featureFlags: FeatureFlag,
): CliRunnable {
val out = CliRunnerOutputStream()
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteSourceRunner(args, out.beanDefinition)
AirbyteSourceRunner(args, featureFlags.systemEnv, out.beanDefinition)
}
return CliRunnable(runnable, out.results)
}
@@ -52,7 +53,7 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
testProperties: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): CliRunnable {
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream }
@@ -63,7 +64,7 @@ data object CliRunner {
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
testProperties,
featureFlags.systemEnv,
inputBeanDefinition,
out.beanDefinition,
)
@@ -77,6 +78,7 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
vararg input: AirbyteMessage,
): CliRunnable {
val inputJsonBytes: ByteArray =
@@ -88,7 +90,7 @@ data object CliRunner {
baos.toByteArray()
}
val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes)
return destination(op, config, catalog, state, inputStream)
return destination(op, config, catalog, state, inputStream, *featureFlags.toTypedArray())
}
private fun makeRunnable(
@@ -120,6 +122,9 @@ data object CliRunner {
}
}
private val Array<out FeatureFlag>.systemEnv: Map<String, String>
get() = toSet().map { it.envVar.name to it.requiredEnvVarValue }.toMap()
private fun inputFile(contents: Any?): Path? =
contents?.let {
Files.createTempFile(null, null).also { file ->