1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Destination bigquery + Snowflake: Further error grouping improvements (#32190)

Co-authored-by: edgao <edgao@users.noreply.github.com>
This commit is contained in:
Edward Gao
2023-11-06 09:25:06 -08:00
committed by GitHub
parent 4ec05e0e1e
commit c81abe51a8
15 changed files with 75 additions and 27 deletions

View File

@@ -156,6 +156,7 @@ MavenLocal debugging steps:
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.2 | 2023-11-06 | [\#32190](https://github.com/airbytehq/airbyte/pull/32190) | Improve error deinterpolation |
| 0.4.0 | 2023-11-02 | [\#32050](https://github.com/airbytehq/airbyte/pull/32050) | Add 's3-destinations' CDK module. |
| 0.3.0 | 2023-11-02 | [\#31983](https://github.com/airbytehq/airbyte/pull/31983) | Add deinterpolation feature to AirbyteExceptionHandler. |
| 0.2.4 | 2023-10-31 | [\#31807](https://github.com/airbytehq/airbyte/pull/31807) | Handle case of debezium update and delete of records in mongodb. |

View File

@@ -6,10 +6,8 @@ package io.airbyte.cdk.integrations.base;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
@@ -27,8 +25,8 @@ public class AirbyteExceptionHandler implements Thread.UncaughtExceptionHandler
// Basic deinterpolation helpers to avoid doing _really_ dumb deinterpolation.
// E.g. if "id" is in the list of strings to remove, we don't want to modify the message "Invalid
// identifier".
private static final String REGEX_PREFIX = "(^|\\W)";
private static final String REGEX_SUFFIX = "($|\\W)";
private static final String REGEX_PREFIX = "(^|[^A-Za-z0-9])";
private static final String REGEX_SUFFIX = "($|[^A-Za-z0-9])";
/**
* If this list is populated, then the exception handler will attempt to deinterpolate the error
@@ -44,7 +42,11 @@ public class AirbyteExceptionHandler implements Thread.UncaughtExceptionHandler
* </ol>
*/
@VisibleForTesting
static final List<String> STRINGS_TO_DEINTERPOLATE = new ArrayList<>();
static final Set<String> STRINGS_TO_DEINTERPOLATE = new HashSet<>();
static {
addCommonStringsToDeinterpolate();
}
@VisibleForTesting
static final Set<Class<? extends Throwable>> THROWABLES_TO_DEINTERPOLATE = new HashSet<>();
@@ -113,7 +115,9 @@ public class AirbyteExceptionHandler implements Thread.UncaughtExceptionHandler
}
public static void addStringForDeinterpolation(final String string) {
STRINGS_TO_DEINTERPOLATE.add(string);
if (string != null) {
STRINGS_TO_DEINTERPOLATE.add(string);
}
}
public static void addAllStringsInConfigForDeinterpolation(final JsonNode node) {
@@ -132,4 +136,15 @@ public class AirbyteExceptionHandler implements Thread.UncaughtExceptionHandler
System.exit(1);
}
@VisibleForTesting
static void addCommonStringsToDeinterpolate() {
// Add some common strings to deinterpolate, regardless of what the connector is doing
STRINGS_TO_DEINTERPOLATE.add("description");
STRINGS_TO_DEINTERPOLATE.add("id");
STRINGS_TO_DEINTERPOLATE.add("location");
STRINGS_TO_DEINTERPOLATE.add("name");
STRINGS_TO_DEINTERPOLATE.add("status");
STRINGS_TO_DEINTERPOLATE.add("type");
}
}

View File

@@ -1 +1 @@
version=0.4.1
version=0.4.2

View File

@@ -61,13 +61,16 @@ public class AirbyteExceptionHandlerTest {
AirbyteExceptionHandler.addStringForDeinterpolation("foo");
AirbyteExceptionHandler.addStringForDeinterpolation("bar");
runTestWithMessage("Error happened in foo.bar");
// foo and bar are added to the list explicitly
// name and description are added implicitly by the exception handler.
// all of them should be replaced by '?'
runTestWithMessage("Error happened in arst_foo_bar_zxcv (name: description)");
final AirbyteMessage traceMessage = findFirstTraceMessage();
assertAll(
() -> assertEquals(AirbyteTraceMessage.Type.ERROR, traceMessage.getTrace().getType()),
() -> assertEquals("Error happened in foo.bar", traceMessage.getTrace().getError().getMessage()),
() -> assertEquals("Error happened in ?.?", traceMessage.getTrace().getError().getInternalMessage()),
() -> assertEquals("Error happened in arst_foo_bar_zxcv (name: description)", traceMessage.getTrace().getError().getMessage()),
() -> assertEquals("Error happened in arst_?_?_zxcv (?: ?)", traceMessage.getTrace().getError().getInternalMessage()),
() -> assertEquals(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR, traceMessage.getTrace().getError().getFailureType()),
() -> Assertions.assertNull(traceMessage.getTrace().getError().getStackTrace(),
"Stacktrace should be null if deinterpolating the error message"));
@@ -169,14 +172,25 @@ public class AirbyteExceptionHandlerTest {
@AfterEach
public void teardown() {
System.setOut(originalOut);
AirbyteExceptionHandler.STRINGS_TO_DEINTERPOLATE.clear();
AirbyteExceptionHandler.addCommonStringsToDeinterpolate();
AirbyteExceptionHandler.THROWABLES_TO_DEINTERPOLATE.clear();
}
private AirbyteMessage findFirstTraceMessage() {
final Optional<AirbyteMessage> maybeTraceMessage = Arrays.stream(outContent.toString(StandardCharsets.UTF_8).split("\n"))
.map(line -> Jsons.deserialize(line, AirbyteMessage.class))
.filter(message -> message.getType() == AirbyteMessage.Type.TRACE)
.map(line -> {
// these tests sometimes emit non-json stdout (e.g. log4j warnings)
// so we try-catch to handle those malformed lines.
try {
return Jsons.deserialize(line, AirbyteMessage.class);
} catch (final Exception e) {
return null;
}
})
.filter(message -> message != null && message.getType() == AirbyteMessage.Type.TRACE)
.findFirst();
assertTrue(maybeTraceMessage.isPresent(), "Expected to find a trace message in stdout");
return maybeTraceMessage.get();

View File

@@ -4,7 +4,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.4.1'
cdkVersionRequired = '0.4.2'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.3.6
dockerImageTag: 2.3.7
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg

View File

@@ -239,6 +239,12 @@ public class BigQueryDestination extends BaseConnector implements Destination {
final TyperDeduper typerDeduper = buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
if (serviceAccountKey.isTextual()) {
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(Jsons.deserialize(serviceAccountKey.asText()));
} else {
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(serviceAccountKey);
}
if (uploadingMethod == UploadingMethod.STANDARD) {
LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " +

View File

@@ -35,6 +35,7 @@ import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
@@ -93,7 +94,9 @@ public class BigQueryUtils {
static Job waitForQuery(final Job queryJob) {
try {
return queryJob.waitFor();
final Job job = queryJob.waitFor();
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
return job;
} catch (final Exception e) {
LOGGER.error("Failed to wait for a query job:" + queryJob);
throw new RuntimeException(e);
@@ -443,6 +446,7 @@ public class BigQueryUtils {
public static void waitForJobFinish(final Job job) throws InterruptedException {
if (job != null) {
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
try {
LOGGER.info("Waiting for job finish {}. Status: {}", job, job.getStatus());
job.waitFor();

View File

@@ -19,6 +19,7 @@ import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.Streams;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.math.BigInteger;
@@ -102,6 +103,7 @@ public class BigQueryDestinationHandler implements DestinationHandler<TableDefin
* doesn't do a good job of inferring the query location. Pass it in explicitly.
*/
Job job = bq.create(JobInfo.of(JobId.newBuilder().setLocation(datasetLocation).build(), QueryJobConfiguration.newBuilder(sql).build()));
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
// job.waitFor() gets stuck forever in some failure cases, so manually poll the job instead.
while (!JobStatus.State.DONE.equals(job.getStatus().getState())) {
Thread.sleep(1000L);

View File

@@ -18,6 +18,7 @@ import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationWriter;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
@@ -107,7 +108,7 @@ public abstract class AbstractBigQueryUploader<T extends DestinationWriter> {
public void closeAfterPush() {
try {
this.writer.close(false);
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
@@ -234,6 +235,7 @@ public abstract class AbstractBigQueryUploader<T extends DestinationWriter> {
.build();
final Job job = bigQuery.create(JobInfo.of(configuration));
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
final ImmutablePair<Job, String> jobStringImmutablePair = BigQueryUtils.executeQuery(job);
if (jobStringImmutablePair.getRight() != null) {
LOGGER.error("Failed on copy tables with error:" + job.getStatus());

View File

@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.common.base.Charsets;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationWriter;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
@@ -23,7 +24,7 @@ public class BigQueryTableWriter implements DestinationWriter {
private final TableDataWriteChannel writeChannel;
public BigQueryTableWriter(TableDataWriteChannel writeChannel) {
public BigQueryTableWriter(final TableDataWriteChannel writeChannel) {
this.writeChannel = writeChannel;
}
@@ -31,30 +32,31 @@ public class BigQueryTableWriter implements DestinationWriter {
public void initialize() throws IOException {}
@Override
public void write(UUID id, AirbyteRecordMessage recordMessage) {
public void write(final UUID id, final AirbyteRecordMessage recordMessage) {
throw new RuntimeException("This write method is not used!");
}
@Override
public void write(JsonNode formattedData) throws IOException {
public void write(final JsonNode formattedData) throws IOException {
writeChannel.write(ByteBuffer.wrap((Jsons.serialize(formattedData) + "\n").getBytes(Charsets.UTF_8)));
}
@Override
public void write(String formattedData) throws IOException {
public void write(final String formattedData) throws IOException {
writeChannel.write(ByteBuffer.wrap((formattedData + "\n").getBytes(Charsets.UTF_8)));
}
@Override
public void close(boolean hasFailed) throws IOException {
public void close(final boolean hasFailed) throws IOException {
this.writeChannel.close();
try {
Job job = writeChannel.getJob();
final Job job = writeChannel.getJob();
if (job != null && job.getStatus().getError() != null) {
AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
throw new RuntimeException("Fail to complete a load job in big query, Job id: " + writeChannel.getJob().getJobId() +
", with error: " + writeChannel.getJob().getStatus().getError());
}
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

View File

@@ -4,7 +4,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.4.1'
cdkVersionRequired = '0.4.2'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}

View File

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.4.7
dockerImageTag: 3.4.8
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg

View File

@@ -140,7 +140,8 @@ Now that you have set up the BigQuery destination connector, check out the follo
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.3.6 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. |
| 2.3.7 | 2023-11-06 | [#32190](https://github.com/airbytehq/airbyte/pull/32190) | Further improve error reporting |
| 2.3.6 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. |
| 2.3.5 | 2023-11-02 | [31983](https://github.com/airbytehq/airbyte/pull/31983) | Improve error reporting |
| 2.3.4 | 2023-10-31 | [32010](https://github.com/airbytehq/airbyte/pull/32010) | Add additional data centers. |
| 2.3.3 | 2023-10-30 | [31985](https://github.com/airbytehq/airbyte/pull/31985) | Delay upgrade deadline to Nov 7 |

View File

@@ -277,7 +277,8 @@ Otherwise, make sure to grant the role the required permissions in the desired n
| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.7 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. |
| 3.4.8 | 2023-11-06 | [#32190](https://github.com/airbytehq/airbyte/pull/32190) | Further improve error reporting |
| 3.4.7 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. |
| 3.4.6 | 2023-11-02 | [32124](https://github.com/airbytehq/airbyte/pull/32124) | Revert `merge` statement |
| 3.4.5 | 2023-11-02 | [31983](https://github.com/airbytehq/airbyte/pull/31983) | Improve error reporting |
| 3.4.4 | 2023-10-30 | [31985](https://github.com/airbytehq/airbyte/pull/31985) | Delay upgrade deadline to Nov 7 |