feat: PoC check as declarative component (#61720)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
df5cbfbff6
commit
b091f64313
@@ -2,10 +2,8 @@ dependencies {
|
||||
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
|
||||
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
|
||||
|
||||
api("org.apache.commons:commons-csv:1.11.0")
|
||||
|
||||
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
|
||||
implementation 'dev.failsafe:failsafe-okhttp:3.3.2'
|
||||
api 'com.squareup.okhttp3:okhttp:4.12.0'
|
||||
api 'dev.failsafe:failsafe-okhttp:3.3.2'
|
||||
|
||||
testFixturesApi 'org.jetbrains.kotlin:kotlin-test'
|
||||
}
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.http.authentication
|
||||
|
||||
import okhttp3.Credentials
|
||||
import okhttp3.Interceptor
|
||||
import okhttp3.Response as OkHttpResponse
|
||||
|
||||
class BasicAccessAuthenticator(
|
||||
private val username: String,
|
||||
private val password: String,
|
||||
) : Interceptor {
|
||||
|
||||
override fun intercept(chain: Interceptor.Chain): OkHttpResponse {
|
||||
val requestWithAuthorization =
|
||||
chain
|
||||
.request()
|
||||
.newBuilder()
|
||||
.header("Authorization", Credentials.basic(username, password))
|
||||
.build()
|
||||
return chain.proceed(requestWithAuthorization)
|
||||
}
|
||||
}
|
||||
11
airbyte-cdk/bulk/toolkits/load-low-code/build.gradle
Normal file
11
airbyte-cdk/bulk/toolkits/load-low-code/build.gradle
Normal file
@@ -0,0 +1,11 @@
|
||||
dependencies {
|
||||
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
|
||||
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
|
||||
|
||||
implementation project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-http')
|
||||
api 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
|
||||
api 'com.fasterxml.jackson.module:jackson-module-kotlin'
|
||||
api 'com.hubspot.jinjava:jinjava:2.7.4'
|
||||
|
||||
testFixturesApi 'org.jetbrains.kotlin:kotlin-test'
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.checker
|
||||
|
||||
import io.airbyte.cdk.load.check.DestinationChecker
|
||||
import io.airbyte.cdk.load.command.DestinationConfiguration
|
||||
import io.airbyte.cdk.load.http.HttpRequester
|
||||
|
||||
class HttpRequestChecker<C : DestinationConfiguration>(private val requester: HttpRequester) :
|
||||
DestinationChecker<C> {
|
||||
override fun check(config: C) {
|
||||
val response = requester.send()
|
||||
response.use {
|
||||
assert(
|
||||
it.statusCode == 200,
|
||||
{ "Expected status code to be 200 but was ${it.statusCode}" }
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.http
|
||||
|
||||
class HttpRequester(
|
||||
private val client: HttpClient,
|
||||
private val method: RequestMethod,
|
||||
private val url: String,
|
||||
) {
|
||||
|
||||
fun send(): Response {
|
||||
return client.send(
|
||||
Request(
|
||||
method = method,
|
||||
url = url
|
||||
// TODO eventually support the following
|
||||
// val headers: Map<String, String> = mapOf(),
|
||||
// val query: Map<String, List<String>> = mapOf(),
|
||||
// val body: ByteArray? = null,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.interpolation
|
||||
|
||||
import com.hubspot.jinjava.Jinjava
|
||||
|
||||
class StringInterpolator {
|
||||
private val interpolator = Jinjava()
|
||||
|
||||
fun interpolate(string: String, context: Map<String, Any>): String {
|
||||
return interpolator.render(string, context)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.lowcode
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
|
||||
import dev.failsafe.RetryPolicy
|
||||
import io.airbyte.cdk.load.checker.HttpRequestChecker
|
||||
import io.airbyte.cdk.load.command.DestinationConfiguration
|
||||
import io.airbyte.cdk.load.http.HttpRequester
|
||||
import io.airbyte.cdk.load.http.RequestMethod
|
||||
import io.airbyte.cdk.load.http.authentication.BasicAccessAuthenticator
|
||||
import io.airbyte.cdk.load.http.okhttp.AirbyteOkHttpClient
|
||||
import io.airbyte.cdk.load.interpolation.StringInterpolator
|
||||
import io.airbyte.cdk.load.model.DeclarativeDestination as DeclarativeDestinationModel
|
||||
import io.airbyte.cdk.load.model.checker.Checker as CheckerModel
|
||||
import io.airbyte.cdk.load.model.checker.HttpRequestChecker as HttpRequestCheckerModel
|
||||
import io.airbyte.cdk.load.model.http.HttpMethod
|
||||
import io.airbyte.cdk.load.model.http.HttpRequester as HttpRequesterModel
|
||||
import io.airbyte.cdk.load.model.http.authenticator.Authenticator as AuthenticatorModel
|
||||
import io.airbyte.cdk.load.model.http.authenticator.BasicAccessAuthenticator as BasicAccessAuthenticatorModel
|
||||
import io.airbyte.cdk.util.ResourceUtils
|
||||
import okhttp3.Interceptor
|
||||
import okhttp3.OkHttpClient
|
||||
|
||||
class DeclarativeDestinationFactory<T : DestinationConfiguration>(private val config: T) {
|
||||
private val stringInterpolator: StringInterpolator = StringInterpolator()
|
||||
|
||||
fun createDestinationChecker(): HttpRequestChecker<T> {
|
||||
val mapper = ObjectMapper(YAMLFactory())
|
||||
val manifestContent = ResourceUtils.readResource("manifest.yaml")
|
||||
val manifest: DeclarativeDestinationModel =
|
||||
mapper.readValue(manifestContent, DeclarativeDestinationModel::class.java)
|
||||
return createChecker(manifest.checker)
|
||||
}
|
||||
|
||||
private fun createAuthenticator(
|
||||
model: AuthenticatorModel,
|
||||
): Interceptor =
|
||||
when (model) {
|
||||
is BasicAccessAuthenticatorModel -> model.toInterceptor(createInterpolationContext())
|
||||
}
|
||||
|
||||
private fun createChecker(
|
||||
model: CheckerModel,
|
||||
): HttpRequestChecker<T> =
|
||||
when (model) {
|
||||
is HttpRequestCheckerModel -> HttpRequestChecker(model.requester.toRequester())
|
||||
}
|
||||
|
||||
fun BasicAccessAuthenticatorModel.toInterceptor(
|
||||
interpolationContext: Map<String, Any>
|
||||
): BasicAccessAuthenticator =
|
||||
BasicAccessAuthenticator(
|
||||
stringInterpolator.interpolate(this.username, interpolationContext),
|
||||
stringInterpolator.interpolate(this.password, interpolationContext),
|
||||
)
|
||||
|
||||
fun HttpRequesterModel.toRequester(): HttpRequester {
|
||||
val requester = this
|
||||
val okhttpClient: OkHttpClient =
|
||||
OkHttpClient.Builder()
|
||||
.apply {
|
||||
if (requester.authenticator != null) {
|
||||
this.addInterceptor(createAuthenticator(requester.authenticator))
|
||||
}
|
||||
}
|
||||
.build()
|
||||
return HttpRequester(
|
||||
AirbyteOkHttpClient(okhttpClient, RetryPolicy.ofDefaults()),
|
||||
this.method.toRequestMethod(),
|
||||
this.url,
|
||||
)
|
||||
}
|
||||
|
||||
fun HttpMethod.toRequestMethod(): RequestMethod =
|
||||
when (this) {
|
||||
HttpMethod.GET -> RequestMethod.GET
|
||||
HttpMethod.POST -> RequestMethod.POST
|
||||
HttpMethod.PUT -> RequestMethod.PUT
|
||||
HttpMethod.PATCH -> RequestMethod.PATCH
|
||||
HttpMethod.DELETE -> RequestMethod.DELETE
|
||||
HttpMethod.HEAD -> RequestMethod.HEAD
|
||||
HttpMethod.OPTIONS -> RequestMethod.OPTIONS
|
||||
}
|
||||
|
||||
private fun createInterpolationContext(): Map<String, T> = mapOf("config" to config)
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty
|
||||
import io.airbyte.cdk.load.model.checker.Checker
|
||||
|
||||
/**
|
||||
* Root configuration for a declarative destination that uploads data according to its declarative
|
||||
* components.
|
||||
*/
|
||||
data class DeclarativeDestination(@JsonProperty("checker") val checker: Checker)
|
||||
@@ -0,0 +1,13 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model.checker
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo
|
||||
|
||||
/** Base interface for all checker types in declarative destinations. */
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
|
||||
@JsonSubTypes(JsonSubTypes.Type(value = HttpRequestChecker::class, name = "HttpRequestChecker"))
|
||||
sealed interface Checker
|
||||
@@ -0,0 +1,14 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model.checker
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty
|
||||
import io.airbyte.cdk.load.model.http.HttpRequester
|
||||
|
||||
/**
|
||||
* Configuration for destination check operations. Performs a HTTP request to the destination API to
|
||||
* check if the configuration is valid.
|
||||
*/
|
||||
data class HttpRequestChecker(@JsonProperty("requester") val requester: HttpRequester) : Checker
|
||||
@@ -0,0 +1,16 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model.http
|
||||
|
||||
/** Enum representing HTTP methods supported by the HttpRequester. */
|
||||
enum class HttpMethod {
|
||||
GET,
|
||||
POST,
|
||||
PUT,
|
||||
PATCH,
|
||||
DELETE,
|
||||
HEAD,
|
||||
OPTIONS
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model.http
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty
|
||||
import io.airbyte.cdk.load.model.http.authenticator.Authenticator
|
||||
|
||||
/** Describes a HTTP request configuration. */
|
||||
data class HttpRequester(
|
||||
@JsonProperty("type") val type: String = "HttpRequester",
|
||||
@JsonProperty("url") val url: String,
|
||||
@JsonProperty("method") val method: HttpMethod,
|
||||
@JsonProperty("authenticator") val authenticator: Authenticator? = null
|
||||
)
|
||||
@@ -0,0 +1,15 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model.http.authenticator
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo
|
||||
|
||||
/** Base interface for all authenticator types in declarative destinations. */
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
|
||||
@JsonSubTypes(
|
||||
JsonSubTypes.Type(value = BasicAccessAuthenticator::class, name = "BasicAccessAuthenticator")
|
||||
)
|
||||
sealed interface Authenticator
|
||||
@@ -0,0 +1,13 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.model.http.authenticator
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty
|
||||
|
||||
/** Configuration for basic access authentication. */
|
||||
data class BasicAccessAuthenticator(
|
||||
@JsonProperty("username") val username: String,
|
||||
@JsonProperty("password") val password: String
|
||||
) : Authenticator
|
||||
@@ -0,0 +1,10 @@
|
||||
checker:
|
||||
type: HttpRequestChecker
|
||||
requester:
|
||||
type: HttpRequester
|
||||
url: https://api.aircall.io/v1/users
|
||||
method: GET
|
||||
authenticator:
|
||||
type: BasicAccessAuthenticator
|
||||
username: "{{ config.apiId }}"
|
||||
password: "{{ config.apiToken }}"
|
||||
@@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.interpolation
|
||||
|
||||
import io.airbyte.cdk.util.Jsons
|
||||
import kotlin.test.assertEquals
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
class StringInterpolationTest {
|
||||
|
||||
@Test
|
||||
internal fun `test given if statement true when eval then return proper value`() {
|
||||
val interpolatedValue =
|
||||
StringInterpolator()
|
||||
.interpolate(
|
||||
"{{ first == second ? 'true' : 'false' }}",
|
||||
mapOf("first" to 1, "second" to 1)
|
||||
)
|
||||
assertEquals("true", interpolatedValue)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given if statement false when eval then return proper value`() {
|
||||
val interpolatedValue =
|
||||
StringInterpolator()
|
||||
.interpolate(
|
||||
"{{ 'true' if first == second else 'false' }}",
|
||||
mapOf("first" to 1, "second" to 2)
|
||||
)
|
||||
assertEquals("false", interpolatedValue)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given string interpolation when eval then insert variable into string`() {
|
||||
val interpolatedValue =
|
||||
StringInterpolator()
|
||||
.interpolate(
|
||||
"{{protocol}}://login.salesforce.com/auth",
|
||||
mapOf("protocol" to "https")
|
||||
)
|
||||
assertEquals("https://login.salesforce.com/auth", interpolatedValue)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given string interpolation with condition true when eval then evaluate condition`() {
|
||||
val interpolatedValue =
|
||||
StringInterpolator()
|
||||
.interpolate(
|
||||
"https://{{ 'sandbox' if isSandbox else 'login'}}.salesforce.com/auth",
|
||||
mapOf("isSandbox" to true)
|
||||
)
|
||||
assertEquals("https://sandbox.salesforce.com/auth", interpolatedValue)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given string interpolation with condition false when eval then evaluate condition`() {
|
||||
val interpolatedValue =
|
||||
StringInterpolator()
|
||||
.interpolate(
|
||||
"https://{{ 'sandbox' if isSandbox else 'login'}}.salesforce.com/auth",
|
||||
mapOf("isSandbox" to false)
|
||||
)
|
||||
assertEquals("https://login.salesforce.com/auth", interpolatedValue)
|
||||
}
|
||||
|
||||
@Test
|
||||
internal fun `test given ObjectNode when eval then extract values from ObjectNode`() {
|
||||
val objectNode =
|
||||
Jsons.objectNode().apply {
|
||||
this.putObject("modificationMetadata").put("readOnlyValue", false)
|
||||
}
|
||||
|
||||
val interpolatedValue =
|
||||
StringInterpolator()
|
||||
.interpolate(
|
||||
"""{{ response.get("modificationMetadata").get("readOnlyValue") }}""",
|
||||
mapOf("response" to objectNode)
|
||||
)
|
||||
assertEquals("false", interpolatedValue)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.cdk.load.lowcode
|
||||
|
||||
import io.airbyte.cdk.load.command.DestinationConfiguration
|
||||
import io.airbyte.cdk.load.http.authentication.BasicAccessAuthenticator
|
||||
import io.airbyte.cdk.util.ResourceUtils
|
||||
import io.mockk.EqMatcher
|
||||
import io.mockk.every
|
||||
import io.mockk.mockkConstructor
|
||||
import io.mockk.mockkStatic
|
||||
import io.mockk.unmockkStatic
|
||||
import io.mockk.verify
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
class MockConfig(
|
||||
val apiId: String,
|
||||
val apiToken: String,
|
||||
) : DestinationConfiguration()
|
||||
|
||||
val VALID_API_ID: String = "api_id"
|
||||
val VALID_API_TOKEN: String = "api_token"
|
||||
|
||||
class DeclarativeDestinationFactoryTest {
|
||||
|
||||
@Test
|
||||
internal fun `test when check then ensure check gets interpolated credentials`() {
|
||||
mockManifest(
|
||||
"""
|
||||
checker:
|
||||
type: HttpRequestChecker
|
||||
requester:
|
||||
type: HttpRequester
|
||||
url: https://airbyte.io/
|
||||
method: GET
|
||||
authenticator:
|
||||
type: BasicAccessAuthenticator
|
||||
username: "{{ config.apiId }}"
|
||||
password: "{{ config.apiToken }}"
|
||||
""".trimIndent()
|
||||
)
|
||||
mockkConstructor(BasicAccessAuthenticator::class)
|
||||
val config = MockConfig(VALID_API_ID, VALID_API_TOKEN)
|
||||
|
||||
try {
|
||||
DeclarativeDestinationFactory(config).createDestinationChecker().check(config)
|
||||
|
||||
verify {
|
||||
constructedWith<BasicAccessAuthenticator>(
|
||||
EqMatcher(VALID_API_ID),
|
||||
EqMatcher(VALID_API_TOKEN)
|
||||
)
|
||||
.intercept(any())
|
||||
}
|
||||
} finally {
|
||||
unmockkStatic("io.airbyte.cdk.util.ResourceUtils") // Clean up mocks
|
||||
}
|
||||
}
|
||||
|
||||
private fun mockManifest(manifestContent: String) {
|
||||
// Mock ResourceUtils to return our test manifest
|
||||
mockkStatic("io.airbyte.cdk.util.ResourceUtils")
|
||||
every { ResourceUtils.readResource("manifest.yaml") } returns manifestContent
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user