1
0
mirror of synced 2025-12-19 18:14:56 -05:00

java-CDK fixes to the Junit interceptor (#35827)

This commit is contained in:
Stephane Geneix
2024-03-05 17:12:26 -08:00
committed by GitHub
parent d5d340ea00
commit b67d16d730
5 changed files with 101 additions and 21 deletions

View File

@@ -166,8 +166,9 @@ MavenLocal debugging steps:
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.14 | 2024-03-05 | [\#35827](https://github.com/airbytehq/airbyte/pull/35827) | improving the Junit interceptor. |
| 0.23.13 | 2024-03-04 | [\#35774](https://github.com/airbytehq/airbyte/pull/35774) | minor changes to the CDK test fixtures. |
| 0.23.15 | 2024-03-05 | [\#35827](https://github.com/airbytehq/airbyte/pull/35827) | improving the Junit interceptor. |
| 0.23.14 | 2024-03-05 | [\#35739](https://github.com/airbytehq/airbyte/pull/35739) | Add logging to the CDC queue size. Fix the ContainerFactory. |
| 0.23.13 | 2024-03-04 | [\#35774](https://github.com/airbytehq/airbyte/pull/35774) | minor changes to the CDK test fixtures. |
| 0.23.12 | 2024-03-01 | [\#35767](https://github.com/airbytehq/airbyte/pull/35767) | introducing a timeout for java tests. |
| 0.23.11 | 2024-03-01 | [\#35313](https://github.com/airbytehq/airbyte/pull/35313) | Preserve timezone offset in CSV writer for destinations |
| 0.23.10 | 2024-03-01 | [\#35303](https://github.com/airbytehq/airbyte/pull/35303) | Migration framework with DestinationState for softReset |

View File

@@ -1 +1 @@
version=0.23.14
version=0.23.15

View File

@@ -4,6 +4,16 @@
package io.airbyte.cdk.extensions;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import static java.util.regex.Pattern.UNICODE_CASE;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -11,19 +21,23 @@ import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Timeout.ThreadMode;
import org.junit.jupiter.api.extension.DynamicTestInvocationContext;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.InvocationInterceptor;
@@ -41,13 +55,11 @@ import org.slf4j.LoggerFactory;
*/
public class LoggingInvocationInterceptor implements InvocationInterceptor {
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(5);
private static final Logger LOGGER = LoggerFactory.getLogger(LoggingInvocationInterceptor.class);
private static final String JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME = "JunitMethodExecutionTimeout";
private static final class LoggingInvocationInterceptorHandler implements InvocationHandler {
private static final Map<Integer, ExecutorService> executorByThread = new ConcurrentHashMap<>();
private static final Pattern methodPattern = Pattern.compile("intercept(.*)Method");
@Override
@@ -76,22 +88,33 @@ public class LoggingInvocationInterceptor implements InvocationInterceptor {
} else {
logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName);
}
Thread currentThread = Thread.currentThread();
TimeoutInteruptor timeoutTask = new TimeoutInteruptor(currentThread);
Instant start = Instant.now();
try {
final Object retVal;
Duration timeout = getTimeout(invocationContext);
if (timeout != null) {
LOGGER.info("Junit starting {} with timeout of {}", logLineSuffix, DurationFormatUtils.formatDurationWords(timeout.toMillis(), true, true));
retVal = Assertions.assertTimeoutPreemptively(timeout, invocation::proceed);
new Timer("TimeoutTimer-" + currentThread.getName(), true).schedule(timeoutTask, timeout.toMillis());
} else {
LOGGER.warn("Junit starting {} with no timeout", logLineSuffix);
retVal = invocation.proceed();
}
retVal = invocation.proceed();
long elapsedMs = Duration.between(start, Instant.now()).toMillis();
LOGGER.info("Junit completed {} in {}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true));
return retVal;
} catch (Throwable t) {
timeoutTask.cancel();
long elapsedMs = Duration.between(start, Instant.now()).toMillis();
if (timeoutTask.wasTriggered) {
Throwable t1 = t;
t = new TimeoutException(
"Execution was cancelled after %s. If you think your test should be given more time to complete, you can use the @Timeout annotation. If all the test of a connector are slow, "
+ " you can override the property 'JunitMethodExecutionTimeout' in your gradle.properties."
.formatted(DurationFormatUtils.formatDurationWords(elapsedMs, true, true)));
t.initCause(t1);
}
boolean belowCurrentCall = false;
List<String> stackToDisplay = new LinkedList<>();
for (String stackString : ExceptionUtils.getStackFrames(t)) {
@@ -110,25 +133,74 @@ public class LoggingInvocationInterceptor implements InvocationInterceptor {
LOGGER.error("Junit exception throw during {} after {}:\n{}", logLineSuffix, DurationFormatUtils.formatDurationWords(elapsedMs, true, true),
stackTrace);
throw t;
} finally {
timeoutTask.cancel();
}
}
private static class TimeoutInteruptor extends TimerTask {
private final Thread parentThread;
volatile boolean wasTriggered = false;
TimeoutInteruptor(Thread parentThread) {
this.parentThread = parentThread;
}
@Override
public void run() {
wasTriggered = true;
parentThread.interrupt();
}
public boolean cancel() {
return super.cancel();
}
}
private static final Pattern PATTERN = Pattern.compile("([1-9]\\d*) *((?:[nμm]?s)|m|h|d)?",
CASE_INSENSITIVE | UNICODE_CASE);
private static final Map<String, TimeUnit> UNITS_BY_ABBREVIATION;
static {
Map<String, TimeUnit> unitsByAbbreviation = new HashMap<>();
unitsByAbbreviation.put("ns", NANOSECONDS);
unitsByAbbreviation.put("μs", MICROSECONDS);
unitsByAbbreviation.put("ms", MILLISECONDS);
unitsByAbbreviation.put("s", SECONDS);
unitsByAbbreviation.put("m", MINUTES);
unitsByAbbreviation.put("h", HOURS);
unitsByAbbreviation.put("d", DAYS);
UNITS_BY_ABBREVIATION = Collections.unmodifiableMap(unitsByAbbreviation);
}
static Duration parseDuration(String text) throws DateTimeParseException {
Matcher matcher = PATTERN.matcher(text.trim());
if (matcher.matches()) {
long value = Long.parseLong(matcher.group(1));
String unitAbbreviation = matcher.group(2);
TimeUnit unit = unitAbbreviation == null ? SECONDS
: UNITS_BY_ABBREVIATION.get(unitAbbreviation.toLowerCase(Locale.ENGLISH));
return Duration.ofSeconds(unit.toSeconds(value));
}
throw new DateTimeParseException("Timeout duration is not in the expected format (<number> [ns|μs|ms|s|m|h|d])",
text, 0);
}
private static Duration getTimeout(ReflectiveInvocationContext<Method> invocationContext) {
Duration timeout = DEFAULT_TIMEOUT;
Duration timeout = null;
if (invocationContext.getExecutable()instanceof Method m) {
Timeout timeoutAnnotation = m.getAnnotation(Timeout.class);
if (timeoutAnnotation == null) {
timeoutAnnotation = invocationContext.getTargetClass().getAnnotation(Timeout.class);
}
if (timeoutAnnotation != null) {
if (timeoutAnnotation.threadMode() == ThreadMode.SAME_THREAD) {
return null;
}
timeout = Duration.ofMillis(timeoutAnnotation.unit().toMillis(timeoutAnnotation.value()));
}
}
if (timeout.compareTo(Duration.ofHours(1)) > 0) {
return DEFAULT_TIMEOUT;
if (timeout == null) {
timeout = parseDuration(System.getProperty(JUNIT_METHOD_EXECUTION_TIMEOUT_PROPERTY_NAME));
}
return timeout;
}

View File

@@ -41,16 +41,16 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.csv.CSVFormat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Timeout.ThreadMode;
import org.mockito.MockedConstruction;
@Timeout(value = 1,
threadMode = ThreadMode.SAME_THREAD)
@Timeout(value = 90,
unit = TimeUnit.SECONDS)
class S3CsvWriterTest {
public static final ConfiguredAirbyteStream CONFIGURED_STREAM = new ConfiguredAirbyteStream()

View File

@@ -125,6 +125,13 @@ allprojects {
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
}
}
String junitMethodExecutionTimeout
if (project.hasProperty('JunitMethodExecutionTimeout')) {
junitMethodExecutionTimeout = project.property('JunitMethodExecutionTimeout').toString()
} else {
junitMethodExecutionTimeout = '1 m'
}
systemProperty 'JunitMethodExecutionTimeout', junitMethodExecutionTimeout
}
dependencies {