destinations v2: snowflake: threadsafe typing and deduping (#29878)
Co-authored-by: edgao <edgao@users.noreply.github.com>
Co-authored-by: Evan Tahler <evan@airbyte.io>
Co-authored-by: evantahler <evantahler@users.noreply.github.com>
@@ -7,16 +7,23 @@ package io.airbyte.integrations.base.destination.typing_deduping;
import static io.airbyte.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.countOfTypingDedupingThreads;
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
import static java.util.Collections.singleton;

import autovalue.shaded.kotlin.Pair;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;

@@ -30,7 +37,7 @@ import org.slf4j.LoggerFactory;
* In a typical sync, destinations should call the methods:
* <ol>
* <li>{@link #prepareTables()} once at the start of the sync</li>
* <li>{@link #typeAndDedupe(String, String)} as needed throughout the sync</li>
* <li>{@link #typeAndDedupe(String, String, boolean)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
* Note that createFinalTables initializes some internal state. The other methods will throw an

@@ -51,6 +58,16 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<Pair<String, String>> streamsWithSuccessfulSetup;
// We only want to run a single instance of T+D per stream at a time. These objects are used for
// synchronization per stream.
// Use a read-write lock because we need the same semantics:
// * any number of threads can insert to the raw tables at the same time, as long as T+D isn't
// running (i.e. "read lock")
// * T+D must run in complete isolation (i.e. "write lock")
private final Map<StreamId, ReadWriteLock> tdLocks;
// These locks are used to prevent multiple simultaneous attempts to T+D the same stream.
// We use tryLock with these so that we don't queue up multiple T+D runs for the same stream.
private final Map<StreamId, Lock> internalTdLocks;

private final ExecutorService executorService;

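The comment block above describes the core of this change, and the pattern generalizes beyond this class. Here is a minimal, self-contained sketch of the same two-lock scheme, with plain String stream ids and Runnable callbacks standing in for the connector's real types; every name in it is illustrative, not the actual connector API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TwoLockSketch {

  // Raw-table writers share the read lock; T+D takes the write lock for exclusive access.
  private final Map<String, ReadWriteLock> tdLocks = new ConcurrentHashMap<>();
  // tryLock() on this lock lets redundant T+D attempts skip instead of queueing up.
  private final Map<String, Lock> internalTdLocks = new ConcurrentHashMap<>();

  public void register(final String streamId) {
    // Fair mode, so constantly-arriving writers cannot starve the write-lock (T+D) side.
    tdLocks.put(streamId, new ReentrantReadWriteLock(true));
    internalTdLocks.put(streamId, new ReentrantLock());
  }

  public void insertRawRecords(final String streamId, final Runnable insert) {
    final Lock readLock = tdLocks.get(streamId).readLock();
    readLock.lock(); // any number of writers may hold this concurrently
    try {
      insert.run();
    } finally {
      readLock.unlock();
    }
  }

  public void typeAndDedupe(final String streamId, final Runnable td, final boolean mustRun) {
    final Lock internalLock = internalTdLocks.get(streamId);
    final boolean run;
    if (mustRun) {
      internalLock.lock(); // forced runs wait their turn
      run = true;
    } else {
      run = internalLock.tryLock(); // opportunistic runs skip if T+D is already in flight
    }
    if (!run) {
      return;
    }
    final Lock writeLock = tdLocks.get(streamId).writeLock();
    writeLock.lock(); // wait for in-flight raw writes to drain
    try {
      td.run();
    } finally {
      writeLock.unlock();
      internalLock.unlock();
    }
  }
}
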
@@ -66,6 +83,8 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
this.v1V2Migrator = v1V2Migrator;
this.v2TableMigrator = v2TableMigrator;
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
this.tdLocks = new HashMap<>();
this.internalTdLocks = new HashMap<>();
this.executorService = Executors.newFixedThreadPool(countOfTypingDedupingThreads(defaultThreadCount),
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
}

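The constructor sizes a fixed pool via countOfTypingDedupingThreads and names its threads using commons-lang3's BasicThreadFactory, which makes T+D work easy to pick out in thread dumps and logs. A standalone sketch of that idiom; the pool size and naming pattern here are invented for illustration:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

public class NamedPoolSketch {

  public static void main(final String[] args) {
    final ExecutorService executorService = Executors.newFixedThreadPool(
        4, // stand-in for countOfTypingDedupingThreads(defaultThreadCount)
        new BasicThreadFactory.Builder().namingPattern("type-and-dedupe-%d").build());
    // Prints e.g. "type-and-dedupe-1", demonstrating the naming pattern.
    executorService.submit(() -> System.out.println(Thread.currentThread().getName()));
    executorService.shutdown();
  }
}
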
@@ -79,12 +98,6 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>(), defaultThreadCount);
}

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
public void prepareTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");

@@ -137,38 +150,41 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
}
streamsWithSuccessfulSetup.add(new Pair<>(stream.id().originalNamespace(), stream.id().originalName()));

// Use fair locking. This slows down lock operations, but that performance hit is by far dwarfed
// by our IO costs. This lock needs to be fair because the raw table writers are running almost
// constantly,
// and we don't want them to starve T+D.
tdLocks.put(stream.id(), new ReentrantReadWriteLock(true));
// This lock doesn't need to be fair; any T+D instance is equivalent and we'll skip T+D if we can't
// immediately acquire the lock.
internalTdLocks.put(stream.id(), new ReentrantLock());

return Optional.empty();
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception occurred while preparing tables for stream " + stream.id().originalName(), e);
return Optional.of(e);
}
}, this.executorService);
}

/**
* Execute typing and deduping for a single stream (i.e. fetch new raw records into the final table,
* etc.).
* <p>
* This method is thread-safe; multiple threads can call it concurrently.
*
* @param originalNamespace The stream's namespace, as declared in the configured catalog
* @param originalName The stream's name, as declared in the configured catalog
*/
public void typeAndDedupe(final String originalNamespace, final String originalName) throws Exception {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
public void typeAndDedupe(final String originalNamespace, final String originalName, final boolean mustRun) throws Exception {
final var streamConfig = parsedCatalog.getStream(originalNamespace, originalName);
if (!streamsWithSuccessfulSetup.contains(new Pair<>(originalNamespace, originalName))) {
// For example, if T+D setup fails, but the consumer tries to run T+D on all streams during close,
// we should skip it.
LOGGER.warn("Skipping typing and deduping for {}.{} because we could not set up the tables for this stream.", originalNamespace, originalName);
return;
}
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);
final CompletableFuture<Optional<Exception>> task = typeAndDedupeTask(streamConfig, mustRun);
reduceExceptions(
singleton(task),
String.format(
"The Following Exceptions were thrown while typing and deduping %s.%s:\n",
originalNamespace,
originalName));
}

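For context, FutureUtils.reduceExceptions (statically imported at the top of this file) is what surfaces failures from these futures; its implementation is not part of this diff. Inferred purely from its call sites, a helper with that shape might look like the following — join every task, collect the captured exceptions, and fail once with all of them. This is an assumption, not the actual FutureUtils code:

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureUtilsSketch {

  // Hypothetical stand-in for FutureUtils.reduceExceptions.
  public static void reduceExceptions(final Collection<CompletableFuture<Optional<Exception>>> tasks,
                                      final String initialMessage)
      throws Exception {
    final String details = tasks.stream()
        .map(CompletableFuture::join) // each task captures its own exception rather than throwing
        .filter(Optional::isPresent)
        .map(maybeException -> maybeException.get().toString())
        .collect(Collectors.joining("\n"));
    if (!details.isEmpty()) {
      throw new RuntimeException(initialMessage + details);
    }
  }
}
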
public CompletableFuture<Optional<Exception>> typeAndDedupeTask(StreamConfig streamConfig) {
public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) {
final var streamConfig = parsedCatalog.getStream(originalNamespace, originalName);
return tdLocks.get(streamConfig.id()).readLock();
}

public CompletableFuture<Optional<Exception>> typeAndDedupeTask(final StreamConfig streamConfig, final boolean mustRun) {
return CompletableFuture.supplyAsync(() -> {
final var originalNamespace = streamConfig.id().originalNamespace();
final var originalName = streamConfig.id().originalName();

@@ -180,11 +196,38 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
originalName);
return Optional.empty();
}
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);

final boolean run;
final Lock internalLock = internalTdLocks.get(streamConfig.id());
if (mustRun) {
// If we must run T+D, then wait until we acquire the lock.
internalLock.lock();
run = true;
} else {
// Otherwise, try and get the lock. If another thread already has it, then we should noop here.
run = internalLock.tryLock();
}

if (run) {
LOGGER.info("Waiting for raw table writes to pause for {}.{}", originalNamespace, originalName);
final Lock externalLock = tdLocks.get(streamConfig.id()).writeLock();
externalLock.lock();
try {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
destinationHandler.execute(sql);
} finally {
LOGGER.info("Allowing other threads to proceed for {}.{}", originalNamespace, originalName);
externalLock.unlock();
internalLock.unlock();
}
} else {
LOGGER.info("Another thread is already trying to run typing and deduping for {}.{}. Skipping it here.", originalNamespace,
originalName);
}
return Optional.empty();
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception occurred while typing and deduping stream " + originalName, e);
return Optional.of(e);
}

@@ -196,7 +239,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
LOGGER.info("Typing and deduping all tables");
final Set<CompletableFuture<Optional<Exception>>> typeAndDedupeTasks = new HashSet<>();
parsedCatalog.streams().forEach(streamConfig -> {
typeAndDedupeTasks.add(typeAndDedupeTask(streamConfig));
typeAndDedupeTasks.add(typeAndDedupeTask(streamConfig, true));
});
CompletableFuture.allOf(typeAndDedupeTasks.toArray(CompletableFuture[]::new)).join();
reduceExceptions(typeAndDedupeTasks, "The Following Exceptions were thrown while typing and deduping tables:\n");

@@ -234,7 +277,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
LOGGER.info("Overwriting final table with tmp table for stream {}.{}", streamId.originalNamespace(), streamId.originalName());
try {
destinationHandler.execute(overwriteFinalTable);
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception Occurred while committing final table for stream " + streamId.originalName(), e);
return Optional.of(e);
}

@@ -4,6 +4,10 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class NoopTyperDeduper implements TyperDeduper {

@Override

@@ -12,10 +16,48 @@ public class NoopTyperDeduper implements TyperDeduper {
}

@Override
public void typeAndDedupe(String originalNamespace, String originalName) throws Exception {
public void typeAndDedupe(final String originalNamespace, final String originalName, final boolean mustRun) throws Exception {

}

@Override
public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) {
// Return a fake lock that does nothing.
return new Lock() {

@Override
public void lock() {

}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
return false;
}

@Override
public boolean tryLock(final long time, final TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {

}

@Override
public Condition newCondition() {
return null;
}

};
}

@Override
public void commitFinalTables() throws Exception {

@@ -29,7 +29,7 @@ public class TypeAndDedupeOperationValve extends ConcurrentHashMap<AirbyteStream

private static final Supplier<Long> SYSTEM_NOW = () -> System.currentTimeMillis();

private ConcurrentHashMap<AirbyteStreamNameNamespacePair, Integer> incrementalIndex;
private final ConcurrentHashMap<AirbyteStreamNameNamespacePair, Integer> incrementalIndex;

private final Supplier<Long> nowness;

@@ -42,7 +42,7 @@ public class TypeAndDedupeOperationValve extends ConcurrentHashMap<AirbyteStream
*
* @param nownessSupplier Supplier which will return a long value representing now
*/
public TypeAndDedupeOperationValve(Supplier<Long> nownessSupplier) {
public TypeAndDedupeOperationValve(final Supplier<Long> nownessSupplier) {
super();
incrementalIndex = new ConcurrentHashMap<>();
this.nowness = nownessSupplier;

@@ -66,6 +66,11 @@ public class TypeAndDedupeOperationValve extends ConcurrentHashMap<AirbyteStream
put(key, nowness.get());
}

public void addStreamIfAbsent(final AirbyteStreamNameNamespacePair key) {
putIfAbsent(key, nowness.get());
incrementalIndex.putIfAbsent(key, 0);
}

/**
* Whether we should type and dedupe at this point in time for this particular stream.
*

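Because the valve's clock is injected, its schedule is easy to exercise with a fake nowness supplier. A rough usage sketch, built only from methods visible in this commit (addStreamIfAbsent above, plus readyToTypeAndDedupe and updateTimeAndIncreaseInterval as called from GeneralStagingFunctions below); the six-hour advance is illustrative, not a documented constant:

import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.concurrent.atomic.AtomicLong;

public class ValveSketch {

  public static void main(final String[] args) {
    // A fake clock, so the caller controls "now" instead of System.currentTimeMillis().
    final AtomicLong clock = new AtomicLong(0);
    final TypeAndDedupeOperationValve valve = new TypeAndDedupeOperationValve(clock::get);

    final AirbyteStreamNameNamespacePair stream = new AirbyteStreamNameNamespacePair("users", "public");
    valve.addStreamIfAbsent(stream); // idempotent, so safe to call on every flush

    if (valve.readyToTypeAndDedupe(stream)) {
      // ... run T+D here, then push the next run further into the future:
      valve.updateTimeAndIncreaseInterval(stream);
    }

    clock.addAndGet(6 * 60 * 60 * 1000L); // advance the fake clock by six hours
    System.out.println(valve.readyToTypeAndDedupe(stream));
  }
}
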
@@ -4,12 +4,54 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.concurrent.locks.Lock;

public interface TyperDeduper {

/**
* Create the tables that T+D will write to during the sync. In OVERWRITE mode, these might not be
* the true final tables. Specifically, other than an initial sync (i.e. table does not exist, or is
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
void prepareTables() throws Exception;

void typeAndDedupe(String originalNamespace, String originalName) throws Exception;
/**
* Suggest that we execute typing and deduping for a single stream (i.e. fetch new raw records into
* the final table, etc.).
* <p>
* This method is thread-safe; multiple threads can call it concurrently. If T+D is already running
* for the given stream, this method may choose to do nothing. If a caller wishes to force T+D to
* run (for example, at the end of a sync), they may set {@code mustRun} to true.
* <p>
* This method relies on callers to prevent concurrent modification to the underlying raw tables.
* This is most easily accomplished using {@link #getRawTableInsertLock(String, String)}, if the
* caller guards all raw table writes using {@code getRawTableInsertLock().lock()} and
* {@code getRawTableInsertLock().unlock()}. While {@code typeAndDedupe} is executing, that lock
* will be unavailable. However, callers are free to enforce this in other ways (for example,
* single-threaded callers do not need to use the lock).
*
* @param originalNamespace The stream's namespace, as declared in the configured catalog
* @param originalName The stream's name, as declared in the configured catalog
*/
void typeAndDedupe(String originalNamespace, String originalName, boolean mustRun) throws Exception;

/**
* Get the lock that should be used to synchronize inserts to the raw table for a given stream. This
* lock permits any number of threads to hold the lock, but
* {@link #typeAndDedupe(String, String, boolean)} will not proceed while this lock is held.
* <p>
* This lock provides fairness guarantees, i.e. typeAndDedupe will not starve while waiting for the
* lock (and similarly, raw table writers will not starve if many typeAndDedupe calls are queued).
*/
Lock getRawTableInsertLock(final String originalNamespace, final String originalName);

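Taken together, the two methods above imply a simple caller-side protocol. The sketch below shows it under assumed names — insertRecords is a placeholder for a destination's real raw-table write, and the actual staging-based caller appears in GeneralStagingFunctions further down:

import java.util.concurrent.locks.Lock;

public class TyperDeduperCallerSketch {

  private final TyperDeduper typerDeduper;

  TyperDeduperCallerSketch(final TyperDeduper typerDeduper) {
    this.typerDeduper = typerDeduper;
  }

  void writeBatch(final String namespace, final String name, final Runnable insertRecords) throws Exception {
    // Raw-table writes hold the (shared) read lock, so many batches can land concurrently.
    final Lock rawLock = typerDeduper.getRawTableInsertLock(namespace, name);
    rawLock.lock();
    try {
      insertRecords.run(); // placeholder for the real insert
    } finally {
      rawLock.unlock();
    }
    // Opportunistic T+D: a no-op if another thread is already typing and deduping this stream.
    typerDeduper.typeAndDedupe(namespace, name, false);
  }

  void close(final String namespace, final String name) throws Exception {
    // At end of sync the final tables must be brought up to date, so force the run.
    typerDeduper.typeAndDedupe(namespace, name, true);
    typerDeduper.commitFinalTables();
  }
}
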
/**
* Does any "end of sync" work. For most streams, this is a noop.
* <p>
* For OVERWRITE streams where we're writing to a temp table, this is where we swap the temp table
* into the final table.
*/
void typeAndDedupe() throws Exception;

void commitFinalTables() throws Exception;

@@ -77,11 +77,11 @@ public class DefaultTyperDeduperTest {
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream");
typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream", false);
verify(destinationHandler).execute("UPDATE TABLE overwrite_ns.overwrite_stream");
typerDeduper.typeAndDedupe("append_ns", "append_stream");
typerDeduper.typeAndDedupe("append_ns", "append_stream", false);
verify(destinationHandler).execute("UPDATE TABLE append_ns.append_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream", false);
verify(destinationHandler).execute("UPDATE TABLE dedup_ns.dedup_stream");
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

@@ -107,11 +107,11 @@ public class DefaultTyperDeduperTest {
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream");
typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream", false);
verify(destinationHandler).execute("UPDATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp");
typerDeduper.typeAndDedupe("append_ns", "append_stream");
typerDeduper.typeAndDedupe("append_ns", "append_stream", false);
verify(destinationHandler).execute("UPDATE TABLE append_ns.append_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream", false);
verify(destinationHandler).execute("UPDATE TABLE dedup_ns.dedup_stream");
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

@@ -153,12 +153,12 @@ public class DefaultTyperDeduperTest {
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream");
typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream", false);
// NB: no airbyte_tmp suffix on the non-overwrite streams
verify(destinationHandler).execute("UPDATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp");
typerDeduper.typeAndDedupe("append_ns", "append_stream");
typerDeduper.typeAndDedupe("append_ns", "append_stream", false);
verify(destinationHandler).execute("UPDATE TABLE append_ns.append_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream", false);
verify(destinationHandler).execute("UPDATE TABLE dedup_ns.dedup_stream");
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

@@ -188,7 +188,7 @@ public class DefaultTyperDeduperTest {
@Test
void nonexistentStream() {
assertThrows(IllegalArgumentException.class,
() -> typerDeduper.typeAndDedupe("nonexistent_ns", "nonexistent_stream"));
() -> typerDeduper.typeAndDedupe("nonexistent_ns", "nonexistent_stream", false));
verifyNoInteractions(ignoreStubs(destinationHandler));
}

@@ -199,7 +199,7 @@ public class DefaultTyperDeduperTest {
assertThrows(Exception.class, () -> typerDeduper.prepareTables());
clearInvocations(destinationHandler);

typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream", false);
typerDeduper.commitFinalTables();

verifyNoInteractions(ignoreStubs(destinationHandler));

@@ -10,8 +10,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;

/**

@@ -77,8 +79,21 @@ public class GeneralStagingFunctions {
final TyperDeduper typerDeduper)
throws Exception {
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
tableName, schemaName);
final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(streamNamespace, streamName);
rawTableInsertLock.lock();
try {
stagingOperations.copyIntoTableFromStage(database, stageName, stagingPath, stagedFiles,
tableName, schemaName);
} finally {
rawTableInsertLock.unlock();
}

final AirbyteStreamNameNamespacePair streamId = new AirbyteStreamNameNamespacePair(streamName, streamNamespace);
typerDeduperValve.addStreamIfAbsent(streamId);
if (typerDeduperValve.readyToTypeAndDedupe(streamId)) {
typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName(), false);
typerDeduperValve.updateTimeAndIncreaseInterval(streamId);
}
} catch (final Exception e) {
stagingOperations.cleanUpStage(database, stageName, stagedFiles);
log.info("Cleaning stage path {}", stagingPath);

@@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.6
LABEL io.airbyte.version=2.0.7
LABEL io.airbyte.name=airbyte/destination-bigquery

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.6
dockerImageTag: 2.0.7
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg

@@ -124,7 +124,7 @@ public class BigQueryRecordConsumer extends FailureTrackingAirbyteMessageConsume
uploaderMap.forEach((streamId, uploader) -> {
try {
uploader.close(hasFailed, outputRecordCollector, lastStateMessage);
typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName());
typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName(), true);
} catch (final Exception e) {
exceptionsThrown.add(e);
LOGGER.error("Exception while closing uploader {}", uploader, e);

@@ -14,6 +14,7 @@ import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;

@@ -29,6 +30,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -63,7 +65,7 @@ public class BigQueryStagingConsumerFactory {
recordFormatterCreator,
tmpTableNameTransformer);

CheckedConsumer<AirbyteStreamNameNamespacePair, Exception> typeAndDedupeStreamFunction =
final CheckedConsumer<AirbyteStreamNameNamespacePair, Exception> typeAndDedupeStreamFunction =
incrementalTypingAndDedupingStreamConsumer(typerDeduper);

return new BufferedStreamConsumer(

@@ -72,7 +74,7 @@ public class BigQueryStagingConsumerFactory {
new SerializedBufferingStrategy(
onCreateBuffer,
catalog,
flushBufferFunction(bigQueryGcsOperations, writeConfigs, catalog, typeAndDedupeStreamFunction)),
flushBufferFunction(bigQueryGcsOperations, writeConfigs, catalog, typeAndDedupeStreamFunction, typerDeduper)),
onCloseFunction(bigQueryGcsOperations, writeConfigs, typerDeduper),
catalog,
json -> true,

@@ -81,15 +83,15 @@ public class BigQueryStagingConsumerFactory {

// TODO Commenting this out for now since it slows down syncs
private CheckedConsumer<AirbyteStreamNameNamespacePair, Exception> incrementalTypingAndDedupingStreamConsumer(final TyperDeduper typerDeduper) {
// final TypeAndDedupeOperationValve valve = new TypeAndDedupeOperationValve();
// final TypeAndDedupeOperationValve valve = new TypeAndDedupeOperationValve();
return (streamId) -> {
// if (!valve.containsKey(streamId)) {
// valve.addStream(streamId);
// }
// if (valve.readyToTypeAndDedupe(streamId)) {
// typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName());
// valve.updateTimeAndIncreaseInterval(streamId);
// }
// if (!valve.containsKey(streamId)) {
// valve.addStream(streamId);
// }
// if (valve.readyToTypeAndDedupe(streamId)) {
// typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName(), false);
// valve.updateTimeAndIncreaseInterval(streamId);
// }
};
}

@@ -181,7 +183,7 @@ public class BigQueryStagingConsumerFactory {
final BigQueryStagingOperations bigQueryGcsOperations,
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final CheckedConsumer<AirbyteStreamNameNamespacePair, Exception> incrementalTypeAndDedupeConsumer) {
final CheckedConsumer<AirbyteStreamNameNamespacePair, Exception> incrementalTypeAndDedupeConsumer,
final TyperDeduper typerDeduper) {
return (pair, writer) -> {
LOGGER.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
if (!writeConfigs.containsKey(pair)) {

@@ -202,8 +205,14 @@ public class BigQueryStagingConsumerFactory {
* the sync
*/
writeConfig.addStagedFile(stagedFile);
bigQueryGcsOperations.copyIntoTableFromStage(datasetId, stream, writeConfig.targetTableId(), writeConfig.tableSchema(),
List.of(stagedFile));
final Lock rawTableInsertLock = typerDeduper.getRawTableInsertLock(writeConfig.namespace(), writeConfig.streamName());
rawTableInsertLock.lock();
try {
bigQueryGcsOperations.copyIntoTableFromStage(datasetId, stream, writeConfig.targetTableId(), writeConfig.tableSchema(),
List.of(stagedFile));
} finally {
rawTableInsertLock.unlock();
}
incrementalTypeAndDedupeConsumer.accept(new AirbyteStreamNameNamespacePair(writeConfig.streamName(), writeConfig.namespace()));
} catch (final Exception e) {
LOGGER.error("Failed to flush and commit buffer data into destination's raw table:", e);

@@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=3.1.2
LABEL io.airbyte.version=3.1.3
LABEL io.airbyte.name=airbyte/destination-snowflake

@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.1.2
dockerImageTag: 3.1.3
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg

@@ -24,5 +24,4 @@ dependencies {

integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-scaffold-java-jdbc')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')

}

@@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.0.7 | 2023-08-29 | [\#29878](https://github.com/airbytehq/airbyte/pull/29878) | Internal code changes |
| 2.0.6 | 2023-09-05 | [\#29917](https://github.com/airbytehq/airbyte/pull/29917) | Improve performance by changing metadata error array construction from ARRAY_CONCAT to ARRAY_AGG |
| 2.0.5 | 2023-08-31 | [\#30020](https://github.com/airbytehq/airbyte/pull/30020) | Run typing and deduping tasks in parallel |
| 2.0.4 | 2023-09-05 | [\#30117](https://github.com/airbytehq/airbyte/pull/30117) | Type and Dedupe at sync start and then every 6 hours |

@@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
| :-------------- | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.1.3 | 2023-08-29 | [\#29878](https://github.com/airbytehq/airbyte/pull/29878) | Reenable incremental typing and deduping |
| 3.1.2 | 2023-08-31 | [\#30020](https://github.com/airbytehq/airbyte/pull/30020) | Run typing and deduping tasks in parallel |
| 3.1.1 | 2023-09-05 | [\#30117](https://github.com/airbytehq/airbyte/pull/30117) | Type and Dedupe at sync start and then every 6 hours |
| 3.1.0 | 2023-09-01 | [\#30056](https://github.com/airbytehq/airbyte/pull/30056) | Upcase final table names to allow case-insensitive references |