Compare commits

...

4 Commits

Author SHA1 Message Date
Roman Acevedo
3d0916093e feat(system): add Unit test model and taskfixture 2025-04-28 15:31:00 +02:00
Roman Acevedo
22369b67af Merge branch 'develop' into feat/test 2025-04-22 17:27:13 +02:00
Roman Acevedo
18fbd528dc Merge branch 'develop' into feat/test 2025-04-17 15:47:54 +02:00
Loïc Mathieu
8ee4b2de90 feat(*): Flow tests 2025-03-17 11:22:29 +01:00
12 changed files with 414 additions and 8 deletions

View File

@@ -20,6 +20,7 @@ public record Label(@NotNull String key, @NotNull String value) {
public static final String REPLAY = SYSTEM_PREFIX + "replay";
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
public static final String TEST = SYSTEM_PREFIX + "test";
/**
* Static helper method for converting a list of labels to a nested map.

View File

@@ -22,6 +22,7 @@ import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
@@ -112,6 +113,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@Setter
String traceParent;
@With
@Nullable
List<TaskFixture> fixtures;
/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
@@ -210,7 +215,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent
this.traceParent,
this.fixtures
);
}
@@ -234,7 +240,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent
this.traceParent,
this.fixtures
);
}
@@ -271,7 +278,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent
this.traceParent,
this.fixtures
);
}
@@ -295,7 +303,8 @@ public class Execution implements DeletedInterface, TenantInterface {
this.deleted,
this.metadata,
this.scheduleDate,
this.traceParent
this.traceParent,
this.fixtures
);
}
@@ -728,6 +737,16 @@ public class Execution implements DeletedInterface, TenantInterface {
);
}
public Optional<TaskFixture> getFixtureForTaskRun(TaskRun taskRun) {
if (this.fixtures == null) {
return Optional.empty();
}
return this.fixtures.stream()
.filter(fixture -> Objects.equals(fixture.getId(), taskRun.getTaskId()) && Objects.equals(fixture.getValue(), taskRun.getValue()))
.findFirst();
}
/**
* Create a new attempt for failed worker execution
*

View File

@@ -15,6 +15,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.services.*;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.trace.propagation.RunContextTextMapSetter;
import io.kestra.core.utils.ListUtils;
@@ -770,7 +771,7 @@ public class ExecutorService {
Map<Boolean, List<WorkerTask>> workerTasks = executor.getExecution()
.getTaskRunList()
.stream()
.filter(taskRun -> taskRun.getState().getCurrent().isCreated())
.filter(taskRun -> taskRun.getState().getCurrent().isCreated() && executor.getExecution().getFixtureForTaskRun(taskRun).isEmpty())
.map(throwFunction(taskRun -> {
Task task = executor.getFlow().findTaskByTaskId(taskRun.getTaskId());
RunContext runContext = runContextFactory.of(executor.getFlow(), task, executor.getExecution(), taskRun);
@@ -826,7 +827,31 @@ public class ExecutorService {
)
.collect(Collectors.groupingBy(workerTask -> workerTask.getTaskRun().getState().isFailed() || workerTask.getTaskRun().getState().getCurrent() == State.Type.CANCELLED));
if (workerTasks.isEmpty()) {
// mock WorkerTaskResult for mocked execution
// submit TaskRun when receiving created, must be done after the state execution store
boolean hasMockedWorkerTask = false;
record FixtureAndTaskRun(TaskFixture fixture, TaskRun taskRun) {}
if (executor.getExecution().getFixtures() != null) {
List<WorkerTaskResult> workerTaskResults = executor.getExecution()
.getTaskRunList()
.stream()
.filter(taskRun -> taskRun.getState().getCurrent().isCreated())
.flatMap(taskRun -> executor.getExecution().getFixtureForTaskRun(taskRun).stream().map(fixture -> new FixtureAndTaskRun(fixture, taskRun)))
.map(fixtureAndTaskRun -> WorkerTaskResult.builder()
.taskRun(fixtureAndTaskRun.taskRun()
.withState(Optional.ofNullable(fixtureAndTaskRun.fixture().getState()).orElse(State.Type.SUCCESS))
.withOutputs(variablesService.of(StorageContext.forTask(fixtureAndTaskRun.taskRun), fixtureAndTaskRun.fixture().getOutputs()))
)
.build()
)
.toList();
hasMockedWorkerTask = !workerTaskResults.isEmpty();
this.addWorkerTaskResults(executor, executor.getFlow(), workerTaskResults);
}
if (workerTasks.isEmpty() || hasMockedWorkerTask) {
return executor;
}

View File

@@ -85,6 +85,10 @@ public class RunnerUtils {
Execution execution = Execution.newExecution(flow, inputs, labels, Optional.empty());
return runOne(execution, flow, duration);
}
public Execution runOne(Execution execution, Flow flow, Duration duration) throws TimeoutException, QueueException {
return this.awaitExecution(isTerminatedExecution(execution, flow), throwRunnable(() -> {
this.executionQueue.emit(execution);
}), duration);

View File

@@ -0,0 +1,73 @@
package io.kestra.core.test;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasSource;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.test.flow.UnitTest;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TestSuite implements HasUID, TenantInterface, DeletedInterface, HasSource {
@NotNull
private String id;
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
private String description;
@NotNull
private String namespace;
@NotNull
private String flowId;
private String source;
@NotNull
@NotEmpty
private List<UnitTest> testCases;
@JsonProperty("deleted")
boolean isDeleted = Boolean.FALSE;
@Builder.Default
private Boolean disabled = Boolean.FALSE;
@Override
@JsonIgnore
public String uid() {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public TestSuite delete() {
return this.toBuilder().isDeleted(true).build();
}
@Override
public String source() {
return this.getSource();
}
}

View File

@@ -0,0 +1,35 @@
package io.kestra.core.test.flow;
import io.kestra.core.models.property.Property;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import java.util.List;
@Getter
@Builder
public class Assertion {
@NotNull
private Property<Object> value;
private String taskId;
private Property<String> errorMessage;
private Property<String> description;
private Property<String> endsWith;
private Property<String> startsWith;
private Property<String> contains;
private Property<String> equalsTo;
private Property<String> notEqualsTo;
private Property<Double> greaterThan;
private Property<Double> greaterThanOrEqualTo;
private Property<Double> lessThan;
private Property<Double> lesThanOrEqualTo;
private Property<List<String>> in;
private Property<List<String>> notIn;
private Property<Boolean> isNull;
private Property<Boolean> isNotNull;
}

View File

@@ -0,0 +1,17 @@
package io.kestra.core.test.flow;
import lombok.Builder;
import lombok.Getter;
import java.util.List;
import java.util.Map;
@Getter
@Builder
public class Fixtures {
private Map<String, Object> inputs;
private List<TaskFixture> tasks;
private TriggerFixture trigger;
}

View File

@@ -0,0 +1,25 @@
package io.kestra.core.test.flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import java.util.Map;
@Getter
@Builder
public class TaskFixture {
@NotNull
private String id;
private String value;
@Builder.Default
private State.Type state = State.Type.SUCCESS;
private Map<String, Object> outputs;
private Property<String> description;
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.test.flow;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import java.util.Map;
@Getter
@Builder
public class TriggerFixture {
@NotNull
private String id;
@NotNull
private String type;
private Map<String, Object> variables;
}

View File

@@ -0,0 +1,27 @@
package io.kestra.core.test.flow;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import java.util.List;
@Getter
@Builder
public class UnitTest {
@NotNull
private String id;
@NotNull
private String type;
@Builder.Default
private boolean disabled = false;
private String description;
private Fixtures fixtures;
@NotNull
private List<Assertion> assertions;
}

View File

@@ -0,0 +1,159 @@
package io.kestra.core.runners;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.assertj.core.api.AbstractObjectAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
@KestraTest(startRunner = true)
public class TestSuiteTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
protected RunnerUtils runnerUtils;
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ApplicationContext applicationContext;
@Test
@LoadFlows({"flows/valids/return.yaml"})
public void withoutAnyTaskFixture() throws QueueException, TimeoutException {
var fixtures = List.<TaskFixture>of();
var executionResult = runReturnFlow(fixtures);
assertThat(executionResult.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertOutputForTask(executionResult, "task-id")
.isEqualTo("task-id");
assertOutputForTask(executionResult, "flow-id")
.isEqualTo("return");
assertOutputForTask(executionResult, "date")
.satisfies(output -> {
assertThat(output).asString().isNotBlank();
assertThat(ZonedDateTime.parse((String) output)).isCloseTo(ZonedDateTime.now(), within(300, ChronoUnit.SECONDS));
});
}
@Test
@LoadFlows({"flows/valids/return.yaml"})
public void taskFixture() throws TimeoutException, QueueException {
var fixtures = List.of(
TaskFixture.builder()
.id("date")
.build()
);
var executionResult = runReturnFlow(fixtures);
assertThat(executionResult.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertOutputForTask(executionResult, "task-id")
.isEqualTo("task-id");
assertOutputForTask(executionResult, "flow-id")
.isEqualTo("return");
assertOutputForTask(executionResult, "date")
.isNull();
}
@Test
@LoadFlows({"flows/valids/return.yaml"})
public void twoTaskFixturesOverridingOutput() throws QueueException, TimeoutException {
var fixtures = List.of(
TaskFixture.builder()
.id("date")
.outputs(Map.of("value", "my-mocked-output-value"))
.build(),
TaskFixture.builder()
.id("flow-id")
.outputs(Map.of("value", "my-mocked-output-flow-id"))
.build()
);
var executionResult = runReturnFlow(fixtures);
assertThat(executionResult.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertOutputForTask(executionResult, "task-id")
.isEqualTo("task-id");
assertOutputForTask(executionResult, "flow-id")
.isEqualTo("my-mocked-output-flow-id");
assertOutputForTask(executionResult, "date")
.isEqualTo("my-mocked-output-value");
}
@Test
@LoadFlows({"flows/valids/return.yaml"})
public void taskFixturesWithWarningState() throws QueueException, TimeoutException {
var fixtures = List.of(
TaskFixture.builder()
.id("date")
.state(State.Type.WARNING)
.build()
);
var executionResult = runReturnFlow(fixtures);
assertThat(executionResult.getState().getCurrent()).isEqualTo(State.Type.WARNING);
assertTask(executionResult, "task-id")
.extracting(TaskRun::getState).extracting(State::getCurrent)
.isEqualTo(State.Type.SUCCESS);
assertTask(executionResult, "flow-id")
.extracting(TaskRun::getState).extracting(State::getCurrent)
.isEqualTo(State.Type.SUCCESS);
assertTask(executionResult, "date")
.extracting(TaskRun::getState).extracting(State::getCurrent)
.isEqualTo(State.Type.WARNING);
}
private Execution runReturnFlow(List<TaskFixture> fixtures) throws TimeoutException, QueueException {
var flow = flowRepository.findById(null, "io.kestra.tests", "return", Optional.empty()).orElseThrow();
var execution = Execution.builder()
.id(IdUtils.create())
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.fixtures(fixtures)
.state(new State())
.build();
return runnerUtils.runOne(execution, flow, null);
}
private static AbstractObjectAssert<?, Object> assertOutputForTask(Execution executionResult, String taskId) {
return assertTask(executionResult, taskId)
.extracting(TaskRun::getOutputs).extracting(x -> x.get("value"));
}
private static ObjectAssert<TaskRun> assertTask(Execution executionResult, String taskId) {
return assertThat(executionResult.getTaskRunList()).filteredOn(x -> taskId.equals(x.getTaskId())).hasSize(1).first();
}
}

View File

@@ -27,6 +27,7 @@ import io.kestra.core.services.*;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.trace.propagation.ExecutionTextMapSetter;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ListUtils;
@@ -689,8 +690,8 @@ public class ExecutionController {
private final URI url;
// This is not nice, but we cannot use @AllArgsConstructor as it would open a bunch of necessary changes on the Execution class.
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent);
ExecutionResponse(String tenantId, String id, String namespace, String flowId, Integer flowRevision, List<TaskRun> taskRunList, Map<String, Object> inputs, Map<String, Object> outputs, List<Label> labels, Map<String, Object> variables, State state, String parentId, String originalId, ExecutionTrigger trigger, boolean deleted, ExecutionMetadata metadata, Instant scheduleDate, String traceParent, List<TaskFixture> fixtures, URI url) {
super(tenantId, id, namespace, flowId, flowRevision, taskRunList, inputs, outputs, labels, variables, state, parentId, originalId, trigger, deleted, metadata, scheduleDate, traceParent, fixtures);
this.url = url;
}
@@ -715,6 +716,7 @@ public class ExecutionController {
execution.getMetadata(),
execution.getScheduleDate(),
execution.getTraceParent(),
execution.getFixtures(),
url
);
}