source-mysql: clean up config (#49950)
This commit is contained in:
@@ -9,7 +9,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: source
|
||||
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
|
||||
dockerImageTag: 3.10.0-rc.1
|
||||
dockerImageTag: 3.10.0-rc.2
|
||||
dockerRepository: airbyte/source-mysql
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
|
||||
githubIssueLabel: source-mysql
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.airbyte.cdk.command.FeatureFlag
|
||||
import io.airbyte.cdk.command.JdbcSourceConfiguration
|
||||
import io.airbyte.cdk.command.SourceConfiguration
|
||||
import io.airbyte.cdk.command.SourceConfigurationFactory
|
||||
import io.airbyte.cdk.jdbc.SSLCertificateUtils
|
||||
import io.airbyte.cdk.ssh.SshConnectionOptions
|
||||
import io.airbyte.cdk.ssh.SshNoTunnelMethod
|
||||
import io.airbyte.cdk.ssh.SshTunnelMethodConfiguration
|
||||
@@ -15,9 +16,14 @@ import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import io.micronaut.context.annotation.Factory
|
||||
import jakarta.inject.Inject
|
||||
import jakarta.inject.Singleton
|
||||
import java.net.MalformedURLException
|
||||
import java.net.URI
|
||||
import java.net.URL
|
||||
import java.net.URLDecoder
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.FileSystems
|
||||
import java.time.Duration
|
||||
import java.util.UUID
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
@@ -37,9 +43,10 @@ data class MySqlSourceConfiguration(
|
||||
override val checkPrivileges: Boolean,
|
||||
override val debeziumHeartbeatInterval: Duration = Duration.ofSeconds(10),
|
||||
val debeziumKeepAliveInterval: Duration = Duration.ofMinutes(1),
|
||||
override val maxSnapshotReadDuration: Duration?
|
||||
) : JdbcSourceConfiguration, CdcSourceConfiguration {
|
||||
override val global = incrementalConfiguration is CdcIncrementalConfiguration
|
||||
override val maxSnapshotReadDuration: Duration?
|
||||
get() = (incrementalConfiguration as? CdcIncrementalConfiguration)?.initialLoadTimeout
|
||||
|
||||
/** Required to inject [MySqlSourceConfiguration] directly. */
|
||||
@Factory
|
||||
@@ -59,7 +66,6 @@ sealed interface IncrementalConfiguration
|
||||
data object UserDefinedCursorIncrementalConfiguration : IncrementalConfiguration
|
||||
|
||||
data class CdcIncrementalConfiguration(
|
||||
val initialWaitDuration: Duration,
|
||||
val initialLoadTimeout: Duration,
|
||||
val serverTimezone: String?,
|
||||
val invalidCdcCursorPositionBehavior: InvalidCdcCursorPositionBehavior
|
||||
@@ -81,7 +87,6 @@ class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
|
||||
): MySqlSourceConfiguration {
|
||||
val realHost: String = pojo.host
|
||||
val realPort: Int = pojo.port
|
||||
val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue()
|
||||
val jdbcProperties = mutableMapOf<String, String>()
|
||||
jdbcProperties["user"] = pojo.username
|
||||
pojo.password?.let { jdbcProperties["password"] = it }
|
||||
@@ -101,57 +106,35 @@ class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
|
||||
jdbcProperties[key] = URLDecoder.decode(urlEncodedValue, StandardCharsets.UTF_8)
|
||||
}
|
||||
}
|
||||
// Determine protocol and configure encryption.
|
||||
val encryption: Encryption? = pojo.getEncryptionValue()
|
||||
val jdbcEncryption =
|
||||
when (encryption) {
|
||||
is EncryptionPreferred -> {
|
||||
if (
|
||||
featureFlags.contains(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT) &&
|
||||
sshTunnel is SshNoTunnelMethod
|
||||
) {
|
||||
throw ConfigErrorException(
|
||||
"Connection from Airbyte Cloud requires " +
|
||||
"SSL encryption or an SSH tunnel."
|
||||
)
|
||||
}
|
||||
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.PREFERRED)
|
||||
}
|
||||
is EncryptionRequired ->
|
||||
MySqlSourceEncryption(sslMode = MySqlSourceEncryption.SslMode.REQUIRED)
|
||||
is SslVerifyCertificate ->
|
||||
MySqlSourceEncryption(
|
||||
sslMode = MySqlSourceEncryption.SslMode.VERIFY_CA,
|
||||
caCertificate = encryption.sslCertificate,
|
||||
clientCertificate = encryption.sslClientCertificate,
|
||||
clientKey = encryption.sslClientKey,
|
||||
clientKeyPassword = encryption.sslClientPassword
|
||||
)
|
||||
is SslVerifyIdentity ->
|
||||
MySqlSourceEncryption(
|
||||
sslMode = MySqlSourceEncryption.SslMode.VERIFY_IDENTITY,
|
||||
caCertificate = encryption.sslCertificate,
|
||||
clientCertificate = encryption.sslClientCertificate,
|
||||
clientKey = encryption.sslClientKey,
|
||||
clientKeyPassword = encryption.sslClientPassword
|
||||
)
|
||||
null -> TODO()
|
||||
}
|
||||
val sslJdbcParameters = jdbcEncryption.parseSSLConfig()
|
||||
jdbcProperties.putAll(sslJdbcParameters)
|
||||
|
||||
val cursorConfig = pojo.getCursorMethodConfigurationValue()
|
||||
val maxSnapshotReadTime: Duration? =
|
||||
when (cursorConfig is CdcCursor) {
|
||||
true -> cursorConfig.initialLoadTimeoutHours?.let { Duration.ofHours(it.toLong()) }
|
||||
else -> null
|
||||
}
|
||||
// Build JDBC URL
|
||||
// Configure SSH tunneling.
|
||||
val sshTunnel: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue()
|
||||
val sshOpts: SshConnectionOptions =
|
||||
SshConnectionOptions.fromAdditionalProperties(pojo.getAdditionalProperties())
|
||||
|
||||
// Configure SSL encryption.
|
||||
if (
|
||||
pojo.getEncryptionValue() is EncryptionPreferred &&
|
||||
sshTunnel is SshNoTunnelMethod &&
|
||||
featureFlags.contains(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT)
|
||||
) {
|
||||
throw ConfigErrorException(
|
||||
"Connection from Airbyte Cloud requires SSL encryption or an SSH tunnel."
|
||||
)
|
||||
}
|
||||
val sslJdbcProperties: Map<String, String> = fromEncryptionSpec(pojo.getEncryptionValue()!!)
|
||||
jdbcProperties.putAll(sslJdbcProperties)
|
||||
|
||||
// Configure cursor.
|
||||
val incremental: IncrementalConfiguration = fromIncrementalSpec(pojo.getIncrementalValue())
|
||||
|
||||
// Build JDBC URL.
|
||||
val address = "%s:%d"
|
||||
val jdbcUrlFmt = "jdbc:mysql://${address}"
|
||||
jdbcProperties["useCursorFetch"] = "true"
|
||||
jdbcProperties["sessionVariables"] = "autocommit=0"
|
||||
val sshOpts = SshConnectionOptions.fromAdditionalProperties(pojo.getAdditionalProperties())
|
||||
|
||||
// Internal configuration settings.
|
||||
val checkpointTargetInterval: Duration =
|
||||
Duration.ofSeconds(pojo.checkpointTargetIntervalSeconds?.toLong() ?: 0)
|
||||
if (!checkpointTargetInterval.isPositive) {
|
||||
@@ -161,24 +144,7 @@ class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
|
||||
if ((pojo.concurrency ?: 0) <= 0) {
|
||||
throw ConfigErrorException("Concurrency setting should be positive")
|
||||
}
|
||||
val incrementalConfiguration: IncrementalConfiguration =
|
||||
when (val incPojo = pojo.getCursorMethodConfigurationValue()) {
|
||||
UserDefinedCursor -> UserDefinedCursorIncrementalConfiguration
|
||||
is CdcCursor ->
|
||||
CdcIncrementalConfiguration(
|
||||
initialWaitDuration =
|
||||
Duration.ofSeconds(incPojo.initialWaitTimeInSeconds!!.toLong()),
|
||||
initialLoadTimeout =
|
||||
Duration.ofHours(incPojo.initialLoadTimeoutHours!!.toLong()),
|
||||
serverTimezone = incPojo.serverTimezone,
|
||||
invalidCdcCursorPositionBehavior =
|
||||
if (incPojo.invalidCdcCursorPositionBehavior == "Fail sync") {
|
||||
InvalidCdcCursorPositionBehavior.FAIL_SYNC
|
||||
} else {
|
||||
InvalidCdcCursorPositionBehavior.RESET_SYNC
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
return MySqlSourceConfiguration(
|
||||
realHost = realHost,
|
||||
realPort = realPort,
|
||||
@@ -187,11 +153,133 @@ class MySqlSourceConfigurationFactory @Inject constructor(val featureFlags: Set<
|
||||
jdbcUrlFmt = jdbcUrlFmt,
|
||||
jdbcProperties = jdbcProperties,
|
||||
namespaces = setOf(pojo.database),
|
||||
incrementalConfiguration = incrementalConfiguration,
|
||||
incrementalConfiguration = incremental,
|
||||
checkpointTargetInterval = checkpointTargetInterval,
|
||||
maxConcurrency = maxConcurrency,
|
||||
checkPrivileges = pojo.checkPrivileges ?: true,
|
||||
maxSnapshotReadDuration = maxSnapshotReadTime
|
||||
)
|
||||
}
|
||||
|
||||
private fun fromIncrementalSpec(
|
||||
incrementalSpec: IncrementalConfigurationSpecification
|
||||
): IncrementalConfiguration =
|
||||
when (incrementalSpec) {
|
||||
UserDefinedCursor -> UserDefinedCursorIncrementalConfiguration
|
||||
is Cdc -> {
|
||||
val initialLoadTimeout: Duration =
|
||||
Duration.ofHours(incrementalSpec.initialLoadTimeoutHours!!.toLong())
|
||||
val invalidCdcCursorPositionBehavior: InvalidCdcCursorPositionBehavior =
|
||||
if (incrementalSpec.invalidCdcCursorPositionBehavior == "Fail sync") {
|
||||
InvalidCdcCursorPositionBehavior.FAIL_SYNC
|
||||
} else {
|
||||
InvalidCdcCursorPositionBehavior.RESET_SYNC
|
||||
}
|
||||
CdcIncrementalConfiguration(
|
||||
initialLoadTimeout,
|
||||
incrementalSpec.serverTimezone,
|
||||
invalidCdcCursorPositionBehavior,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun fromEncryptionSpec(encryptionSpec: EncryptionSpecification): Map<String, String> {
|
||||
val extraJdbcProperties: MutableMap<String, String> = mutableMapOf()
|
||||
val sslData: SslData =
|
||||
when (encryptionSpec) {
|
||||
is EncryptionPreferred -> SslData("preferred")
|
||||
is EncryptionRequired -> SslData("required")
|
||||
is SslVerifyCertificate ->
|
||||
SslData(
|
||||
mode = "verify_ca",
|
||||
caCertificate = encryptionSpec.sslCertificate,
|
||||
clientCertificate = encryptionSpec.sslClientCertificate,
|
||||
clientKey = encryptionSpec.sslClientKey,
|
||||
keyStorePassword = encryptionSpec.sslClientPassword,
|
||||
)
|
||||
is SslVerifyIdentity ->
|
||||
SslData(
|
||||
mode = "verify_identity",
|
||||
caCertificate = encryptionSpec.sslCertificate,
|
||||
clientCertificate = encryptionSpec.sslClientCertificate,
|
||||
clientKey = encryptionSpec.sslClientKey,
|
||||
keyStorePassword = encryptionSpec.sslClientPassword,
|
||||
)
|
||||
}
|
||||
extraJdbcProperties[SSL_MODE] = sslData.mode
|
||||
if (sslData.caCertificate.isNullOrBlank()) {
|
||||
// if CA cert is not available - done
|
||||
return extraJdbcProperties
|
||||
}
|
||||
val password: String =
|
||||
sslData.keyStorePassword.takeUnless { it.isNullOrBlank() }
|
||||
?: UUID.randomUUID().toString()
|
||||
// Make keystore for CA cert with given password or generate a new password.
|
||||
val caCertKeyStoreUrl: URL =
|
||||
buildKeyStore("trust") {
|
||||
SSLCertificateUtils.keyStoreFromCertificate(
|
||||
sslData.caCertificate,
|
||||
password,
|
||||
FileSystems.getDefault(),
|
||||
directory = "",
|
||||
)
|
||||
}
|
||||
extraJdbcProperties[TRUST_KEY_STORE_URL] = caCertKeyStoreUrl.toString()
|
||||
extraJdbcProperties[TRUST_KEY_STORE_PASS] = password
|
||||
extraJdbcProperties[TRUST_KEY_STORE_TYPE] = KEY_STORE_TYPE_PKCS12
|
||||
|
||||
if (sslData.clientCertificate.isNullOrBlank() || sslData.clientKey.isNullOrBlank()) {
|
||||
// if Client cert is not available - done
|
||||
return extraJdbcProperties
|
||||
}
|
||||
// Make keystore for Client cert with given password or generate a new password.
|
||||
val clientCertKeyStoreUrl: URL =
|
||||
buildKeyStore("client") {
|
||||
SSLCertificateUtils.keyStoreFromClientCertificate(
|
||||
sslData.clientCertificate,
|
||||
sslData.clientKey,
|
||||
password,
|
||||
directory = ""
|
||||
)
|
||||
}
|
||||
extraJdbcProperties[CLIENT_KEY_STORE_URL] = clientCertKeyStoreUrl.toString()
|
||||
extraJdbcProperties[CLIENT_KEY_STORE_PASS] = password
|
||||
extraJdbcProperties[CLIENT_KEY_STORE_TYPE] = KEY_STORE_TYPE_PKCS12
|
||||
return extraJdbcProperties
|
||||
}
|
||||
|
||||
private data class SslData(
|
||||
val mode: String,
|
||||
val caCertificate: String? = null,
|
||||
val clientCertificate: String? = null,
|
||||
val clientKey: String? = null,
|
||||
val keyStorePassword: String? = null,
|
||||
)
|
||||
|
||||
private fun buildKeyStore(kind: String, uriSupplier: () -> URI): URL {
|
||||
val keyStoreUri: URI =
|
||||
try {
|
||||
uriSupplier()
|
||||
} catch (ex: Exception) {
|
||||
throw ConfigErrorException("Failed to create keystore for $kind certificate", ex)
|
||||
}
|
||||
val keyStoreUrl: URL =
|
||||
try {
|
||||
keyStoreUri.toURL()
|
||||
} catch (ex: MalformedURLException) {
|
||||
throw ConfigErrorException("Unable to get a URL for $kind key store", ex)
|
||||
}
|
||||
log.debug { "URL for $kind certificate keystore is $keyStoreUrl" }
|
||||
return keyStoreUrl
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val TRUST_KEY_STORE_URL: String = "trustCertificateKeyStoreUrl"
|
||||
const val TRUST_KEY_STORE_PASS: String = "trustCertificateKeyStorePassword"
|
||||
const val CLIENT_KEY_STORE_URL: String = "clientCertificateKeyStoreUrl"
|
||||
const val CLIENT_KEY_STORE_PASS: String = "clientCertificateKeyStorePassword"
|
||||
const val CLIENT_KEY_STORE_TYPE: String = "clientCertificateKeyStoreType"
|
||||
const val TRUST_KEY_STORE_TYPE: String = "trustCertificateKeyStoreType"
|
||||
const val KEY_STORE_TYPE_PKCS12: String = "PKCS12"
|
||||
const val SSL_MODE: String = "sslMode"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,12 +86,12 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
|
||||
|
||||
@JsonIgnore
|
||||
@ConfigurationBuilder(configurationPrefix = "ssl_mode")
|
||||
var encryption = MicronautPropertiesFriendlyEncryption()
|
||||
var encryption = MicronautPropertiesFriendlyEncryptionSpecification()
|
||||
|
||||
@JsonIgnore var encryptionJson: Encryption? = null
|
||||
@JsonIgnore var encryptionJson: EncryptionSpecification? = null
|
||||
|
||||
@JsonSetter("ssl_mode")
|
||||
fun setEncryptionValue(value: Encryption) {
|
||||
fun setEncryptionValue(value: EncryptionSpecification) {
|
||||
encryptionJson = value
|
||||
}
|
||||
|
||||
@@ -101,7 +101,7 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
|
||||
"The encryption method with is used when communicating with the database.",
|
||||
)
|
||||
@JsonSchemaInject(json = """{"order":8}""")
|
||||
fun getEncryptionValue(): Encryption? = encryptionJson ?: encryption.asEncryption()
|
||||
fun getEncryptionValue(): EncryptionSpecification? = encryptionJson ?: encryption.asEncryption()
|
||||
|
||||
@JsonIgnore
|
||||
@ConfigurationBuilder(configurationPrefix = "tunnel_method")
|
||||
@@ -126,12 +126,12 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
|
||||
|
||||
@JsonIgnore
|
||||
@ConfigurationBuilder(configurationPrefix = "replication_method")
|
||||
var replicationMethod = MicronautPropertiesFriendlyCursorMethodConfiguration()
|
||||
var replicationMethod = MicronautPropertiesFriendlyIncrementalConfigurationSpecification()
|
||||
|
||||
@JsonIgnore var replicationMethodJson: CursorMethodConfiguration? = null
|
||||
@JsonIgnore var replicationMethodJson: IncrementalConfigurationSpecification? = null
|
||||
|
||||
@JsonSetter("replication_method")
|
||||
fun setMethodValue(value: CursorMethodConfiguration) {
|
||||
fun setIncrementalValue(value: IncrementalConfigurationSpecification) {
|
||||
replicationMethodJson = value
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
|
||||
@JsonSchemaTitle("Update Method")
|
||||
@JsonPropertyDescription("Configures how data is extracted from the database.")
|
||||
@JsonSchemaInject(json = """{"order":10,"display_type":"radio"}""")
|
||||
fun getCursorMethodConfigurationValue(): CursorMethodConfiguration =
|
||||
fun getIncrementalValue(): IncrementalConfigurationSpecification =
|
||||
replicationMethodJson ?: replicationMethod.asCursorMethodConfiguration()
|
||||
|
||||
@JsonProperty("checkpoint_target_interval_seconds")
|
||||
@@ -191,26 +191,26 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
|
||||
)
|
||||
@JsonSchemaTitle("Encryption")
|
||||
@JsonSchemaDescription("The encryption method which is used when communicating with the database.")
|
||||
sealed interface Encryption
|
||||
sealed interface EncryptionSpecification
|
||||
|
||||
@JsonSchemaTitle("preferred")
|
||||
@JsonSchemaDescription(
|
||||
"To allow unencrypted communication only when the source doesn't support encryption.",
|
||||
)
|
||||
data object EncryptionPreferred : Encryption
|
||||
data object EncryptionPreferred : EncryptionSpecification
|
||||
|
||||
@JsonSchemaTitle("required")
|
||||
@JsonSchemaDescription(
|
||||
"To always require encryption. Note: The connection will fail if the source doesn't support encryption.",
|
||||
)
|
||||
data object EncryptionRequired : Encryption
|
||||
data object EncryptionRequired : EncryptionSpecification
|
||||
|
||||
@JsonSchemaTitle("verify_ca")
|
||||
@JsonSchemaDescription(
|
||||
"To always require encryption and verify that the source has a valid SSL certificate."
|
||||
)
|
||||
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
|
||||
class SslVerifyCertificate : Encryption {
|
||||
class SslVerifyCertificate : EncryptionSpecification {
|
||||
@JsonProperty("ca_certificate", required = true)
|
||||
@JsonSchemaTitle("CA certificate")
|
||||
@JsonPropertyDescription(
|
||||
@@ -249,7 +249,7 @@ class SslVerifyCertificate : Encryption {
|
||||
"To always require encryption and verify that the source has a valid SSL certificate."
|
||||
)
|
||||
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
|
||||
class SslVerifyIdentity : Encryption {
|
||||
class SslVerifyIdentity : EncryptionSpecification {
|
||||
@JsonProperty("ca_certificate", required = true)
|
||||
@JsonSchemaTitle("CA certificate")
|
||||
@JsonPropertyDescription(
|
||||
@@ -284,12 +284,12 @@ class SslVerifyIdentity : Encryption {
|
||||
}
|
||||
|
||||
@ConfigurationProperties("$CONNECTOR_CONFIG_PREFIX.ssl_mode")
|
||||
class MicronautPropertiesFriendlyEncryption {
|
||||
class MicronautPropertiesFriendlyEncryptionSpecification {
|
||||
var mode: String = "preferred"
|
||||
var sslCertificate: String? = null
|
||||
|
||||
@JsonValue
|
||||
fun asEncryption(): Encryption =
|
||||
fun asEncryption(): EncryptionSpecification =
|
||||
when (mode) {
|
||||
"preferred" -> EncryptionPreferred
|
||||
"required" -> EncryptionRequired
|
||||
@@ -302,11 +302,11 @@ class MicronautPropertiesFriendlyEncryption {
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "method")
|
||||
@JsonSubTypes(
|
||||
JsonSubTypes.Type(value = UserDefinedCursor::class, name = "STANDARD"),
|
||||
JsonSubTypes.Type(value = CdcCursor::class, name = "CDC")
|
||||
JsonSubTypes.Type(value = Cdc::class, name = "CDC")
|
||||
)
|
||||
@JsonSchemaTitle("Update Method")
|
||||
@JsonSchemaDescription("Configures how data is extracted from the database.")
|
||||
sealed interface CursorMethodConfiguration
|
||||
sealed interface IncrementalConfigurationSpecification
|
||||
|
||||
@JsonSchemaTitle("Scan Changes with User Defined Cursor")
|
||||
@JsonSchemaDescription(
|
||||
@@ -315,7 +315,7 @@ sealed interface CursorMethodConfiguration
|
||||
"#user-defined-cursor\">cursor column</a> chosen when configuring a connection " +
|
||||
"(e.g. created_at, updated_at).",
|
||||
)
|
||||
data object UserDefinedCursor : CursorMethodConfiguration
|
||||
data object UserDefinedCursor : IncrementalConfigurationSpecification
|
||||
|
||||
@JsonSchemaTitle("Read Changes using Change Data Capture (CDC)")
|
||||
@JsonSchemaDescription(
|
||||
@@ -324,24 +324,13 @@ data object UserDefinedCursor : CursorMethodConfiguration
|
||||
"\"https://docs.airbyte.com/integrations/sources/mssql/#change-data-capture-cdc\"" +
|
||||
"> change data capture feature</a>. This must be enabled on your database.",
|
||||
)
|
||||
class CdcCursor : CursorMethodConfiguration {
|
||||
@JsonProperty("initial_waiting_seconds")
|
||||
@JsonSchemaTitle("Initial Waiting Time in Seconds (Advanced)")
|
||||
@JsonSchemaDefault("300")
|
||||
@JsonPropertyDescription(
|
||||
"The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\" +\n" +
|
||||
" \"\\\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\\\"\" +\n" +
|
||||
" \"> initial waiting time</a>.",
|
||||
)
|
||||
@JsonSchemaInject(json = """{"order":1, "max": 1200, "min": 120, "always_show": true}""")
|
||||
var initialWaitTimeInSeconds: Int? = 300
|
||||
|
||||
class Cdc : IncrementalConfigurationSpecification {
|
||||
@JsonProperty("server_timezone")
|
||||
@JsonSchemaTitle("Configured server timezone for the MySQL source (Advanced)")
|
||||
@JsonPropertyDescription(
|
||||
"Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
|
||||
)
|
||||
@JsonSchemaInject(json = """{"order":2,"always_show":true}""")
|
||||
@JsonSchemaInject(json = """{"order":1,"always_show":true}""")
|
||||
var serverTimezone: String? = null
|
||||
|
||||
@JsonProperty("invalid_cdc_cursor_position_behavior")
|
||||
@@ -351,7 +340,7 @@ class CdcCursor : CursorMethodConfiguration {
|
||||
)
|
||||
@JsonSchemaDefault("Fail sync")
|
||||
@JsonSchemaInject(
|
||||
json = """{"order":3,"always_show":true, "enum": ["Fail sync","Re-sync data"]}"""
|
||||
json = """{"order":2,"always_show":true, "enum": ["Fail sync","Re-sync data"]}"""
|
||||
)
|
||||
var invalidCdcCursorPositionBehavior: String? = "Fail sync"
|
||||
|
||||
@@ -361,18 +350,18 @@ class CdcCursor : CursorMethodConfiguration {
|
||||
"The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
|
||||
)
|
||||
@JsonSchemaDefault("8")
|
||||
@JsonSchemaInject(json = """{"order":4, "max": 24, "min": 4,"always_show": true}""")
|
||||
@JsonSchemaInject(json = """{"order":3, "max": 24, "min": 4,"always_show": true}""")
|
||||
var initialLoadTimeoutHours: Int? = 8
|
||||
}
|
||||
|
||||
@ConfigurationProperties("$CONNECTOR_CONFIG_PREFIX.replication_method")
|
||||
class MicronautPropertiesFriendlyCursorMethodConfiguration {
|
||||
class MicronautPropertiesFriendlyIncrementalConfigurationSpecification {
|
||||
var method: String = "STANDARD"
|
||||
|
||||
fun asCursorMethodConfiguration(): CursorMethodConfiguration =
|
||||
fun asCursorMethodConfiguration(): IncrementalConfigurationSpecification =
|
||||
when (method) {
|
||||
"STANDARD" -> UserDefinedCursor
|
||||
"CDC" -> CdcCursor()
|
||||
"CDC" -> Cdc()
|
||||
else -> throw ConfigErrorException("invalid value $method")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,143 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.integrations.source.mysql
|
||||
|
||||
import io.airbyte.cdk.ConfigErrorException
|
||||
import io.airbyte.cdk.jdbc.SSLCertificateUtils
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import java.net.MalformedURLException
|
||||
import java.net.URI
|
||||
import java.nio.file.FileSystems
|
||||
import java.util.UUID
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
class MySqlSourceEncryption(
|
||||
val sslMode: SslMode = SslMode.PREFERRED,
|
||||
val caCertificate: String? = null,
|
||||
val clientCertificate: String? = null,
|
||||
val clientKey: String? = null,
|
||||
val clientKeyPassword: String? = null,
|
||||
) {
|
||||
|
||||
/**
|
||||
* Enum representing the SSL mode for MySQL connections. The actual jdbc property name is the
|
||||
* lower case of the enum name.
|
||||
*/
|
||||
enum class SslMode {
|
||||
PREFERRED,
|
||||
REQUIRED,
|
||||
VERIFY_CA,
|
||||
VERIFY_IDENTITY,
|
||||
}
|
||||
|
||||
fun parseSSLConfig(): Map<String, String> {
|
||||
var caCertKeyStorePair: Pair<URI, String>?
|
||||
var clientCertKeyStorePair: Pair<URI, String>?
|
||||
val additionalParameters: MutableMap<String, String> = mutableMapOf()
|
||||
|
||||
additionalParameters[SSL_MODE] = sslMode.name.lowercase()
|
||||
|
||||
caCertKeyStorePair = prepareCACertificateKeyStore()
|
||||
|
||||
if (null != caCertKeyStorePair) {
|
||||
log.debug { "uri for ca cert keystore: ${caCertKeyStorePair.first}" }
|
||||
try {
|
||||
additionalParameters.putAll(
|
||||
mapOf(
|
||||
TRUST_KEY_STORE_URL to caCertKeyStorePair.first.toURL().toString(),
|
||||
TRUST_KEY_STORE_PASS to caCertKeyStorePair.second,
|
||||
TRUST_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12
|
||||
)
|
||||
)
|
||||
} catch (e: MalformedURLException) {
|
||||
throw ConfigErrorException("Unable to get a URL for trust key store")
|
||||
}
|
||||
|
||||
clientCertKeyStorePair = prepareClientCertificateKeyStore()
|
||||
|
||||
if (null != clientCertKeyStorePair) {
|
||||
log.debug {
|
||||
"uri for client cert keystore: ${clientCertKeyStorePair.first} / ${clientCertKeyStorePair.second}"
|
||||
}
|
||||
try {
|
||||
additionalParameters.putAll(
|
||||
mapOf(
|
||||
CLIENT_KEY_STORE_URL to clientCertKeyStorePair.first.toURL().toString(),
|
||||
CLIENT_KEY_STORE_PASS to clientCertKeyStorePair.second,
|
||||
CLIENT_KEY_STORE_TYPE to KEY_STORE_TYPE_PKCS12
|
||||
)
|
||||
)
|
||||
} catch (e: MalformedURLException) {
|
||||
throw ConfigErrorException("Unable to get a URL for client key store")
|
||||
}
|
||||
}
|
||||
}
|
||||
return additionalParameters
|
||||
}
|
||||
|
||||
private fun getOrGeneratePassword(): String {
|
||||
if (!clientKeyPassword.isNullOrEmpty()) {
|
||||
return clientKeyPassword
|
||||
} else {
|
||||
return UUID.randomUUID().toString()
|
||||
}
|
||||
}
|
||||
|
||||
private fun prepareCACertificateKeyStore(): Pair<URI, String>? {
|
||||
// if config is not available - done
|
||||
// if has CA cert - make keystore with given password or generate a new password.
|
||||
var caCertKeyStorePair: Pair<URI, String>? = null
|
||||
|
||||
if (caCertificate.isNullOrEmpty()) {
|
||||
return caCertKeyStorePair
|
||||
}
|
||||
val clientKeyPassword = getOrGeneratePassword()
|
||||
try {
|
||||
val caCertKeyStoreUri =
|
||||
SSLCertificateUtils.keyStoreFromCertificate(
|
||||
caCertificate,
|
||||
clientKeyPassword,
|
||||
FileSystems.getDefault(),
|
||||
""
|
||||
)
|
||||
return Pair(caCertKeyStoreUri, clientKeyPassword)
|
||||
} catch (ex: Exception) {
|
||||
throw ConfigErrorException("Failed to create keystore for CA certificate.", ex)
|
||||
}
|
||||
}
|
||||
|
||||
private fun prepareClientCertificateKeyStore(): Pair<URI, String>? {
|
||||
var clientCertKeyStorePair: Pair<URI, String>? = null
|
||||
|
||||
if (!clientCertificate.isNullOrEmpty() && !clientKey.isNullOrEmpty()) {
|
||||
val clientKeyPassword = getOrGeneratePassword()
|
||||
try {
|
||||
val clientCertKeyStoreUri =
|
||||
SSLCertificateUtils.keyStoreFromClientCertificate(
|
||||
clientCertificate,
|
||||
clientKey,
|
||||
clientKeyPassword,
|
||||
""
|
||||
)
|
||||
clientCertKeyStorePair = Pair(clientCertKeyStoreUri, clientKeyPassword)
|
||||
} catch (ex: Exception) {
|
||||
throw RuntimeException("Failed to create keystore for Client certificate", ex)
|
||||
}
|
||||
}
|
||||
return clientCertKeyStorePair
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val TRUST_KEY_STORE_URL: String = "trustCertificateKeyStoreUrl"
|
||||
const val TRUST_KEY_STORE_PASS: String = "trustCertificateKeyStorePassword"
|
||||
const val CLIENT_KEY_STORE_URL: String = "clientCertificateKeyStoreUrl"
|
||||
const val CLIENT_KEY_STORE_PASS: String = "clientCertificateKeyStorePassword"
|
||||
const val CLIENT_KEY_STORE_TYPE: String = "clientCertificateKeyStoreType"
|
||||
const val TRUST_KEY_STORE_TYPE: String = "trustCertificateKeyStoreType"
|
||||
const val KEY_STORE_TYPE_PKCS12: String = "PKCS12"
|
||||
const val SSL_MODE: String = "sslMode"
|
||||
}
|
||||
}
|
||||
@@ -79,7 +79,7 @@ object MySqlContainerFactory {
|
||||
database = "test"
|
||||
checkpointTargetIntervalSeconds = 60
|
||||
concurrency = 1
|
||||
setMethodValue(UserDefinedCursor)
|
||||
setIncrementalValue(UserDefinedCursor)
|
||||
}
|
||||
|
||||
fun MySQLContainer<*>.execAsRoot(sql: String) {
|
||||
|
||||
@@ -51,7 +51,7 @@ class MySqlSourceCdcIntegrationTest {
|
||||
{
|
||||
val invalidConfig: MySqlSourceConfigurationSpecification =
|
||||
MySqlContainerFactory.config(nonCdcDbContainer).apply {
|
||||
setMethodValue(CdcCursor())
|
||||
setIncrementalValue(Cdc())
|
||||
}
|
||||
|
||||
val nonCdcConnectionFactory =
|
||||
@@ -109,7 +109,7 @@ class MySqlSourceCdcIntegrationTest {
|
||||
lateinit var dbContainer: MySQLContainer<*>
|
||||
|
||||
fun config(): MySqlSourceConfigurationSpecification =
|
||||
MySqlContainerFactory.config(dbContainer).apply { setMethodValue(CdcCursor()) }
|
||||
MySqlContainerFactory.config(dbContainer).apply { setIncrementalValue(Cdc()) }
|
||||
|
||||
val connectionFactory: JdbcConnectionFactory by lazy {
|
||||
JdbcConnectionFactory(MySqlSourceConfigurationFactory().make(config()))
|
||||
|
||||
@@ -14,6 +14,7 @@ import org.junit.jupiter.api.Test
|
||||
|
||||
@MicronautTest(environments = [Environment.TEST], rebuildContext = true)
|
||||
class MySqlSourceConfigurationSpecificationTest {
|
||||
|
||||
@Inject
|
||||
lateinit var supplier: ConfigurationSpecificationSupplier<MySqlSourceConfigurationSpecification>
|
||||
|
||||
@@ -31,8 +32,8 @@ class MySqlSourceConfigurationSpecificationTest {
|
||||
Assertions.assertEquals("FOO", pojo.username)
|
||||
Assertions.assertEquals("BAR", pojo.password)
|
||||
Assertions.assertEquals("SYSTEM", pojo.database)
|
||||
val encryption: Encryption? = pojo.getEncryptionValue()
|
||||
Assertions.assertTrue(encryption is EncryptionPreferred, encryption!!::class.toString())
|
||||
val encryption: EncryptionSpecification = pojo.getEncryptionValue()!!
|
||||
Assertions.assertTrue(encryption is EncryptionPreferred, encryption::class.toString())
|
||||
val tunnelMethod: SshTunnelMethodConfiguration? = pojo.getTunnelMethodValue()
|
||||
Assertions.assertTrue(
|
||||
tunnelMethod is SshPasswordAuthTunnelMethod,
|
||||
|
||||
@@ -87,7 +87,6 @@ class MySqlSourceConfigurationTest {
|
||||
|
||||
val cdcCursor = config.incrementalConfiguration as CdcIncrementalConfiguration
|
||||
|
||||
Assertions.assertEquals(cdcCursor.initialWaitDuration, Duration.ofSeconds(301))
|
||||
Assertions.assertEquals(cdcCursor.initialLoadTimeout, Duration.ofHours(9))
|
||||
Assertions.assertEquals(
|
||||
cdcCursor.invalidCdcCursorPositionBehavior,
|
||||
|
||||
@@ -213,14 +213,14 @@ class MySqlSourceCursorBasedIntegrationTest {
|
||||
"type": "STREAM",
|
||||
"stream": {
|
||||
"stream_descriptor": {
|
||||
"name": "${tableName}",
|
||||
"name": "$tableName",
|
||||
"namespace": "test"
|
||||
},
|
||||
"stream_state": {
|
||||
"cursor": "10",
|
||||
"version": 2,
|
||||
"state_type": "cursor_based",
|
||||
"stream_name": "${tableName}",
|
||||
"stream_name": "$tableName",
|
||||
"cursor_field": [
|
||||
"k"
|
||||
],
|
||||
@@ -238,13 +238,13 @@ class MySqlSourceCursorBasedIntegrationTest {
|
||||
"type": "STREAM",
|
||||
"stream": {
|
||||
"stream_descriptor": {
|
||||
"name": "${tableName}",
|
||||
"name": "$tableName",
|
||||
"namespace": "test"
|
||||
},
|
||||
"stream_state": {
|
||||
"version": 2,
|
||||
"state_type": "cursor_based",
|
||||
"stream_name": "${tableName}",
|
||||
"stream_name": "$tableName",
|
||||
"cursor_field": [
|
||||
"k"
|
||||
],
|
||||
|
||||
@@ -56,12 +56,12 @@ object MySqlSourceDatatypeTestOperations :
|
||||
override fun streamConfigSpec(
|
||||
container: MySQLContainer<*>
|
||||
): MySqlSourceConfigurationSpecification =
|
||||
MySqlContainerFactory.config(container).also { it.setMethodValue(UserDefinedCursor) }
|
||||
MySqlContainerFactory.config(container).also { it.setIncrementalValue(UserDefinedCursor) }
|
||||
|
||||
override fun globalConfigSpec(
|
||||
container: MySQLContainer<*>
|
||||
): MySqlSourceConfigurationSpecification =
|
||||
MySqlContainerFactory.config(container).also { it.setMethodValue(CdcCursor()) }
|
||||
MySqlContainerFactory.config(container).also { it.setIncrementalValue(Cdc()) }
|
||||
|
||||
override val configFactory: MySqlSourceConfigurationFactory = MySqlSourceConfigurationFactory()
|
||||
|
||||
|
||||
@@ -117,9 +117,9 @@ class MySqlSourceJdbcPartitionFactoryTest {
|
||||
database = "localhost"
|
||||
}
|
||||
if (global) {
|
||||
configSpec.setMethodValue(CdcCursor())
|
||||
configSpec.setIncrementalValue(Cdc())
|
||||
} else {
|
||||
configSpec.setMethodValue(UserDefinedCursor)
|
||||
configSpec.setIncrementalValue(UserDefinedCursor)
|
||||
}
|
||||
val configFactory = MySqlSourceConfigurationFactory()
|
||||
val configuration = configFactory.make(configSpec)
|
||||
|
||||
@@ -88,26 +88,16 @@
|
||||
"description": "The amount of time an initial load is allowed to continue for before catching up on CDC logs.",
|
||||
"max": 24,
|
||||
"min": 4,
|
||||
"order": 4,
|
||||
"order": 3,
|
||||
"title": "Initial Load Timeout in Hours (Advanced)",
|
||||
"type": "integer"
|
||||
},
|
||||
"initial_waiting_seconds": {
|
||||
"always_show": true,
|
||||
"default": 300,
|
||||
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\" +\n \"\\\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\\\"\" +\n \"> initial waiting time</a>.",
|
||||
"max": 1200,
|
||||
"min": 120,
|
||||
"order": 1,
|
||||
"title": "Initial Waiting Time in Seconds (Advanced)",
|
||||
"type": "integer"
|
||||
},
|
||||
"invalid_cdc_cursor_position_behavior": {
|
||||
"always_show": true,
|
||||
"default": "Fail sync",
|
||||
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
|
||||
"enum": ["Fail sync", "Re-sync data"],
|
||||
"order": 3,
|
||||
"order": 2,
|
||||
"title": "Configured server timezone for the MySQL source (Advanced)",
|
||||
"type": "string"
|
||||
},
|
||||
@@ -119,7 +109,7 @@
|
||||
"server_timezone": {
|
||||
"always_show": true,
|
||||
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
|
||||
"order": 2,
|
||||
"order": 1,
|
||||
"title": "Configured server timezone for the MySQL source (Advanced)",
|
||||
"type": "string"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user