implement column filtering in the replication workflow (#20369)
* implement column filtering in the replication workflow
* fixes to column selection in replication workflow
* add a basic acceptance test for column selection
* make CI acceptance tests run with new field selection flag enabled
* fix format
* readability improvements around columns selection tests and other small fixes
@@ -10,7 +10,9 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
 import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import datadog.trace.api.Trace;
 import io.airbyte.commons.io.LineGobbler;
 import io.airbyte.config.FailureReason;
@@ -28,6 +30,7 @@ import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteMessage.Type;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.workers.RecordSchemaValidator;
 import io.airbyte.workers.WorkerMetricReporter;
 import io.airbyte.workers.WorkerUtils;
@@ -41,6 +44,7 @@ import io.airbyte.workers.internal.AirbyteSource;
 import io.airbyte.workers.internal.book_keeping.MessageTracker;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,6 +98,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
   private final AtomicBoolean hasFailed;
   private final RecordSchemaValidator recordSchemaValidator;
   private final WorkerMetricReporter metricReporter;
+  private final boolean fieldSelectionEnabled;

   public DefaultReplicationWorker(final String jobId,
                                   final int attempt,
@@ -102,7 +107,8 @@ public class DefaultReplicationWorker implements ReplicationWorker {
                                   final AirbyteDestination destination,
                                   final MessageTracker messageTracker,
                                   final RecordSchemaValidator recordSchemaValidator,
-                                  final WorkerMetricReporter metricReporter) {
+                                  final WorkerMetricReporter metricReporter,
+                                  final boolean fieldSelectionEnabled) {
     this.jobId = jobId;
     this.attempt = attempt;
     this.source = source;
@@ -112,6 +118,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
     this.executors = Executors.newFixedThreadPool(2);
     this.recordSchemaValidator = recordSchemaValidator;
     this.metricReporter = metricReporter;
+    this.fieldSelectionEnabled = fieldSelectionEnabled;

     this.cancelled = new AtomicBoolean(false);
     this.hasFailed = new AtomicBoolean(false);
@@ -198,8 +205,18 @@ public class DefaultReplicationWorker implements ReplicationWorker {
     });

     final CompletableFuture<?> readSrcAndWriteDstThread = CompletableFuture.runAsync(
-        readFromSrcAndWriteToDstRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter,
-            timeTracker),
+        readFromSrcAndWriteToDstRunnable(
+            source,
+            destination,
+            sourceConfig.getCatalog(),
+            cancelled,
+            mapper,
+            messageTracker,
+            mdc,
+            recordSchemaValidator,
+            metricReporter,
+            timeTracker,
+            fieldSelectionEnabled),
         executors)
         .whenComplete((msg, ex) -> {
           if (ex != null) {
@@ -279,18 +296,24 @@ public class DefaultReplicationWorker implements ReplicationWorker {
   @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
   private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource source,
                                                            final AirbyteDestination destination,
+                                                           final ConfiguredAirbyteCatalog catalog,
                                                            final AtomicBoolean cancelled,
                                                            final AirbyteMapper mapper,
                                                            final MessageTracker messageTracker,
                                                            final Map<String, String> mdc,
                                                            final RecordSchemaValidator recordSchemaValidator,
                                                            final WorkerMetricReporter metricReporter,
-                                                           final ThreadedTimeTracker timeHolder) {
+                                                           final ThreadedTimeTracker timeHolder,
+                                                           final boolean fieldSelectionEnabled) {
     return () -> {
       MDC.setContextMap(mdc);
       LOGGER.info("Replication thread started.");
       Long recordsRead = 0L;
       final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
+      final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields = new HashMap<>();
+      if (fieldSelectionEnabled) {
+        populatedStreamToSelectedFields(catalog, streamToSelectedFields);
+      }
       try {
         while (!cancelled.get() && !source.isFinished()) {
           final Optional<AirbyteMessage> messageOptional;
@@ -302,6 +325,9 @@ public class DefaultReplicationWorker implements ReplicationWorker {

           if (messageOptional.isPresent()) {
             final AirbyteMessage airbyteMessage = messageOptional.get();
+            if (fieldSelectionEnabled) {
+              filterSelectedFields(streamToSelectedFields, airbyteMessage);
+            }
             validateSchema(recordSchemaValidator, validationErrors, airbyteMessage);
             final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

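Worth noting in the hunk above: when the flag is on, filtering runs before schema validation and before the mapper, so the validator and everything downstream only ever see the selected fields.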
@@ -549,6 +575,47 @@ public class DefaultReplicationWorker implements ReplicationWorker {
     }
   }

+  /**
+   * Generates a map from stream -> the explicit list of fields included for that stream, according to
+   * the configured catalog. Since the configured catalog only includes the selected fields, this lets
+   * us filter records to only the fields explicitly requested.
+   *
+   * @param catalog
+   * @param streamToSelectedFields
+   */
+  private static void populatedStreamToSelectedFields(final ConfiguredAirbyteCatalog catalog,
+                                                      final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields) {
+    for (final var s : catalog.getStreams()) {
+      final List<String> selectedFields = new ArrayList<>();
+      final JsonNode propertiesNode = s.getStream().getJsonSchema().findPath("properties");
+      if (propertiesNode.isObject()) {
+        propertiesNode.fieldNames().forEachRemaining((fieldName) -> selectedFields.add(fieldName));
+      } else {
+        throw new RuntimeException("No properties node in stream schema");
+      }
+      streamToSelectedFields.put(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(s), selectedFields);
+    }
+  }
+
+  private static void filterSelectedFields(final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields,
+                                           final AirbyteMessage airbyteMessage) {
+    final AirbyteRecordMessage record = airbyteMessage.getRecord();
+
+    if (record == null) {
+      // This isn't a record message, so we don't need to do any filtering.
+      return;
+    }
+
+    final AirbyteStreamNameNamespacePair messageStream = AirbyteStreamNameNamespacePair.fromRecordMessage(record);
+    final List<String> selectedFields = streamToSelectedFields.getOrDefault(messageStream, Collections.emptyList());
+    final JsonNode data = record.getData();
+    if (data.isObject()) {
+      ((ObjectNode) data).retain(selectedFields);
+    } else {
+      throw new RuntimeException(String.format("Unexpected data in record: %s", data.toString()));
+    }
+  }
+
   @Trace(operationName = WORKER_OPERATION_NAME)
   @Override
   public void cancel() {
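Taken together, the two new helpers reduce to a pair of Jackson operations: read the property names out of the stream's configured JSON schema, then `retain` only those names on each record's data node. Here is a minimal, self-contained sketch of that mechanic (illustrative only, not part of this commit; the schema and record contents are made up):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the field-selection mechanic used above.
public class FieldSelectionSketch {

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // A configured catalog only keeps the selected fields in the stream schema;
    // here the hypothetical stream selects just "id".
    JsonNode schema = mapper.readTree(
        "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"number\"}}}");

    // Same traversal as populatedStreamToSelectedFields: collect property names.
    List<String> selectedFields = new ArrayList<>();
    schema.findPath("properties").fieldNames().forEachRemaining(selectedFields::add);

    // A record carrying an extra field the catalog no longer mentions.
    JsonNode data = mapper.readTree("{\"id\":6,\"name\":\"mike\"}");

    // Same call as filterSelectedFields: retain drops every non-selected field.
    ((ObjectNode) data).retain(selectedFields);
    System.out.println(data); // prints {"id":6}
  }
}
```

Because `retain` mutates the node in place, the filtered record is what the schema validator, the mapper, and ultimately the destination all receive.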
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableMap;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
@@ -61,6 +62,7 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -153,7 +155,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     worker.run(syncInput, jobRoot);

@@ -181,7 +183,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     worker.run(syncInput, jobRoot);

@@ -211,7 +213,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);
     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
     assertTrue(output.getFailures().stream().anyMatch(f -> f.getFailureOrigin().equals(FailureOrigin.SOURCE)));
@@ -231,7 +233,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
@@ -253,7 +255,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
@@ -274,7 +276,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertTrue(output.getFailures().stream()
@@ -296,7 +298,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
@@ -322,7 +324,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     worker.run(syncInput, jobRoot);

@@ -334,6 +336,68 @@ class DefaultReplicationWorkerTest {
     verify(destination, never()).accept(TRACE_MESSAGE);
   }

+  @Test
+  void testOnlySelectedFieldsDeliveredToDestinationWithFieldSelectionEnabled() throws Exception {
+    // Generate a record with an extra field.
+    final AirbyteMessage recordWithExtraFields = Jsons.clone(RECORD_MESSAGE1);
+    ((ObjectNode) recordWithExtraFields.getRecord().getData()).put("AnUnexpectedField", "SomeValue");
+    when(mapper.mapMessage(recordWithExtraFields)).thenReturn(recordWithExtraFields);
+    when(source.attemptRead()).thenReturn(Optional.of(recordWithExtraFields));
+    when(source.isFinished()).thenReturn(false, true);
+    // Use a real schema validator to make sure validation doesn't affect this.
+    final String streamName = sourceConfig.getCatalog().getStreams().get(0).getStream().getName();
+    final String streamNamespace = sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace();
+    recordSchemaValidator = new RecordSchemaValidator(Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
+        sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
+    final ReplicationWorker worker = new DefaultReplicationWorker(
+        JOB_ID,
+        JOB_ATTEMPT,
+        source,
+        mapper,
+        destination,
+        messageTracker,
+        recordSchemaValidator,
+        workerMetricReporter,
+        true);
+
+    worker.run(syncInput, jobRoot);
+
+    // Despite reading recordWithExtraFields from the source, we write the original RECORD_MESSAGE1 to
+    // the destination because the new field has been filtered out.
+    verify(destination).accept(RECORD_MESSAGE1);
+  }
+
+  @Test
+  void testAllFieldsDeliveredWithFieldSelectionDisabled() throws Exception {
+    // Generate a record with an extra field.
+    final AirbyteMessage recordWithExtraFields = Jsons.clone(RECORD_MESSAGE1);
+    ((ObjectNode) recordWithExtraFields.getRecord().getData()).put("AnUnexpectedField", "SomeValue");
+    when(mapper.mapMessage(recordWithExtraFields)).thenReturn(recordWithExtraFields);
+    when(source.attemptRead()).thenReturn(Optional.of(recordWithExtraFields));
+    when(source.isFinished()).thenReturn(false, true);
+    // Use a real schema validator to make sure validation doesn't affect this.
+    final String streamName = sourceConfig.getCatalog().getStreams().get(0).getStream().getName();
+    final String streamNamespace = sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace();
+    recordSchemaValidator = new RecordSchemaValidator(Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
+        sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
+    final ReplicationWorker worker = new DefaultReplicationWorker(
+        JOB_ID,
+        JOB_ATTEMPT,
+        source,
+        mapper,
+        destination,
+        messageTracker,
+        recordSchemaValidator,
+        workerMetricReporter,
+        false);
+
+    worker.run(syncInput, jobRoot);
+
+    // Despite the field not being in the catalog, we write the extra field anyway because field
+    // selection is disabled.
+    verify(destination).accept(recordWithExtraFields);
+  }
+
   @Test
   void testDestinationNonZeroExitValue() throws Exception {
     when(destination.getExitValue()).thenReturn(1);
@@ -346,7 +410,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
@@ -367,7 +431,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
@@ -389,7 +453,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput output = worker.run(syncInput, jobRoot);
     assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus());
@@ -412,7 +476,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     worker.run(syncInput, jobRoot);

@@ -453,7 +517,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final Thread workerThread = new Thread(() -> {
       try {
@@ -501,7 +565,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput actual = worker.run(syncInput, jobRoot);
     final ReplicationOutput replicationOutput = new ReplicationOutput()
@@ -568,7 +632,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput actual = worker.run(syncInput, jobRoot);
     assertNotNull(actual);
@@ -587,7 +651,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput actual = worker.run(syncInput, jobRoot);

@@ -621,7 +685,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput actual = worker.run(syncInput, jobRoot);
     final SyncStats expectedTotalStats = new SyncStats()
@@ -667,7 +731,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);

     final ReplicationOutput actual = worker.run(syncInputWithoutState, jobRoot);

@@ -687,7 +751,7 @@ class DefaultReplicationWorkerTest {
         destination,
         messageTracker,
         recordSchemaValidator,
-        workerMetricReporter);
+        workerMetricReporter, false);
     assertThrows(WorkerException.class, () -> worker.run(syncInput, jobRoot));
   }

@@ -14,6 +14,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
   public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
   public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES";
   public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
+  public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";

   @Override
   public boolean autoDisablesFailingConnections() {
@@ -47,6 +48,11 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
     return getEnvOrDefault(NEED_STATE_VALIDATION, true, Boolean::parseBoolean);
   }

+  @Override
+  public boolean applyFieldSelection() {
+    return getEnvOrDefault(APPLY_FIELD_SELECTION, false, Boolean::parseBoolean);
+  }
+
   // TODO: refactor in order to use the same method than the ones in EnvConfigs.java
   public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
     final String value = System.getenv(key);
@@ -22,4 +22,6 @@ public interface FeatureFlags {

   boolean needStateValidation();

+  boolean applyFieldSelection();
+
 }
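The new flag follows the file's read-env-then-parse-or-default pattern. For context, a small illustrative sketch of how a `getEnvOrDefault`-style helper behaves — only the signature and the `System.getenv` call appear in the diff above, so the rest of the body here is an assumption:

```java
import java.util.function.Function;

// Illustrative sketch (not the committed code) of the getEnvOrDefault pattern:
// read an environment variable, parse it if present, otherwise fall back.
public class EnvFlagSketch {

  public static <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
    final String value = System.getenv(key);
    if (value != null && !value.isEmpty()) {
      return parser.apply(value);
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    // With APPLY_FIELD_SELECTION unset, field selection stays off by default.
    boolean applyFieldSelection = getEnvOrDefault("APPLY_FIELD_SELECTION", false, Boolean::parseBoolean);
    System.out.println("field selection enabled: " + applyFieldSelection);
  }
}
```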
@@ -148,7 +148,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator<StandardSyncI
             new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
         new AirbyteMessageTracker(),
         new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
-        metricReporter);
+        metricReporter, featureFlags.applyFieldSelection());

     log.info("Running replication worker...");
     final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
@@ -459,6 +459,29 @@ public class AirbyteAcceptanceTestHarness {
     }
   }

+  /**
+   * Assert that the normalized destination matches the input records, only expecting a single id
+   * column.
+   *
+   * @param sourceRecords
+   * @throws Exception
+   */
+  public void assertNormalizedDestinationContainsIdColumn(final List<JsonNode> sourceRecords) throws Exception {
+    final Database destination = getDestinationDatabase();
+    final String finalDestinationTable = String.format("%spublic.%s%s", OUTPUT_NAMESPACE_PREFIX, OUTPUT_STREAM_PREFIX, STREAM_NAME.replace(".", "_"));
+    final List<JsonNode> destinationRecords = retrieveSourceRecords(destination, finalDestinationTable);
+
+    assertEquals(sourceRecords.size(), destinationRecords.size(),
+        String.format("destination contains: %s record. source contains: %s", sourceRecords.size(), destinationRecords.size()));
+
+    for (final JsonNode sourceStreamRecord : sourceRecords) {
+      assertTrue(
+          destinationRecords.stream()
+              .anyMatch(r -> r.get(COLUMN_ID).asInt() == sourceStreamRecord.get(COLUMN_ID).asInt()),
+          String.format("destination does not contain record:\n %s \n destination contains:\n %s\n", sourceStreamRecord, destinationRecords));
+    }
+  }
+
   public ConnectionRead createConnection(final String name,
                                          final UUID sourceId,
                                          final UUID destinationId,
@@ -496,6 +519,13 @@ public class AirbyteAcceptanceTestHarness {
         .scheduleData(newScheduleData));
   }

+  public void updateConnectionCatalog(final UUID connectionId, final AirbyteCatalog catalog) throws ApiException {
+    apiClient.getConnectionApi().updateConnection(
+        new ConnectionUpdate()
+            .connectionId(connectionId)
+            .syncCatalog(catalog));
+  }
+
   public DestinationRead createPostgresDestination(final boolean isLegacy) throws ApiException {
     return createDestination(
         "AccTestDestination-" + UUID.randomUUID(),
@@ -68,6 +68,7 @@ import io.airbyte.api.client.model.generated.OperatorType;
 import io.airbyte.api.client.model.generated.OperatorWebhook;
 import io.airbyte.api.client.model.generated.OperatorWebhook.WebhookTypeEnum;
 import io.airbyte.api.client.model.generated.OperatorWebhookDbtCloud;
+import io.airbyte.api.client.model.generated.SelectedFieldInfo;
 import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
 import io.airbyte.api.client.model.generated.SourceDefinitionIdWithWorkspaceId;
 import io.airbyte.api.client.model.generated.SourceDefinitionRead;
@@ -99,6 +100,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.jooq.DSLContext;
 import org.jooq.impl.DSL;
 import org.jooq.impl.SQLDataType;
@@ -1417,6 +1419,60 @@ class BasicAcceptanceTests {

   }

+  @Test
+  @Order(19)
+  void testIncrementalDedupeSyncRemoveOneColumn() throws Exception {
+    // !!! NOTE !!! this test relies on a feature flag that currently defaults to false. If you're
+    // running these tests locally against an external deployment and this test is failing, make sure
+    // the flag is enabled.
+    // Specifically:
+    // APPLY_FIELD_SELECTION=true
+    final UUID sourceId = testHarness.createPostgresSource().getSourceId();
+    final UUID destinationId = testHarness.createPostgresDestination().getDestinationId();
+    final UUID operationId = testHarness.createOperation().getOperationId();
+    final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
+    final SyncMode syncMode = SyncMode.INCREMENTAL;
+    final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
+    catalog.getStreams().forEach(s -> s.getConfig()
+        .syncMode(syncMode)
+        .cursorField(List.of(COLUMN_ID))
+        .destinationSyncMode(destinationSyncMode)
+        .primaryKey(List.of(List.of(COLUMN_ID))));
+    final UUID connectionId =
+        testHarness.createConnection(TEST_CONNECTION, sourceId, destinationId, List.of(operationId), catalog, ConnectionScheduleType.MANUAL, null)
+            .getConnectionId();
+
+    // sync from start
+    final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+        .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+
+    testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE);
+
+    // Update the catalog, so we only select the id column.
+    catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("id"));
+    testHarness.updateConnectionCatalog(connectionId, catalog);
+
+    // add new records and run again.
+    final Database source = testHarness.getSourceDatabase();
+    final List<JsonNode> expectedRawRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
+    source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'mike')"));
+    source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')"));
+    // The expected new raw records should only have the ID column.
+    expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).build()));
+    expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).build()));
+    final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
+        .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
+
+    // For the normalized records, they should all only have the ID column.
+    final List<JsonNode> expectedNormalizedRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME).stream()
+        .map((record) -> ((ObjectNode) record).retain(COLUMN_ID)).collect(Collectors.toList());
+
+    testHarness.assertRawDestinationContains(expectedRawRecords, new SchemaTableNamePair(PUBLIC, STREAM_NAME));
+    testHarness.assertNormalizedDestinationContainsIdColumn(expectedNormalizedRecords);
+  }
+
   private void assertStreamStateContainsStream(final UUID connectionId, final List<StreamDescriptor> expectedStreamDescriptors) throws ApiException {
     final ConnectionState state = apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
     final List<StreamDescriptor> streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList();
@@ -304,7 +304,7 @@ public class ReplicationActivityImpl implements ReplicationActivity {
             new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
         new AirbyteMessageTracker(),
         new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
-        metricReporter);
+        metricReporter, false);
     };
   }

@@ -21,7 +21,9 @@ check_success() {
 echo "Starting app..."

 # Detach so we can run subsequent commands
-VERSION=dev TRACKING_STRATEGY=logging USE_STREAM_CAPABLE_STATE=true BASIC_AUTH_USERNAME="" BASIC_AUTH_PASSWORD="" docker-compose -f docker-compose.yaml -f docker-compose.acceptance-test.yaml up -d
+# NOTE: this passes APPLY_FIELD_SELECTION=true, which enables a feature -- field selection -- which is currently disabled by default.
+# We want to run our CI tests against the new feature while we prepare to release it.
+VERSION=dev TRACKING_STRATEGY=logging USE_STREAM_CAPABLE_STATE=true BASIC_AUTH_USERNAME="" BASIC_AUTH_PASSWORD="" APPLY_FIELD_SELECTION=true docker-compose -f docker-compose.yaml -f docker-compose.acceptance-test.yaml up -d

 # Sometimes source/dest containers using airbyte volumes survive shutdown, which need to be killed in order to shut down properly.
 shutdown_cmd="docker-compose down -v || docker kill \$(docker ps -a -f volume=airbyte_workspace -f volume=airbyte_data -f volume=airbyte_db -q) && docker-compose down -v"