1
0
mirror of synced 2026-01-04 18:04:31 -05:00

Simple default replication worker refactor (#19002)

Logic in this class is going to have to change as part of two big upcoming projects:
- column selection
- progress bars

To prepare for this, I've gone ahead and refactored the run method for readability. This is a monster function. The current function is too long and contains several operational abstractions, increasing unnecessary complexity. This is the core of what we do, so it's important to ensure this code is extremely understandable.

Ultimately we want to probably want to break the run method up into two or more separate classes - one that deals with replication and one that deals with outputs - for better testing, readability and isolation. This sets the stage for that.

I have intentionally NOT removed or touched any logic, nor have I put thought into consolidating the function signatures to preserve as much of the pre-existing logic and keep the changeset small and reviewable.

This changeset only renames and moves code around.
This commit is contained in:
Davin Chia
2022-11-07 16:43:20 -08:00
committed by GitHub
parent 2b52b53ece
commit 594cf29bbb

View File

@@ -9,6 +9,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
@@ -139,7 +140,6 @@ public class DefaultReplicationWorker implements ReplicationWorker {
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));
final ThreadedTimeTracker timeTracker = new ThreadedTimeTracker();
final long startTime = System.currentTimeMillis();
timeTracker.trackReplicationStartTime();
final AtomicReference<FailureReason> replicationRunnableFailureRef = new AtomicReference<>();
@@ -152,185 +152,11 @@ public class DefaultReplicationWorker implements ReplicationWorker {
s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
final WorkerSourceConfig sourceConfig = WorkerUtils.syncToWorkerSourceConfig(syncInput);
final Map<String, String> mdc = MDC.getCopyOfContextMap();
ApmTraceUtils.addTagsToTrace(generateTraceTags(destinationConfig, jobRoot));
// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
destination.start(destinationConfig, jobRoot);
timeTracker.trackSourceReadStartTime();
source.start(sourceConfig, jobRoot);
timeTracker.trackDestinationWriteStartTime();
// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof DestinationException) {
destinationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
} else {
destinationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
}
}
});
final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
replicationRunnableFailureRef.set(FailureHelper.sourceFailure(ex, Long.valueOf(jobId), attempt));
} else if (ex.getCause() instanceof DestinationException) {
replicationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
} else {
replicationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
}
}
});
LOGGER.info("Waiting for source and destination threads to complete.");
// CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
// exception. So in order to handle exceptions from a future immediately without needing to wait for
// the other future to finish, we first call CompletableFuture#anyOf.
CompletableFuture.anyOf(replicationThreadFuture, destinationOutputThreadFuture).get();
LOGGER.info("One of source or destination thread complete. Waiting on the other.");
CompletableFuture.allOf(replicationThreadFuture, destinationOutputThreadFuture).get();
LOGGER.info("Source and destination threads complete.");
} catch (final Exception e) {
hasFailed.set(true);
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Sync worker failed.", e);
} finally {
executors.shutdownNow();
}
final ReplicationStatus outputStatus;
// First check if the process was cancelled. Cancellation takes precedence over failures.
if (cancelled.get()) {
outputStatus = ReplicationStatus.CANCELLED;
}
// if the process was not cancelled but still failed, then it's an actual failure
else if (hasFailed.get()) {
outputStatus = ReplicationStatus.FAILED;
} else {
outputStatus = ReplicationStatus.COMPLETED;
}
replicate(jobRoot, destinationConfig, timeTracker, replicationRunnableFailureRef, destinationRunnableFailureRef, sourceConfig);
timeTracker.trackReplicationEndTime();
final SyncStats totalSyncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
.withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
.withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withReplicationStartTime(timeTracker.getReplicationStartTime())
.withReplicationEndTime(timeTracker.getReplicationEndTime())
.withSourceReadStartTime(timeTracker.getSourceReadStartTime())
.withSourceReadEndTime(timeTracker.getSourceReadEndTime())
.withDestinationWriteStartTime(timeTracker.getDestinationWriteStartTime())
.withDestinationWriteEndTime(timeTracker.getDestinationWriteEndTime());
if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
} else if (messageTracker.getTotalRecordsCommitted().isPresent()) {
totalSyncStats.setRecordsCommitted(messageTracker.getTotalRecordsCommitted().get());
} else {
LOGGER.warn("Could not reliably determine committed record counts, committed record stats will be set to null");
totalSyncStats.setRecordsCommitted(null);
}
// assume every stream with stats is in streamToEmittedRecords map
final List<StreamSyncStats> streamSyncStats = messageTracker.getStreamToEmittedRecords().keySet().stream().map(stream -> {
final SyncStats syncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
.withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
.withSourceStateMessagesEmitted(null)
.withDestinationStateMessagesEmitted(null);
if (outputStatus == ReplicationStatus.COMPLETED) {
syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));
} else if (messageTracker.getStreamToCommittedRecords().isPresent()) {
syncStats.setRecordsCommitted(messageTracker.getStreamToCommittedRecords().get().get(stream));
} else {
syncStats.setRecordsCommitted(null);
}
return new StreamSyncStats()
.withStreamName(stream)
.withStats(syncStats);
}).collect(Collectors.toList());
final ReplicationAttemptSummary summary = new ReplicationAttemptSummary()
.withStatus(outputStatus)
.withRecordsSynced(messageTracker.getTotalRecordsEmitted()) // TODO (parker) remove in favor of totalRecordsEmitted
.withBytesSynced(messageTracker.getTotalBytesEmitted()) // TODO (parker) remove in favor of totalBytesEmitted
.withTotalStats(totalSyncStats)
.withStreamStats(streamSyncStats)
.withStartTime(startTime)
.withEndTime(System.currentTimeMillis());
final ReplicationOutput output = new ReplicationOutput()
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());
// only .setFailures() if a failure occurred or if there is an AirbyteErrorTraceMessage
final FailureReason sourceFailure = replicationRunnableFailureRef.get();
final FailureReason destinationFailure = destinationRunnableFailureRef.get();
final FailureReason traceMessageFailure = messageTracker.errorTraceMessageFailure(Long.valueOf(jobId), attempt);
final List<FailureReason> failures = new ArrayList<>();
if (traceMessageFailure != null) {
failures.add(traceMessageFailure);
}
if (sourceFailure != null) {
failures.add(sourceFailure);
}
if (destinationFailure != null) {
failures.add(destinationFailure);
}
if (!failures.isEmpty()) {
output.setFailures(failures);
}
if (messageTracker.getSourceOutputState().isPresent()) {
LOGGER.info("Source output at least one state message");
} else {
LOGGER.info("Source did not output any state messages");
}
if (messageTracker.getDestinationOutputState().isPresent()) {
LOGGER.info("State capture: Updated state to: {}", messageTracker.getDestinationOutputState());
final State state = messageTracker.getDestinationOutputState().get();
output.withState(state);
} else if (syncInput.getState() != null) {
LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
output.withState(syncInput.getState());
} else {
LOGGER.warn("State capture: No state retained.");
}
if (messageTracker.getUnreliableStateTimingMetrics()) {
metricReporter.trackStateMetricTrackerError();
}
final ObjectMapper mapper = new ObjectMapper();
LOGGER.info("sync summary: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(summary));
LOGGER.info("failures: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(failures));
LineGobbler.endSection("REPLICATION");
return output;
return getReplicationOutput(syncInput, destinationConfig, replicationRunnableFailureRef, destinationRunnableFailureRef, timeTracker);
} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException("Sync failed", e);
@@ -338,16 +164,125 @@ public class DefaultReplicationWorker implements ReplicationWorker {
}
private void replicate(Path jobRoot,
WorkerDestinationConfig destinationConfig,
ThreadedTimeTracker timeTracker,
AtomicReference<FailureReason> replicationRunnableFailureRef,
AtomicReference<FailureReason> destinationRunnableFailureRef,
WorkerSourceConfig sourceConfig) {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
destination.start(destinationConfig, jobRoot);
timeTracker.trackSourceReadStartTime();
source.start(sourceConfig, jobRoot);
timeTracker.trackDestinationWriteStartTime();
// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
// thrown
final CompletableFuture<?> readFromDstThread = CompletableFuture.runAsync(
readFromDstRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof DestinationException) {
destinationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
} else {
destinationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
}
}
});
final CompletableFuture<?> readSrcAndWriteDstThread = CompletableFuture.runAsync(
readFromSrcAndWriteToDstRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter,
timeTracker),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
replicationRunnableFailureRef.set(FailureHelper.sourceFailure(ex, Long.valueOf(jobId), attempt));
} else if (ex.getCause() instanceof DestinationException) {
replicationRunnableFailureRef.set(FailureHelper.destinationFailure(ex, Long.valueOf(jobId), attempt));
} else {
replicationRunnableFailureRef.set(FailureHelper.replicationFailure(ex, Long.valueOf(jobId), attempt));
}
}
});
LOGGER.info("Waiting for source and destination threads to complete.");
// CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
// exception. So in order to handle exceptions from a future immediately without needing to wait for
// the other future to finish, we first call CompletableFuture#anyOf.
CompletableFuture.anyOf(readSrcAndWriteDstThread, readFromDstThread).get();
LOGGER.info("One of source or destination thread complete. Waiting on the other.");
CompletableFuture.allOf(readSrcAndWriteDstThread, readFromDstThread).get();
LOGGER.info("Source and destination threads complete.");
} catch (final Exception e) {
hasFailed.set(true);
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Sync worker failed.", e);
} finally {
executors.shutdownNow();
}
}
@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteDestination destination,
final AtomicBoolean cancelled,
final AirbyteMapper mapper,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter,
final ThreadedTimeTracker timeHolder) {
private static Runnable readFromDstRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final ThreadedTimeTracker timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
try {
while (!cancelled.get() && !destination.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
try {
messageOptional = destination.attemptRead();
} catch (final Exception e) {
throw new DestinationException("Destination process read attempt failed", e);
}
if (messageOptional.isPresent()) {
LOGGER.info("State in DefaultReplicationWorker from destination: {}", messageOptional.get());
messageTracker.acceptFromDestination(messageOptional.get());
}
}
timeHolder.trackDestinationWriteEndTime();
if (!cancelled.get() && destination.getExitValue() != 0) {
throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
}
} catch (final Exception e) {
if (!cancelled.get()) {
// Although this thread is closed first, it races with the destination's closure and can attempt one
// final read after the destination is closed before it's terminated.
// This read will fail and throw an exception. Because of this, throw exceptions only if the worker
// was not cancelled.
if (e instanceof DestinationException) {
// Surface Destination exceptions directly so that they can be classified properly by the worker
throw e;
} else {
throw new RuntimeException(e);
}
}
}
};
}
@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource source,
final AirbyteDestination destination,
final AtomicBoolean cancelled,
final AirbyteMapper mapper,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter,
final ThreadedTimeTracker timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
@@ -427,6 +362,159 @@ public class DefaultReplicationWorker implements ReplicationWorker {
};
}
private ReplicationOutput getReplicationOutput(StandardSyncInput syncInput,
WorkerDestinationConfig destinationConfig,
AtomicReference<FailureReason> replicationRunnableFailureRef,
AtomicReference<FailureReason> destinationRunnableFailureRef,
ThreadedTimeTracker timeTracker)
throws JsonProcessingException {
final ReplicationStatus outputStatus;
// First check if the process was cancelled. Cancellation takes precedence over failures.
if (cancelled.get()) {
outputStatus = ReplicationStatus.CANCELLED;
}
// if the process was not cancelled but still failed, then it's an actual failure
else if (hasFailed.get()) {
outputStatus = ReplicationStatus.FAILED;
} else {
outputStatus = ReplicationStatus.COMPLETED;
}
final SyncStats totalSyncStats = getTotalStats(timeTracker, outputStatus);
final List<StreamSyncStats> streamSyncStats = getPerStreamStats(outputStatus);
final ReplicationAttemptSummary summary = new ReplicationAttemptSummary()
.withStatus(outputStatus)
.withRecordsSynced(messageTracker.getTotalRecordsEmitted()) // TODO (parker) remove in favor of totalRecordsEmitted
.withBytesSynced(messageTracker.getTotalBytesEmitted()) // TODO (parker) remove in favor of totalBytesEmitted
.withTotalStats(totalSyncStats)
.withStreamStats(streamSyncStats)
.withStartTime(timeTracker.getReplicationStartTime())
.withEndTime(System.currentTimeMillis());
final ReplicationOutput output = new ReplicationOutput()
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());
final List<FailureReason> failures = getFailureReasons(replicationRunnableFailureRef, destinationRunnableFailureRef,
output);
prepStateForLaterSaving(syncInput, output);
final ObjectMapper mapper = new ObjectMapper();
LOGGER.info("sync summary: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(summary));
LOGGER.info("failures: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(failures));
LineGobbler.endSection("REPLICATION");
return output;
}
private SyncStats getTotalStats(ThreadedTimeTracker timeTracker, ReplicationStatus outputStatus) {
final SyncStats totalSyncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
.withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
.withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
.withReplicationStartTime(timeTracker.getReplicationStartTime())
.withReplicationEndTime(timeTracker.getReplicationEndTime())
.withSourceReadStartTime(timeTracker.getSourceReadStartTime())
.withSourceReadEndTime(timeTracker.getSourceReadEndTime())
.withDestinationWriteStartTime(timeTracker.getDestinationWriteStartTime())
.withDestinationWriteEndTime(timeTracker.getDestinationWriteEndTime());
if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
} else if (messageTracker.getTotalRecordsCommitted().isPresent()) {
totalSyncStats.setRecordsCommitted(messageTracker.getTotalRecordsCommitted().get());
} else {
LOGGER.warn("Could not reliably determine committed record counts, committed record stats will be set to null");
totalSyncStats.setRecordsCommitted(null);
}
return totalSyncStats;
}
private List<StreamSyncStats> getPerStreamStats(ReplicationStatus outputStatus) {
// assume every stream with stats is in streamToEmittedRecords map
return messageTracker.getStreamToEmittedRecords().keySet().stream().map(stream -> {
final SyncStats syncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
.withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
.withSourceStateMessagesEmitted(null)
.withDestinationStateMessagesEmitted(null);
if (outputStatus == ReplicationStatus.COMPLETED) {
syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));
} else if (messageTracker.getStreamToCommittedRecords().isPresent()) {
syncStats.setRecordsCommitted(messageTracker.getStreamToCommittedRecords().get().get(stream));
} else {
syncStats.setRecordsCommitted(null);
}
return new StreamSyncStats()
.withStreamName(stream)
.withStats(syncStats);
}).collect(Collectors.toList());
}
/**
* Extracts state out to the {@link ReplicationOutput} so it can be later saved in the
* PersistStateActivity - State is NOT SAVED here.
*
* @param syncInput
* @param output
*/
private void prepStateForLaterSaving(StandardSyncInput syncInput, ReplicationOutput output) {
if (messageTracker.getSourceOutputState().isPresent()) {
LOGGER.info("Source output at least one state message");
} else {
LOGGER.info("Source did not output any state messages");
}
if (messageTracker.getDestinationOutputState().isPresent()) {
LOGGER.info("State capture: Updated state to: {}", messageTracker.getDestinationOutputState());
final State state = messageTracker.getDestinationOutputState().get();
output.withState(state);
} else if (syncInput.getState() != null) {
LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
output.withState(syncInput.getState());
} else {
LOGGER.warn("State capture: No state retained.");
}
if (messageTracker.getUnreliableStateTimingMetrics()) {
metricReporter.trackStateMetricTrackerError();
}
}
private List<FailureReason> getFailureReasons(AtomicReference<FailureReason> replicationRunnableFailureRef,
AtomicReference<FailureReason> destinationRunnableFailureRef,
ReplicationOutput output) {
// only .setFailures() if a failure occurred or if there is an AirbyteErrorTraceMessage
final FailureReason sourceFailure = replicationRunnableFailureRef.get();
final FailureReason destinationFailure = destinationRunnableFailureRef.get();
final FailureReason traceMessageFailure = messageTracker.errorTraceMessageFailure(Long.valueOf(jobId), attempt);
final List<FailureReason> failures = new ArrayList<>();
if (traceMessageFailure != null) {
failures.add(traceMessageFailure);
}
if (sourceFailure != null) {
failures.add(sourceFailure);
}
if (destinationFailure != null) {
failures.add(destinationFailure);
}
if (!failures.isEmpty()) {
output.setFailures(failures);
}
return failures;
}
private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
final Map<String, ImmutablePair<Set<String>, Integer>> validationErrors,
final AirbyteMessage message) {
@@ -457,50 +545,6 @@ public class DefaultReplicationWorker implements ReplicationWorker {
}
}
@SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final ThreadedTimeTracker timeHolder) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Destination output thread started.");
try {
while (!cancelled.get() && !destination.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
try {
messageOptional = destination.attemptRead();
} catch (final Exception e) {
throw new DestinationException("Destination process read attempt failed", e);
}
if (messageOptional.isPresent()) {
LOGGER.info("State in DefaultReplicationWorker from destination: {}", messageOptional.get());
messageTracker.acceptFromDestination(messageOptional.get());
}
}
timeHolder.trackDestinationWriteEndTime();
if (!cancelled.get() && destination.getExitValue() != 0) {
throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
}
} catch (final Exception e) {
if (!cancelled.get()) {
// Although this thread is closed first, it races with the destination's closure and can attempt one
// final read after the destination is closed before it's terminated.
// This read will fail and throw an exception. Because of this, throw exceptions only if the worker
// was not cancelled.
if (e instanceof DestinationException) {
// Surface Destination exceptions directly so that they can be classified properly by the worker
throw e;
} else {
throw new RuntimeException(e);
}
}
}
};
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() {