From 9d30bb0e0d38d1ef2e4bdd7fc7bd0d4aaa295f71 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 27 Oct 2021 17:03:06 -0700 Subject: [PATCH] [5889] Bmoric/log application name for all workers [2/2] (#7268) This is adding the application name in the logs produced by the docker containers. This is related to #5889. --- .../io/airbyte/commons/io/LineGobbler.java | 30 +++++++-- .../commons/logging/LoggingHelper.java | 5 +- .../io/airbyte/commons/logging/MdcScope.java | 40 +++++++++++- .../airbyte/commons/logging/MdcScopeTest.java | 7 +-- .../workers/DbtTransformationRunner.java | 13 +++- .../DefaultNormalizationRunner.java | 11 +++- .../airbyte/DefaultAirbyteDestination.java | 11 +++- .../airbyte/DefaultAirbyteSource.java | 12 +++- .../airbyte/DefaultAirbyteStreamFactory.java | 20 ++++-- .../DefaultNormalizationRunnerTest.java | 51 ++++++++++++++- .../DefaultAirbyteDestinationTest.java | 56 +++++++++++++++++ .../airbyte/DefaultAirbyteSourceTest.java | 62 +++++++++++++++++++ .../DefaultAirbyteStreamFactoryTest.java | 3 +- 13 files changed, 293 insertions(+), 28 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java b/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java index 29e055ce0de..6e87ce1f451 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java @@ -5,6 +5,7 @@ package io.airbyte.commons.io; import io.airbyte.commons.concurrency.VoidCallable; +import io.airbyte.commons.logging.MdcScope; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -21,13 +22,17 @@ public class LineGobbler implements VoidCallable { private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class); public static void gobble(final InputStream is, final Consumer consumer) { - gobble(is, consumer, "generic"); + gobble(is, consumer, "generic", MdcScope.DEFAULT); } - public static void gobble(final InputStream is, final Consumer consumer, final String caller) { + public static void gobble(final InputStream is, final Consumer consumer, final MdcScope mdcScope) { + gobble(is, consumer, "generic", mdcScope); + } + + public static void gobble(final InputStream is, final Consumer consumer, final String caller, final MdcScope mdcScope) { final ExecutorService executor = Executors.newSingleThreadExecutor(); final Map mdc = MDC.getCopyOfContextMap(); - final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller); + final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller, mdcScope); executor.submit(gobbler); } @@ -36,24 +41,35 @@ public class LineGobbler implements VoidCallable { private final ExecutorService executor; private final Map mdc; private final String caller; + private final MdcScope containerLogMDC; LineGobbler(final InputStream is, final Consumer consumer, final ExecutorService executor, final Map mdc) { - this(is, consumer, executor, mdc, "generic"); + this(is, consumer, executor, mdc, "generic", MdcScope.DEFAULT); } LineGobbler(final InputStream is, final Consumer consumer, final ExecutorService executor, final Map mdc, - final String caller) { + final MdcScope mdcScope) { + this(is, consumer, executor, mdc, "generic", mdcScope); + } + + LineGobbler(final InputStream is, + final Consumer consumer, + final ExecutorService executor, + final Map mdc, + final String caller, + final MdcScope mdcScope) { this.is = IOs.newBufferedReader(is); this.consumer = consumer; this.executor = executor; this.mdc = mdc; this.caller = caller; + this.containerLogMDC = mdcScope; } @Override @@ -62,7 +78,9 @@ public class LineGobbler implements VoidCallable { try { String line; while ((line = is.readLine()) != null) { - consumer.accept(line); + try (containerLogMDC) { + consumer.accept(line); + } } } catch (final IOException i) { LOGGER.warn("{} gobbler IOException: {}. Typically happens when cancelling a job.", caller, i.getMessage()); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java index 1975461edbe..bf29cec7e59 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java @@ -4,6 +4,8 @@ package io.airbyte.commons.logging; +import com.google.common.annotations.VisibleForTesting; + public class LoggingHelper { public enum Color { @@ -31,7 +33,8 @@ public class LoggingHelper { public static final String LOG_SOURCE_MDC_KEY = "log_source"; - private static final String RESET = "\u001B[0m"; + @VisibleForTesting + public static final String RESET = "\u001B[0m"; public static String applyColor(final Color color, final String msg) { return color.getCode() + msg + RESET; diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java b/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java index 82f71c987a9..190c1e9ebb1 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java @@ -4,7 +4,10 @@ package io.airbyte.commons.logging; +import io.airbyte.commons.logging.LoggingHelper.Color; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.slf4j.MDC; /** @@ -25,6 +28,8 @@ import org.slf4j.MDC; */ public class MdcScope implements AutoCloseable { + public final static MdcScope DEFAULT = new Builder().build(); + private final Map originalContextMap; public MdcScope(final Map keyValuesToAdd) { @@ -35,8 +40,41 @@ public class MdcScope implements AutoCloseable { } @Override - public void close() throws Exception { + public void close() { MDC.setContextMap(originalContextMap); } + public static class Builder { + + private Optional maybeLogPrefix = Optional.empty(); + private Optional maybePrefixColor = Optional.empty(); + + public Builder setLogPrefix(final String logPrefix) { + this.maybeLogPrefix = Optional.ofNullable(logPrefix); + + return this; + } + + public Builder setPrefixColor(final Color color) { + this.maybePrefixColor = Optional.ofNullable(color); + + return this; + } + + public MdcScope build() { + final Map extraMdcEntries = new HashMap<>(); + + maybeLogPrefix.stream().forEach(logPrefix -> { + final String potentiallyColoredLog = maybePrefixColor + .map(color -> LoggingHelper.applyColor(color, logPrefix)) + .orElse(logPrefix); + + extraMdcEntries.put(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog); + }); + + return new MdcScope(extraMdcEntries); + } + + } + } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/logging/MdcScopeTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/logging/MdcScopeTest.java index 4d985d5a6c6..e52901e65e5 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/logging/MdcScopeTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/logging/MdcScopeTest.java @@ -54,17 +54,14 @@ public class MdcScopeTest { }); - } catch (final Exception e) { - e.printStackTrace(); } } @Test @DisplayName("The MDC context is properly restored") public void testMDCRestore() { - try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {} catch (final Exception e) { - e.printStackTrace(); - } + try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {} + final Map mdcState = MDC.getCopyOfContextMap(); Assertions.assertThat(mdcState).containsAllEntriesOf(originalMap); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java index 6e09ef9bceb..249d5eae4a8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java @@ -8,6 +8,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.LineGobbler; +import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.commons.logging.MdcScope; +import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.OperatorDbt; import io.airbyte.config.ResourceRequirements; @@ -28,6 +31,10 @@ public class DbtTransformationRunner implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class); private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh"; + private static final MdcScope CONTAINER_LOG_MDC = new Builder() + .setLogPrefix("dbt") + .setPrefixColor(Color.CYAN) + .build(); private final ProcessFactory processFactory; private final NormalizationRunner normalizationRunner; @@ -48,7 +55,7 @@ public class DbtTransformationRunner implements AutoCloseable { * transform-config scripts (to translate Airbyte Catalogs into Dbt profiles file). Thus, we depend * on the NormalizationRunner to configure the dbt project with the appropriate destination settings * and pull the custom git repository into the workspace. - * + *

* Once the workspace folder/files is setup to run, we invoke the custom transformation command as * provided by the user to execute whatever extra transformation has been implemented. */ @@ -87,8 +94,8 @@ public class DbtTransformationRunner implements AutoCloseable { Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP), dbtArguments); - LineGobbler.gobble(process.getInputStream(), LOGGER::info); - LineGobbler.gobble(process.getErrorStream(), LOGGER::error); + LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC); + LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC); WorkerUtils.wait(process); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 63f12fe2802..39cbc52562d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -10,6 +10,9 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.commons.logging.MdcScope; +import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.config.OperatorDbt; import io.airbyte.config.ResourceRequirements; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -27,6 +30,10 @@ import org.slf4j.LoggerFactory; public class DefaultNormalizationRunner implements NormalizationRunner { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class); + private static final MdcScope CONTAINER_LOG_MDC = new Builder() + .setLogPrefix("normalization") + .setPrefixColor(Color.GREEN) + .build(); private final DestinationType destinationType; private final ProcessFactory processFactory; @@ -109,8 +116,8 @@ public class DefaultNormalizationRunner implements NormalizationRunner { process = processFactory.create(jobId, attempt, jobRoot, normalizationImageName, false, files, null, resourceRequirements, Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP), args); - LineGobbler.gobble(process.getInputStream(), LOGGER::info); - LineGobbler.gobble(process.getErrorStream(), LOGGER::error); + LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC); + LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC); WorkerUtils.wait(process); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index 9bf76989307..de4bacfcdc9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -9,6 +9,9 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.commons.logging.MdcScope; +import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.config.WorkerDestinationConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -30,6 +33,10 @@ import org.slf4j.LoggerFactory; public class DefaultAirbyteDestination implements AirbyteDestination { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteDestination.class); + private static final MdcScope CONTAINER_LOG_MDC = new Builder() + .setLogPrefix("destination") + .setPrefixColor(Color.MAGENTA) + .build(); private final IntegrationLauncher integrationLauncher; private final AirbyteStreamFactory streamFactory; @@ -41,7 +48,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination { private Iterator messageIterator = null; public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher) { - this(integrationLauncher, new DefaultAirbyteStreamFactory()); + this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC)); } @@ -63,7 +70,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination { WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(destinationConfig.getCatalog())); // stdout logs are logged elsewhere since stdout also contains data - LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination"); + LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination", CONTAINER_LOG_MDC); writer = new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8)); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index 2b17b4050fd..fadf68e44ef 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -9,6 +9,9 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.commons.logging.MdcScope; +import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.config.WorkerSourceConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -35,6 +38,11 @@ public class DefaultAirbyteSource implements AirbyteSource { private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS); private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES); + private static final MdcScope CONTAINER_LOG_MDC = new Builder() + .setLogPrefix("source") + .setPrefixColor(Color.BLUE) + .build(); + private final IntegrationLauncher integrationLauncher; private final AirbyteStreamFactory streamFactory; private final HeartbeatMonitor heartbeatMonitor; @@ -43,7 +51,7 @@ public class DefaultAirbyteSource implements AirbyteSource { private Iterator messageIterator = null; public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { - this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); + this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); } @VisibleForTesting @@ -67,7 +75,7 @@ public class DefaultAirbyteSource implements AirbyteSource { sourceConfig.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME, sourceConfig.getState() == null ? null : Jsons.serialize(sourceConfig.getState().getState())); // stdout logs are logged elsewhere since stdout also contains data - LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source"); + LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source", CONTAINER_LOG_MDC); messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream())) .peek(message -> heartbeatMonitor.beat()) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactory.java index 69b32f64be9..e0c8c89e9da 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactory.java @@ -6,6 +6,8 @@ package io.airbyte.workers.protocols.airbyte; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.MdcScope; +import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.protocol.models.AirbyteLogMessage; import io.airbyte.protocol.models.AirbyteMessage; import java.io.BufferedReader; @@ -28,16 +30,22 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteStreamFactory.class); + private final MdcScope containerLogMDC; private final AirbyteProtocolPredicate protocolValidator; private final Logger logger; public DefaultAirbyteStreamFactory() { - this(new AirbyteProtocolPredicate(), LOGGER); + this(new Builder().build()); } - DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger) { + public DefaultAirbyteStreamFactory(final MdcScope containerLogMDC) { + this(new AirbyteProtocolPredicate(), LOGGER, containerLogMDC); + } + + DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger, final MdcScope containerLogMDC) { protocolValidator = protocolPredicate; this.logger = logger; + this.containerLogMDC = containerLogMDC; } @Override @@ -50,7 +58,9 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory { // we log as info all the lines that are not valid json // some sources actually log their process on stdout, we // want to make sure this info is available in the logs. - logger.info(line); + try (containerLogMDC) { + logger.info(line); + } } return jsonLine.stream(); }) @@ -73,7 +83,9 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory { .filter(airbyteMessage -> { final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG; if (isLog) { - internalLog(airbyteMessage.getLog()); + try (containerLogMDC) { + internalLog(airbyteMessage.getLog()); + } } return !isLog; }); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index 3c70af24a5f..554f91ad90a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -4,7 +4,9 @@ package io.airbyte.workers.normalization; -import static org.junit.jupiter.api.Assertions.*; +import static io.airbyte.commons.logging.LoggingHelper.RESET; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -12,7 +14,10 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper.Color; +import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.WorkerException; @@ -25,6 +30,8 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -33,6 +40,17 @@ class DefaultNormalizationRunnerTest { private static final String JOB_ID = "0"; private static final int JOB_ATTEMPT = 0; + private static Path logJobRoot; + + static { + try { + logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test"); + LogClientSingleton.setJobMdc(logJobRoot); + } catch (final IOException e) { + e.printStackTrace(); + } + } + private Path jobRoot; private ProcessFactory processFactory; private Process process; @@ -64,6 +82,16 @@ class DefaultNormalizationRunnerTest { when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("hello".getBytes())); } + @AfterEach + public void tearDown() throws IOException { + // The log file needs to be present and empty + final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); + if (Files.exists(logFile)) { + Files.delete(logFile); + } + Files.createFile(logFile); + } + @Test void test() throws Exception { final NormalizationRunner runner = @@ -74,6 +102,27 @@ class DefaultNormalizationRunnerTest { assertTrue(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS)); } + @Test + void testLog() throws Exception { + + final NormalizationRunner runner = + new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory, NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME); + + when(process.exitValue()).thenReturn(0); + + assertTrue(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS)); + + final Path logPath = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); + final Stream logs = IOs.readFile(logPath).lines(); + + logs + .filter(line -> !line.contains("EnvConfigs(getEnvOrDefault)")) + .forEach(line -> { + org.assertj.core.api.Assertions.assertThat(line) + .startsWith(Color.GREEN.getCode() + "normalization" + RESET); + }); + } + @Test public void testClose() throws Exception { when(process.isAlive()).thenReturn(true).thenReturn(false); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java index bbf38be85d4..52d0b958fa7 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers.protocols.airbyte; +import static io.airbyte.commons.logging.LoggingHelper.RESET; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -14,8 +15,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.Lists; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper.Color; import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.workers.TestConfigHelpers; import io.airbyte.workers.WorkerConstants; @@ -31,6 +35,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.List; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -49,6 +55,17 @@ class DefaultAirbyteDestinationTest { AirbyteMessageUtils.createStateMessage("checkpoint", "1"), AirbyteMessageUtils.createStateMessage("checkpoint", "2")); + private static Path logJobRoot; + + static { + try { + logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test"); + LogClientSingleton.setJobMdc(logJobRoot); + } catch (final IOException e) { + e.printStackTrace(); + } + } + private Path jobRoot; private IntegrationLauncher integrationLauncher; private Process process; @@ -81,6 +98,16 @@ class DefaultAirbyteDestinationTest { streamFactory = noop -> MESSAGES.stream(); } + @AfterEach + public void tearDown() throws IOException { + // The log file needs to be present and empty + final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); + if (Files.exists(logFile)) { + Files.delete(logFile); + } + Files.createFile(logFile); + } + @SuppressWarnings("BusyWait") @Test public void testSuccessfulLifecycle() throws Exception { @@ -120,6 +147,35 @@ class DefaultAirbyteDestinationTest { verify(process).exitValue(); } + @Test + public void testTaggedLogs() throws Exception { + + final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher, streamFactory); + destination.start(DESTINATION_CONFIG, jobRoot); + + final AirbyteMessage recordMessage = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"); + destination.accept(recordMessage); + + final List messages = Lists.newArrayList(); + + messages.add(destination.attemptRead().get()); + messages.add(destination.attemptRead().get()); + + when(process.isAlive()).thenReturn(false); + + destination.notifyEndOfStream(); + + destination.close(); + + final Path logPath = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); + final Stream logs = IOs.readFile(logPath).lines(); + + logs.forEach(line -> { + org.assertj.core.api.Assertions.assertThat(line) + .startsWith(Color.MAGENTA.getCode() + "destination" + RESET); + }); + } + @Test public void testCloseNotifiesLifecycle() throws Exception { final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java index 4f296984a50..412f4ec3c39 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers.protocols.airbyte; +import static io.airbyte.commons.logging.LoggingHelper.RESET; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -16,9 +17,12 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.LoggingHelper.Color; import io.airbyte.config.State; import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -36,6 +40,8 @@ import java.nio.file.Path; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -65,6 +71,17 @@ class DefaultAirbyteSourceTest { AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"), AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "yellow")); + private static Path logJobRoot; + + static { + try { + logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test"); + LogClientSingleton.setJobMdc(logJobRoot); + } catch (final IOException e) { + e.printStackTrace(); + } + } + private Path jobRoot; private IntegrationLauncher integrationLauncher; private Process process; @@ -92,11 +109,25 @@ class DefaultAirbyteSourceTest { when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("qwer".getBytes(StandardCharsets.UTF_8))); streamFactory = noop -> MESSAGES.stream(); + + LogClientSingleton.setJobMdc(logJobRoot); + } + + @AfterEach + public void tearDown() throws IOException { + // The log file needs to be present and empty + final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); + if (Files.exists(logFile)) { + Files.delete(logFile); + } + Files.createFile(logFile); } @SuppressWarnings({"OptionalGetWithoutIsPresent", "BusyWait"}) @Test public void testSuccessfulLifecycle() throws Exception { + when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("qwer".getBytes(StandardCharsets.UTF_8))); + when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false); final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); @@ -127,6 +158,37 @@ class DefaultAirbyteSourceTest { verify(process).exitValue(); } + @Test + public void testTaggedLogs() throws Exception { + + when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(("rewq").getBytes(StandardCharsets.UTF_8))); + + when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false); + + final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, + heartbeatMonitor); + source.start(SOURCE_CONFIG, jobRoot); + + final List messages = Lists.newArrayList(); + + messages.add(source.attemptRead().get()); + messages.add(source.attemptRead().get()); + + when(process.isAlive()).thenReturn(false); + + source.close(); + + final Path logPath = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); + final Stream logs = IOs.readFile(logPath).lines(); + + logs + .filter(line -> !line.contains("EnvConfigs(getEnvOrDefault)")) + .forEach(line -> { + org.assertj.core.api.Assertions.assertThat(line) + .startsWith(Color.BLUE.getCode() + "source" + RESET); + }); + } + @Test public void testNonzeroExitCodeThrows() throws Exception { final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactoryTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactoryTest.java index 74077fb1105..596f9e0f11f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactoryTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteStreamFactoryTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.protocol.models.AirbyteLogMessage; import io.airbyte.protocol.models.AirbyteMessage; import java.io.BufferedReader; @@ -120,7 +121,7 @@ class DefaultAirbyteStreamFactoryTest { private Stream stringToMessageStream(final String inputString) { final InputStream inputStream = new ByteArrayInputStream(inputString.getBytes()); final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); - return new DefaultAirbyteStreamFactory(protocolPredicate, logger).create(bufferedReader); + return new DefaultAirbyteStreamFactory(protocolPredicate, logger, new Builder().build()).create(bufferedReader); } }