1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[cdk, source-postgres, source-mysql] a new error handling and translation framework (#40208)

Fixes airbytehq/airbyte-internal-issues#8516

This set of changes mainly moves error translation to be part of each connector.

In general, each connector needs to implement its own error translation class that inherits from the abstract class ConnectorExceptionTranslator, which is part of the CDK. By implementing, it means the connector developer or our support will populate the error dictionary with error samples with matching rules (e.g., regex). See the example we created for the Postgres source.
This commit is contained in:
Yue Li
2024-07-17 17:19:13 -07:00
committed by GitHub
parent 824b79c6f0
commit 4417769038
20 changed files with 781 additions and 648 deletions

View File

@@ -6,9 +6,9 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.41.8'
cdkVersionRequired = '0.42.1'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}
java {
@@ -46,3 +46,9 @@ jsonSchema2Pojo {
includeConstructors = false
includeSetters = true
}
compileKotlin {
dependsOn {
generateJsonSchema2Pojo
}
}

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.5.1
dockerImageTag: 3.6.0
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql

View File

@@ -642,8 +642,9 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
public static void main(final String[] args) throws Exception {
final Source source = MySqlSource.sshWrappedSource(new MySqlSource());
final MySqlSourceExceptionHandler exceptionHandler = new MySqlSourceExceptionHandler();
LOGGER.info("starting source: {}", MySqlSource.class);
new IntegrationRunner(source).run(args);
new IntegrationRunner(source).run(args, exceptionHandler);
LOGGER.info("completed source: {}", MySqlSource.class);
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.mysql
import io.airbyte.cdk.integrations.util.ConnectorErrorProfile
import io.airbyte.cdk.integrations.util.ConnectorExceptionHandler
import io.airbyte.cdk.integrations.util.FailureType
class MySqlSourceExceptionHandler : ConnectorExceptionHandler() {
override fun initializeErrorDictionary() {
// adding common error profiles
super.initializeErrorDictionary()
// adding connector specific error profiles
add(
ConnectorErrorProfile(
errorClass = "MySQL Syntax Exception",
regexMatchingPattern = ".*unknown column '.+' in 'field list'.*",
failureType = FailureType.CONFIG,
externalMessage =
"A column needed by MySQL source connector is missing in the database",
sampleInternalMessage = "Unknown column 'X' in 'field list'"
)
)
add(
ConnectorErrorProfile(
errorClass = "MySQL EOF Exception",
regexMatchingPattern =
".*can not read response from server. expected to read [1-9]\\d* bytes.*",
failureType = FailureType.TRANSIENT,
externalMessage = "Can not read data from MySQL server",
sampleInternalMessage =
"java.io.EOFException: Can not read response from server. Expected to read X bytes, read Y bytes before connection was unexpectedly lost."
)
)
}
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
import io.airbyte.integrations.source.mysql.MySqlSourceExceptionHandler
import java.io.EOFException
import java.sql.SQLSyntaxErrorException
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
class MySqlSourceExceptionHandlerTest {
private var exceptionHandler: MySqlSourceExceptionHandler? = null
@BeforeEach
fun setUp() {
exceptionHandler = MySqlSourceExceptionHandler()
}
@Test
fun testTranslateMySQLSyntaxException() {
val exception = SQLSyntaxErrorException("Unknown column 'xmin' in 'field list'")
val externalMessage = exceptionHandler!!.getExternalMessage(exception)
Assertions.assertEquals(
"A column needed by MySQL source connector is missing in the database",
externalMessage
)
}
@Test
fun testTranslateEOFException() {
val exception =
EOFException(
"Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost."
)
val externalMessage = exceptionHandler!!.getExternalMessage(exception)
Assertions.assertEquals("Can not read data from MySQL server", externalMessage)
}
}