test: unflaky FileChangedEventListener and PluginDefaultServiceTest, debug log on JdbcServiceLivenessCoordinatorTest

* test: parallelize AbstractRunnerTest

* test: add TestsUtils.randomTenant(..) function

* test: i think i found a bug

* revert debug

* test: add comment on potential bug, make test pass

* test: fix test metadata

* test: unflaky PluginDefaultServiceTest by separating class

* test: add log on JdbcServiceLivenessCoordinatorTest to debug

* test: cleanup debug log

* fix
This commit is contained in:
Roman Acevedo
2025-09-15 17:07:37 +02:00
committed by GitHub
parent 7feb571fb3
commit 1791127acb
7 changed files with 158 additions and 81 deletions

View File

@@ -40,5 +40,6 @@ dependencies {
implementation project(":worker")
//test
testImplementation project(':tests')
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -262,6 +262,8 @@ public class FileChangedEventListener {
}
private String getTenantIdFromPath(Path path) {
// FIXME there is probably a bug here when a tenant has '_' in its name,
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -4,11 +4,11 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.*;
import org.junitpioneer.jupiter.RetryingTest;
import java.io.IOException;
import java.nio.file.Files;
@@ -19,7 +19,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -57,10 +56,11 @@ class FileChangedEventListenerTest {
}
}
@RetryingTest(5) // Flaky on CI but always pass locally
@Test
void test() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
// remove the flow if it already exists
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
// create a basic flow
String flow = """
@@ -73,14 +73,14 @@ class FileChangedEventListenerTest {
message: Hello World! 🚀
""";
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks()).hasSize(1);
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -88,16 +88,17 @@ class FileChangedEventListenerTest {
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
}
@RetryingTest(5) // Flaky on CI but always pass locally
@Test
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
// create a flow with plugin default
String pluginDefault = """
@@ -113,14 +114,14 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
@@ -128,7 +129,7 @@ class FileChangedEventListenerTest {
// delete both files
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);

View File

@@ -0,0 +1,79 @@
package io.kestra.core.services;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.services.PluginDefaultServiceTest.DefaultPrecedenceTester;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@Slf4j
@KestraTest
class PluginDefaultServiceOverrideTest {
@Inject
private PluginDefaultService pluginDefaultService;
@org.junit.jupiter.api.parallel.Execution(ExecutionMode.SAME_THREAD)
@ParameterizedTest
@MethodSource
void flowDefaultsOverrideGlobalDefaults(boolean flowDefaultForced, boolean globalDefaultForced, String fooValue, String barValue, String bazValue) throws FlowProcessingException {
final DefaultPrecedenceTester task = DefaultPrecedenceTester.builder()
.id("test")
.type(DefaultPrecedenceTester.class.getName())
.propBaz("taskValue")
.build();
final PluginDefault flowDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), flowDefaultForced, ImmutableMap.of(
"propBar", "flowValue",
"propBaz", "flowValue"
));
final PluginDefault globalDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), globalDefaultForced, ImmutableMap.of(
"propFoo", "globalValue",
"propBar", "globalValue",
"propBaz", "globalValue"
));
var tenant = TestsUtils.randomTenant(PluginDefaultServiceOverrideTest.class.getSimpleName());
final Flow flowWithPluginDefault = Flow.builder()
.tenantId(tenant)
.tasks(Collections.singletonList(task))
.pluginDefaults(List.of(flowDefault))
.build();
final PluginGlobalDefaultConfiguration pluginGlobalDefaultConfiguration = new PluginGlobalDefaultConfiguration();
pluginGlobalDefaultConfiguration.defaults = List.of(globalDefault);
var previousGlobalDefault = pluginDefaultService.pluginGlobalDefault;
pluginDefaultService.pluginGlobalDefault = pluginGlobalDefaultConfiguration;
final Flow injected = pluginDefaultService.injectAllDefaults(flowWithPluginDefault, true);
pluginDefaultService.pluginGlobalDefault = previousGlobalDefault;
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropFoo(), is(fooValue));
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBar(), is(barValue));
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBaz(), is(bazValue));
}
private static Stream<Arguments> flowDefaultsOverrideGlobalDefaults() {
return Stream.of(
Arguments.of(false, false, "globalValue", "flowValue", "taskValue"),
Arguments.of(false, true, "globalValue", "globalValue", "globalValue"),
Arguments.of(true, false, "globalValue", "flowValue", "flowValue"),
Arguments.of(true, true, "globalValue", "flowValue", "flowValue")
);
}
}

View File

@@ -1,12 +1,11 @@
package io.kestra.core.services;
import com.google.common.collect.ImmutableMap;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
@@ -19,6 +18,7 @@ import io.kestra.core.models.triggers.PollingTriggerInterface;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.triggers.TriggerOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.condition.Expression;
import io.kestra.plugin.core.log.Log;
import io.kestra.plugin.core.trigger.Schedule;
@@ -31,20 +31,13 @@ import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
@@ -72,7 +65,8 @@ class PluginDefaultServiceTest {
@Test
void shouldInjectGivenFlowWithNullSource() throws FlowProcessingException {
// Given
FlowInterface flow = GenericFlow.fromYaml(MAIN_TENANT, TEST_LOG_FLOW_SOURCE);
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
FlowInterface flow = GenericFlow.fromYaml(tenant, TEST_LOG_FLOW_SOURCE);
// When
FlowWithSource result = pluginDefaultService.injectAllDefaults(flow, true);
@@ -132,56 +126,8 @@ class PluginDefaultServiceTest {
), result);
}
@org.junit.jupiter.api.parallel.Execution(ExecutionMode.SAME_THREAD)
@ParameterizedTest
@MethodSource
void flowDefaultsOverrideGlobalDefaults(boolean flowDefaultForced, boolean globalDefaultForced, String fooValue, String barValue, String bazValue) throws FlowProcessingException {
final DefaultPrecedenceTester task = DefaultPrecedenceTester.builder()
.id("test")
.type(DefaultPrecedenceTester.class.getName())
.propBaz("taskValue")
.build();
final PluginDefault flowDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), flowDefaultForced, ImmutableMap.of(
"propBar", "flowValue",
"propBaz", "flowValue"
));
final PluginDefault globalDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), globalDefaultForced, ImmutableMap.of(
"propFoo", "globalValue",
"propBar", "globalValue",
"propBaz", "globalValue"
));
final Flow flowWithPluginDefault = Flow.builder()
.tasks(Collections.singletonList(task))
.pluginDefaults(List.of(flowDefault))
.build();
final PluginGlobalDefaultConfiguration pluginGlobalDefaultConfiguration = new PluginGlobalDefaultConfiguration();
pluginGlobalDefaultConfiguration.defaults = List.of(globalDefault);
var previousGlobalDefault = pluginDefaultService.pluginGlobalDefault;
pluginDefaultService.pluginGlobalDefault = pluginGlobalDefaultConfiguration;
final Flow injected = pluginDefaultService.injectAllDefaults(flowWithPluginDefault, true);
pluginDefaultService.pluginGlobalDefault = previousGlobalDefault;
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropFoo(), is(fooValue));
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBar(), is(barValue));
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBaz(), is(bazValue));
}
private static Stream<Arguments> flowDefaultsOverrideGlobalDefaults() {
return Stream.of(
Arguments.of(false, false, "globalValue", "flowValue", "taskValue"),
Arguments.of(false, true, "globalValue", "globalValue", "globalValue"),
Arguments.of(true, false, "globalValue", "flowValue", "flowValue"),
Arguments.of(true, true, "globalValue", "flowValue", "flowValue")
);
}
@Test
public void injectFlowAndGlobals() throws FlowProcessingException {
public void injectFlowAndGlobals() throws FlowProcessingException, JsonProcessingException {
String source = String.format("""
id: default-test
namespace: io.kestra.tests
@@ -217,8 +163,8 @@ class PluginDefaultServiceTest {
DefaultTriggerTester.class.getName(),
Expression.class.getName()
);
FlowWithSource injected = pluginDefaultService.parseFlowWithAllDefaults(null, source, false);
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
FlowWithSource injected = pluginDefaultService.parseFlowWithAllDefaults(tenant, source, false);
assertThat(((DefaultTester) injected.getTasks().getFirst()).getValue(), is(1));
assertThat(((DefaultTester) injected.getTasks().getFirst()).getSet(), is(666));
@@ -263,7 +209,8 @@ class PluginDefaultServiceTest {
""";
// When
FlowWithSource injected = pluginDefaultService.parseFlowWithAllDefaults(null, source, false);
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
FlowWithSource injected = pluginDefaultService.parseFlowWithAllDefaults(tenant, source, false);
// Then
assertThat(((DefaultTester) injected.getTasks().getFirst()).getSet(), is(2));
@@ -301,7 +248,8 @@ class PluginDefaultServiceTest {
""";
// When
FlowWithSource injected = pluginDefaultService.parseFlowWithAllDefaults(null, source, false);
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
FlowWithSource injected = pluginDefaultService.parseFlowWithAllDefaults(tenant, source, false);
// Then
assertThat(((DefaultTester) injected.getTasks().getFirst()).getSet(), is(666));
@@ -311,7 +259,8 @@ class PluginDefaultServiceTest {
@Test
void shouldInjectFlowDefaultsGivenAlias() throws FlowProcessingException {
// Given
GenericFlow flow = GenericFlow.fromYaml(MAIN_TENANT, """
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
GenericFlow flow = GenericFlow.fromYaml(tenant, """
id: default-test
namespace: io.kestra.tests
@@ -335,7 +284,8 @@ class PluginDefaultServiceTest {
@Test
void shouldInjectFlowDefaultsGivenType() throws FlowProcessingException {
GenericFlow flow = GenericFlow.fromYaml(MAIN_TENANT, """
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
GenericFlow flow = GenericFlow.fromYaml(tenant, """
id: default-test
namespace: io.kestra.tests
@@ -358,7 +308,8 @@ class PluginDefaultServiceTest {
@Test
public void shouldNotInjectDefaultsGivenExistingTaskValue() throws FlowProcessingException {
// Given
GenericFlow flow = GenericFlow.fromYaml(MAIN_TENANT, """
var tenant = TestsUtils.randomTenant(PluginDefaultServiceTest.class.getSimpleName());
GenericFlow flow = GenericFlow.fromYaml(tenant, """
id: default-test
namespace: io.kestra.tests

View File

@@ -34,6 +34,7 @@ import org.junit.jupiter.api.TestInstance;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -142,7 +143,9 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, "workerGroupKey");
worker.run();
var workerTaskResultQueueAppendLog = new ArrayList<WorkerTaskResult>();// to debug flaky test
Flux<WorkerTaskResult> receive = TestsUtils.receive(workerTaskResultQueue, either -> {
workerTaskResultQueueAppendLog.add(either.getLeft());
if (either.getLeft().getTaskRun().getState().getCurrent() == Type.SUCCESS) {
resubmitLatch.countDown();
}
@@ -161,7 +164,9 @@ public abstract class JdbcServiceLivenessCoordinatorTest {
Worker newWorker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 1, "workerGroupKey");
newWorker.run();
boolean resubmitLatchAwait = resubmitLatch.await(10, TimeUnit.SECONDS);
assertThat(resubmitLatchAwait).isTrue();
assertThat(resubmitLatchAwait)
.withFailMessage(() -> "shouldReEmitTasksToTheSameWorkerGroup: resubmitLatchAwait was not OK, workerTaskResultQueue content: " + TestsUtils.stringify(workerTaskResultQueueAppendLog))
.isTrue();
WorkerTaskResult workerTaskResult = receive.blockLast();
assertThat(workerTaskResult).isNotNull();
assertThat(workerTaskResult.getTaskRun().getState().getCurrent()).isEqualTo(Type.SUCCESS);

View File

@@ -21,6 +21,7 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.JacksonMapper;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.File;
@@ -28,6 +29,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
@@ -37,12 +39,39 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Slf4j
abstract public class TestsUtils {
private static final ObjectMapper mapper = JacksonMapper.ofYaml();
/**
* there is at least one bug in {@link io.kestra.cli.services.FileChangedEventListener#getTenantIdFromPath(Path)} forbidding use to use '_' character
* @param prefix
* @return
*/
public static String randomTenant(String... prefix) {
var list = List.of(prefix);
if (list.isEmpty()) {
throw new IllegalArgumentException("tenant prefix must not be empty");
}
var tenantRegex = "^[a-z0-9][a-z0-9_-]*";
var validTenantPrefixes = list.stream()
.map(s -> s.replace(".", "-"))
.map(String::toLowerCase)
.peek(p -> {
if (!p.matches(tenantRegex)) {
throw new IllegalArgumentException("random tenant prefix %s should match tenant regex %s".formatted(p, tenantRegex));
}
}).toList();
String[] parts = Stream
.concat(validTenantPrefixes.stream(), Stream.of(IdUtils.create().toLowerCase()))
.toArray(String[]::new);
return IdUtils.fromPartsAndSeparator('-',parts);
}
public static <T> T map(String path, Class<T> cls) throws IOException {
URL resource = TestsUtils.class.getClassLoader().getResource(path);
assert resource != null;
@@ -231,4 +260,13 @@ abstract public class TestsUtils {
public static <T> Property<List<T>> propertyFromList(List<T> list) throws JsonProcessingException {
return Property.ofExpression(JacksonMapper.ofJson().writeValueAsString(list));
}
public static String stringify(Object object) {
try {
return JacksonMapper.ofJson().writeValueAsString(object);
} catch (JsonProcessingException e) {
log.error("failed to serialize object to json string", e);
return object !=null ? object.toString() : "null";
}
}
}