1
0
mirror of synced 2026-01-05 21:02:13 -05:00

[5889] Bmoric/log application name for all workers [2/2] (#7268)

This is adding the application name in the logs produced by the docker containers.

This is related to #5889.
This commit is contained in:
Benoit Moriceau
2021-10-27 17:03:06 -07:00
committed by GitHub
parent aa2caf1355
commit 9d30bb0e0d
13 changed files with 293 additions and 28 deletions

View File

@@ -5,6 +5,7 @@
package io.airbyte.commons.io;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.logging.MdcScope;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -21,13 +22,17 @@ public class LineGobbler implements VoidCallable {
private final static Logger LOGGER = LoggerFactory.getLogger(LineGobbler.class);
public static void gobble(final InputStream is, final Consumer<String> consumer) {
gobble(is, consumer, "generic");
gobble(is, consumer, "generic", MdcScope.DEFAULT);
}
public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller) {
public static void gobble(final InputStream is, final Consumer<String> consumer, final MdcScope mdcScope) {
gobble(is, consumer, "generic", mdcScope);
}
public static void gobble(final InputStream is, final Consumer<String> consumer, final String caller, final MdcScope mdcScope) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Map<String, String> mdc = MDC.getCopyOfContextMap();
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller);
final var gobbler = new LineGobbler(is, consumer, executor, mdc, caller, mdcScope);
executor.submit(gobbler);
}
@@ -36,24 +41,35 @@ public class LineGobbler implements VoidCallable {
private final ExecutorService executor;
private final Map<String, String> mdc;
private final String caller;
private final MdcScope containerLogMDC;
LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc) {
this(is, consumer, executor, mdc, "generic");
this(is, consumer, executor, mdc, "generic", MdcScope.DEFAULT);
}
LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final String caller) {
final MdcScope mdcScope) {
this(is, consumer, executor, mdc, "generic", mdcScope);
}
LineGobbler(final InputStream is,
final Consumer<String> consumer,
final ExecutorService executor,
final Map<String, String> mdc,
final String caller,
final MdcScope mdcScope) {
this.is = IOs.newBufferedReader(is);
this.consumer = consumer;
this.executor = executor;
this.mdc = mdc;
this.caller = caller;
this.containerLogMDC = mdcScope;
}
@Override
@@ -62,7 +78,9 @@ public class LineGobbler implements VoidCallable {
try {
String line;
while ((line = is.readLine()) != null) {
consumer.accept(line);
try (containerLogMDC) {
consumer.accept(line);
}
}
} catch (final IOException i) {
LOGGER.warn("{} gobbler IOException: {}. Typically happens when cancelling a job.", caller, i.getMessage());

View File

@@ -4,6 +4,8 @@
package io.airbyte.commons.logging;
import com.google.common.annotations.VisibleForTesting;
public class LoggingHelper {
public enum Color {
@@ -31,7 +33,8 @@ public class LoggingHelper {
public static final String LOG_SOURCE_MDC_KEY = "log_source";
private static final String RESET = "\u001B[0m";
@VisibleForTesting
public static final String RESET = "\u001B[0m";
public static String applyColor(final Color color, final String msg) {
return color.getCode() + msg + RESET;

View File

@@ -4,7 +4,10 @@
package io.airbyte.commons.logging;
import io.airbyte.commons.logging.LoggingHelper.Color;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.MDC;
/**
@@ -25,6 +28,8 @@ import org.slf4j.MDC;
*/
public class MdcScope implements AutoCloseable {
public final static MdcScope DEFAULT = new Builder().build();
private final Map<String, String> originalContextMap;
public MdcScope(final Map<String, String> keyValuesToAdd) {
@@ -35,8 +40,41 @@ public class MdcScope implements AutoCloseable {
}
@Override
public void close() throws Exception {
public void close() {
MDC.setContextMap(originalContextMap);
}
public static class Builder {
private Optional<String> maybeLogPrefix = Optional.empty();
private Optional<Color> maybePrefixColor = Optional.empty();
public Builder setLogPrefix(final String logPrefix) {
this.maybeLogPrefix = Optional.ofNullable(logPrefix);
return this;
}
public Builder setPrefixColor(final Color color) {
this.maybePrefixColor = Optional.ofNullable(color);
return this;
}
public MdcScope build() {
final Map<String, String> extraMdcEntries = new HashMap<>();
maybeLogPrefix.stream().forEach(logPrefix -> {
final String potentiallyColoredLog = maybePrefixColor
.map(color -> LoggingHelper.applyColor(color, logPrefix))
.orElse(logPrefix);
extraMdcEntries.put(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog);
});
return new MdcScope(extraMdcEntries);
}
}
}

View File

@@ -54,17 +54,14 @@ public class MdcScopeTest {
});
} catch (final Exception e) {
e.printStackTrace();
}
}
@Test
@DisplayName("The MDC context is properly restored")
public void testMDCRestore() {
try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {} catch (final Exception e) {
e.printStackTrace();
}
try (final MdcScope mdcScope = new MdcScope(modificationInMDC)) {}
final Map<String, String> mdcState = MDC.getCopyOfContextMap();
Assertions.assertThat(mdcState).containsAllEntriesOf(originalMap);

View File

@@ -8,6 +8,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
@@ -28,6 +31,10 @@ public class DbtTransformationRunner implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("dbt")
.setPrefixColor(Color.CYAN)
.build();
private final ProcessFactory processFactory;
private final NormalizationRunner normalizationRunner;
@@ -48,7 +55,7 @@ public class DbtTransformationRunner implements AutoCloseable {
* transform-config scripts (to translate Airbyte Catalogs into Dbt profiles file). Thus, we depend
* on the NormalizationRunner to configure the dbt project with the appropriate destination settings
* and pull the custom git repository into the workspace.
*
* <p>
* Once the workspace folder/files is setup to run, we invoke the custom transformation command as
* provided by the user to execute whatever extra transformation has been implemented.
*/
@@ -87,8 +94,8 @@ public class DbtTransformationRunner implements AutoCloseable {
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP),
dbtArguments);
LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC);
WorkerUtils.wait(process);

View File

@@ -10,6 +10,9 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
@@ -27,6 +30,10 @@ import org.slf4j.LoggerFactory;
public class DefaultNormalizationRunner implements NormalizationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("normalization")
.setPrefixColor(Color.GREEN)
.build();
private final DestinationType destinationType;
private final ProcessFactory processFactory;
@@ -109,8 +116,8 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
process = processFactory.create(jobId, attempt, jobRoot, normalizationImageName, false, files, null, resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP), args);
LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC);
WorkerUtils.wait(process);

View File

@@ -9,6 +9,9 @@ import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -30,6 +33,10 @@ import org.slf4j.LoggerFactory;
public class DefaultAirbyteDestination implements AirbyteDestination {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteDestination.class);
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("destination")
.setPrefixColor(Color.MAGENTA)
.build();
private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
@@ -41,7 +48,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
private Iterator<AirbyteMessage> messageIterator = null;
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory());
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC));
}
@@ -63,7 +70,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME,
Jsons.serialize(destinationConfig.getCatalog()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination");
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination", CONTAINER_LOG_MDC);
writer = new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8));

View File

@@ -9,6 +9,9 @@ import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -35,6 +38,11 @@ public class DefaultAirbyteSource implements AirbyteSource {
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS);
private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);
private static final MdcScope CONTAINER_LOG_MDC = new Builder()
.setLogPrefix("source")
.setPrefixColor(Color.BLUE)
.build();
private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final HeartbeatMonitor heartbeatMonitor;
@@ -43,7 +51,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
private Iterator<AirbyteMessage> messageIterator = null;
public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC), new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
}
@VisibleForTesting
@@ -67,7 +75,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
sourceConfig.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME,
sourceConfig.getState() == null ? null : Jsons.serialize(sourceConfig.getState().getState()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source");
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source", CONTAINER_LOG_MDC);
messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())

View File

@@ -6,6 +6,8 @@ package io.airbyte.workers.protocols.airbyte;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.BufferedReader;
@@ -28,16 +30,22 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteStreamFactory.class);
private final MdcScope containerLogMDC;
private final AirbyteProtocolPredicate protocolValidator;
private final Logger logger;
public DefaultAirbyteStreamFactory() {
this(new AirbyteProtocolPredicate(), LOGGER);
this(new Builder().build());
}
DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger) {
public DefaultAirbyteStreamFactory(final MdcScope containerLogMDC) {
this(new AirbyteProtocolPredicate(), LOGGER, containerLogMDC);
}
DefaultAirbyteStreamFactory(final AirbyteProtocolPredicate protocolPredicate, final Logger logger, final MdcScope containerLogMDC) {
protocolValidator = protocolPredicate;
this.logger = logger;
this.containerLogMDC = containerLogMDC;
}
@Override
@@ -50,7 +58,9 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory {
// we log as info all the lines that are not valid json
// some sources actually log their process on stdout, we
// want to make sure this info is available in the logs.
logger.info(line);
try (containerLogMDC) {
logger.info(line);
}
}
return jsonLine.stream();
})
@@ -73,7 +83,9 @@ public class DefaultAirbyteStreamFactory implements AirbyteStreamFactory {
.filter(airbyteMessage -> {
final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG;
if (isLog) {
internalLog(airbyteMessage.getLog());
try (containerLogMDC) {
internalLog(airbyteMessage.getLog());
}
}
return !isLog;
});

View File

@@ -4,7 +4,9 @@
package io.airbyte.workers.normalization;
import static org.junit.jupiter.api.Assertions.*;
import static io.airbyte.commons.logging.LoggingHelper.RESET;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -12,7 +14,10 @@ import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
@@ -25,6 +30,8 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -33,6 +40,17 @@ class DefaultNormalizationRunnerTest {
private static final String JOB_ID = "0";
private static final int JOB_ATTEMPT = 0;
private static Path logJobRoot;
static {
try {
logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test");
LogClientSingleton.setJobMdc(logJobRoot);
} catch (final IOException e) {
e.printStackTrace();
}
}
private Path jobRoot;
private ProcessFactory processFactory;
private Process process;
@@ -64,6 +82,16 @@ class DefaultNormalizationRunnerTest {
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("hello".getBytes()));
}
@AfterEach
public void tearDown() throws IOException {
// The log file needs to be present and empty
final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME);
if (Files.exists(logFile)) {
Files.delete(logFile);
}
Files.createFile(logFile);
}
@Test
void test() throws Exception {
final NormalizationRunner runner =
@@ -74,6 +102,27 @@ class DefaultNormalizationRunnerTest {
assertTrue(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS));
}
@Test
void testLog() throws Exception {
final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory, NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
when(process.exitValue()).thenReturn(0);
assertTrue(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS));
final Path logPath = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME);
final Stream<String> logs = IOs.readFile(logPath).lines();
logs
.filter(line -> !line.contains("EnvConfigs(getEnvOrDefault)"))
.forEach(line -> {
org.assertj.core.api.Assertions.assertThat(line)
.startsWith(Color.GREEN.getCode() + "normalization" + RESET);
});
}
@Test
public void testClose() throws Exception {
when(process.isAlive()).thenReturn(true).thenReturn(false);

View File

@@ -4,6 +4,7 @@
package io.airbyte.workers.protocols.airbyte;
import static io.airbyte.commons.logging.LoggingHelper.RESET;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -14,8 +15,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.TestConfigHelpers;
import io.airbyte.workers.WorkerConstants;
@@ -31,6 +35,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -49,6 +55,17 @@ class DefaultAirbyteDestinationTest {
AirbyteMessageUtils.createStateMessage("checkpoint", "1"),
AirbyteMessageUtils.createStateMessage("checkpoint", "2"));
private static Path logJobRoot;
static {
try {
logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test");
LogClientSingleton.setJobMdc(logJobRoot);
} catch (final IOException e) {
e.printStackTrace();
}
}
private Path jobRoot;
private IntegrationLauncher integrationLauncher;
private Process process;
@@ -81,6 +98,16 @@ class DefaultAirbyteDestinationTest {
streamFactory = noop -> MESSAGES.stream();
}
@AfterEach
public void tearDown() throws IOException {
// The log file needs to be present and empty
final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME);
if (Files.exists(logFile)) {
Files.delete(logFile);
}
Files.createFile(logFile);
}
@SuppressWarnings("BusyWait")
@Test
public void testSuccessfulLifecycle() throws Exception {
@@ -120,6 +147,35 @@ class DefaultAirbyteDestinationTest {
verify(process).exitValue();
}
@Test
public void testTaggedLogs() throws Exception {
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher, streamFactory);
destination.start(DESTINATION_CONFIG, jobRoot);
final AirbyteMessage recordMessage = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue");
destination.accept(recordMessage);
final List<AirbyteMessage> messages = Lists.newArrayList();
messages.add(destination.attemptRead().get());
messages.add(destination.attemptRead().get());
when(process.isAlive()).thenReturn(false);
destination.notifyEndOfStream();
destination.close();
final Path logPath = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME);
final Stream<String> logs = IOs.readFile(logPath).lines();
logs.forEach(line -> {
org.assertj.core.api.Assertions.assertThat(line)
.startsWith(Color.MAGENTA.getCode() + "destination" + RESET);
});
}
@Test
public void testCloseNotifiesLifecycle() throws Exception {
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);

View File

@@ -4,6 +4,7 @@
package io.airbyte.workers.protocols.airbyte;
import static io.airbyte.commons.logging.LoggingHelper.RESET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -16,9 +17,12 @@ import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.config.State;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
@@ -36,6 +40,8 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -65,6 +71,17 @@ class DefaultAirbyteSourceTest {
AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"),
AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "yellow"));
private static Path logJobRoot;
static {
try {
logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test");
LogClientSingleton.setJobMdc(logJobRoot);
} catch (final IOException e) {
e.printStackTrace();
}
}
private Path jobRoot;
private IntegrationLauncher integrationLauncher;
private Process process;
@@ -92,11 +109,25 @@ class DefaultAirbyteSourceTest {
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("qwer".getBytes(StandardCharsets.UTF_8)));
streamFactory = noop -> MESSAGES.stream();
LogClientSingleton.setJobMdc(logJobRoot);
}
@AfterEach
public void tearDown() throws IOException {
// The log file needs to be present and empty
final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME);
if (Files.exists(logFile)) {
Files.delete(logFile);
}
Files.createFile(logFile);
}
@SuppressWarnings({"OptionalGetWithoutIsPresent", "BusyWait"})
@Test
public void testSuccessfulLifecycle() throws Exception {
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("qwer".getBytes(StandardCharsets.UTF_8)));
when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false);
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
@@ -127,6 +158,37 @@ class DefaultAirbyteSourceTest {
verify(process).exitValue();
}
@Test
public void testTaggedLogs() throws Exception {
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(("rewq").getBytes(StandardCharsets.UTF_8)));
when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false);
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory,
heartbeatMonitor);
source.start(SOURCE_CONFIG, jobRoot);
final List<AirbyteMessage> messages = Lists.newArrayList();
messages.add(source.attemptRead().get());
messages.add(source.attemptRead().get());
when(process.isAlive()).thenReturn(false);
source.close();
final Path logPath = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME);
final Stream<String> logs = IOs.readFile(logPath).lines();
logs
.filter(line -> !line.contains("EnvConfigs(getEnvOrDefault)"))
.forEach(line -> {
org.assertj.core.api.Assertions.assertThat(line)
.startsWith(Color.BLUE.getCode() + "source" + RESET);
});
}
@Test
public void testNonzeroExitCodeThrows() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);

View File

@@ -14,6 +14,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.BufferedReader;
@@ -120,7 +121,7 @@ class DefaultAirbyteStreamFactoryTest {
private Stream<AirbyteMessage> stringToMessageStream(final String inputString) {
final InputStream inputStream = new ByteArrayInputStream(inputString.getBytes());
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
return new DefaultAirbyteStreamFactory(protocolPredicate, logger).create(bufferedReader);
return new DefaultAirbyteStreamFactory(protocolPredicate, logger, new Builder().build()).create(bufferedReader);
}
}