Compare commits

...

34 Commits

Author SHA1 Message Date
Sandip Mandal
f5a0dcc024 chore(core): make sure kv listing is filterable (#11536)
Closes https://github.com/kestra-io/kestra/issues/11413.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-09-29 09:30:09 +02:00
Satvik Kushwaha
5c079b8b6b chore(namespaces): update page title on single namespace page (#11551)
Closes https://github.com/kestra-io/kestra/issues/11428.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-09-29 09:21:26 +02:00
Barthélémy Ledoux
343d6b4eb9 refactor(plugins): update documentation to use typescript and composition api (#11543) 2025-09-27 09:33:26 +01:00
Kenneth Rabe
d34d547412 fix(pebble): correct return format of timestampMicro 2025-09-26 16:51:35 +02:00
Nicolas K.
7a542a24e2 fix(executor): remove debug log (#11548)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-09-26 15:03:08 +02:00
Nicolas K.
5b1db68752 fix(test): flaky test with unwanted repeat test annotation (#11547)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-09-26 14:50:26 +02:00
Nicolas K.
5b07b643d3 fix(test): disable flaky test and add configuration to the ELS indexe… (#11539)
* fix(test): disable flaky test and add configuration to the ELS indexer poll duration

* fix(test): retry a flaky test and fix a flaky

* feat(test): disable a test until we have time to fix the bug

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-09-26 14:19:20 +02:00
Barthélémy Ledoux
0e059772e4 chore: remove posthog in dev mode (#11540) 2025-09-26 10:49:27 +01:00
Loïc Mathieu
f72e294e54 chore(system): log machine information at startup
This will log this kind of line at startup, helping to understand possible infrastructure limitation by looking at the starting logs.

```
14:38:17.018 INFO  main         i.k.c.c.s.AbstractServerCommand Machine information: 16 available cpu(s), 2048MB max memory, Java version 21.0.5+11-LTS
```
2025-09-26 10:55:05 +02:00
Loïc Mathieu
98dd884149 chore(executions): always log errors from the executor
- Logs errors from the Executor catched execution
- Logs errors from the Scheduler catched execution
- Avoid most places where the warning "unable to change state already..." could occur
- Log using the run context logger flow issues from executable tasks so they appears inside execution logs
2025-09-26 10:43:05 +02:00
Loïc Mathieu
26c4f080fd chore(deps): use the version of bcpkix-jdk18on from the platform 2025-09-26 10:42:47 +02:00
yuri1969
01293de91c fix(core): enable runIf at execution updating tasks 2025-09-25 10:23:13 +02:00
Mustafa Tarek
892b69f10e fix(core): Add warning logs for mismatched (Parent-Subflow) inputs (#11431)
* fix(core): Add warning logs for mismatched (Parent-Subflow) inputs for subflow plugin.

* feat: add check and log to FlowInputOutput.java

* enhancement: avoid unnecessary input validation in ExecutableUtils.subflowExecution() when no mismatches exist
2025-09-25 10:08:37 +02:00
yuri1969
6f70d4d275 fix(core): amend test
Adjusted to e1d2c30e which made the execution fail on empty value.
2025-09-25 09:49:19 +02:00
yuri1969
b41d2e456f fix(core): do not allow empty labels
* Filtered empty  entries on Labels task.
* Checking empty Flow labels via validation.
* Adjusted UI to disallow setting empty labels.
2025-09-25 09:49:19 +02:00
UncleBigBay
5ec08eda8c feat (layout): new sidebar total collapse behaviour (#11471)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-09-25 12:06:24 +05:30
dependabot[bot]
7ed6b883ff build(deps): bump io.micronaut.openapi:micronaut-openapi-bom
Bumps [io.micronaut.openapi:micronaut-openapi-bom](https://github.com/micronaut-projects/micronaut-openapi) from 6.18.0 to 6.18.1.
- [Release notes](https://github.com/micronaut-projects/micronaut-openapi/releases)
- [Commits](https://github.com/micronaut-projects/micronaut-openapi/compare/v6.18.0...v6.18.1)

---
updated-dependencies:
- dependency-name: io.micronaut.openapi:micronaut-openapi-bom
  dependency-version: 6.18.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:22:28 +02:00
dependabot[bot]
eb166c9321 build(deps): bump jakarta.mail:jakarta.mail-api from 2.1.4 to 2.1.5
Bumps [jakarta.mail:jakarta.mail-api](https://github.com/jakartaee/mail-api) from 2.1.4 to 2.1.5.
- [Release notes](https://github.com/jakartaee/mail-api/releases)
- [Commits](https://github.com/jakartaee/mail-api/compare/2.1.4...2.1.5)

---
updated-dependencies:
- dependency-name: jakarta.mail:jakarta.mail-api
  dependency-version: 2.1.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:22:05 +02:00
dependabot[bot]
57aad1b931 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.38.13 to 0.39.0.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.38.13...v0.39.0)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.39.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:21:47 +02:00
dependabot[bot]
60fe5b5c76 build(deps): bump org.apache.logging.log4j:log4j-to-slf4j
Bumps org.apache.logging.log4j:log4j-to-slf4j from 2.25.1 to 2.25.2.

---
updated-dependencies:
- dependency-name: org.apache.logging.log4j:log4j-to-slf4j
  dependency-version: 2.25.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:21:29 +02:00
dependabot[bot]
98c69b53bb build(deps): bump software.amazon.awssdk:bom from 2.33.11 to 2.34.2
Bumps software.amazon.awssdk:bom from 2.33.11 to 2.34.2.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.34.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:21:10 +02:00
dependabot[bot]
d5d38559b4 build(deps): bump com.github.oshi:oshi-core from 6.8.3 to 6.9.0
Bumps [com.github.oshi:oshi-core](https://github.com/oshi/oshi) from 6.8.3 to 6.9.0.
- [Release notes](https://github.com/oshi/oshi/releases)
- [Changelog](https://github.com/oshi/oshi/blob/master/CHANGELOG.md)
- [Commits](https://github.com/oshi/oshi/compare/oshi-parent-6.8.3...oshi-parent-6.9.0)

---
updated-dependencies:
- dependency-name: com.github.oshi:oshi-core
  dependency-version: 6.9.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:20:50 +02:00
dependabot[bot]
4273ddc4f6 build(deps): bump org.apache.httpcomponents.core5:httpcore5-h2
Bumps [org.apache.httpcomponents.core5:httpcore5-h2](https://github.com/apache/httpcomponents-core) from 5.3.5 to 5.3.6.
- [Changelog](https://github.com/apache/httpcomponents-core/blob/rel/v5.3.6/RELEASE_NOTES.txt)
- [Commits](https://github.com/apache/httpcomponents-core/compare/rel/v5.3.5...rel/v5.3.6)

---
updated-dependencies:
- dependency-name: org.apache.httpcomponents.core5:httpcore5-h2
  dependency-version: 5.3.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:20:10 +02:00
dependabot[bot]
980c573a30 build(deps): bump org.eclipse.angus:jakarta.mail from 2.0.4 to 2.0.5
Bumps org.eclipse.angus:jakarta.mail from 2.0.4 to 2.0.5.

---
updated-dependencies:
- dependency-name: org.eclipse.angus:jakarta.mail
  dependency-version: 2.0.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:18:43 +02:00
dependabot[bot]
27109015f9 build(deps): bump org.projectlombok:lombok from 1.18.40 to 1.18.42
Bumps [org.projectlombok:lombok](https://github.com/projectlombok/lombok) from 1.18.40 to 1.18.42.
- [Changelog](https://github.com/projectlombok/lombok/blob/master/doc/changelog.markdown)
- [Commits](https://github.com/projectlombok/lombok/compare/v1.18.40...v1.18.42)

---
updated-dependencies:
- dependency-name: org.projectlombok:lombok
  dependency-version: 1.18.42
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:18:15 +02:00
dependabot[bot]
eba7d4f375 build(deps): bump bouncycastleVersion from 1.81 to 1.82
Bumps `bouncycastleVersion` from 1.81 to 1.82.

Updates `org.bouncycastle:bcprov-jdk18on` from 1.81 to 1.82
- [Changelog](https://github.com/bcgit/bc-java/blob/main/docs/releasenotes.html)
- [Commits](https://github.com/bcgit/bc-java/commits)

Updates `org.bouncycastle:bcpg-jdk18on` from 1.81 to 1.82
- [Changelog](https://github.com/bcgit/bc-java/blob/main/docs/releasenotes.html)
- [Commits](https://github.com/bcgit/bc-java/commits)

Updates `org.bouncycastle:bcpkix-jdk18on` from 1.81 to 1.82
- [Changelog](https://github.com/bcgit/bc-java/blob/main/docs/releasenotes.html)
- [Commits](https://github.com/bcgit/bc-java/commits)

---
updated-dependencies:
- dependency-name: org.bouncycastle:bcprov-jdk18on
  dependency-version: '1.82'
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.bouncycastle:bcpg-jdk18on
  dependency-version: '1.82'
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.bouncycastle:bcpkix-jdk18on
  dependency-version: '1.82'
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 16:17:53 +02:00
dependabot[bot]
655a1172ee build(deps): bump org.assertj:assertj-core from 3.27.4 to 3.27.6
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.27.4 to 3.27.6.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.27.4...assertj-build-3.27.6)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-version: 3.27.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 15:45:31 +02:00
dependabot[bot]
6e49a85acd build(deps): bump org.owasp.dependencycheck from 12.1.3 to 12.1.5
Bumps org.owasp.dependencycheck from 12.1.3 to 12.1.5.

---
updated-dependencies:
- dependency-name: org.owasp.dependencycheck
  dependency-version: 12.1.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-24 15:44:40 +02:00
Barthélémy Ledoux
4515bad6bd fix(flows): delete flows should work (#11469) 2025-09-24 09:35:47 +02:00
Loïc Mathieu
226dbd30c9 fix(tests): fix test flow namespace and id 2025-09-24 09:19:31 +02:00
mustafatarek
6b0c190edc feat: added test case covering ForEach Iteration 2025-09-24 09:19:31 +02:00
mustafatarek
c64df40a36 refactor: change iteration to start with 0 2025-09-24 09:19:31 +02:00
mustafatarek
8af22d1bb2 fix(core): fix ForEach plugin task.iteration property to show the correct number of Iteration 2025-09-24 09:19:31 +02:00
Nicolas K.
b294457953 feat(tests): rework runner utils to not use the queue during testing (#11380)
* feat(tests): rework runner utils to not use the queue during testing

* feat(tests): rework runner utils to not use the queue during testing

* test: rework RetryCaseTest to not rely on executionQueue

* fix(tests): don't catch the Queue exception

* fix(tests): don't catch the Queue exception

* fix compile

* fix(test): concurrency error and made runner test parallel ready

* fix(tests): remove test instance

* feat(tests): use Test Runner Utils

* fix(tests): flaky tests

* fix(test): flaky tests

* feat(tests): rework runner utils to not use the queue during testing

* feat(tests): rework runner utils to not use the queue during testing

* test: rework RetryCaseTest to not rely on executionQueue

* fix(tests): don't catch the Queue exception

* fix(tests): don't catch the Queue exception

* fix compile

* fix(test): concurrency error and made runner test parallel ready

* fix(tests): remove test instance

* feat(tests): use Test Runner Utils

* fix(tests): flaky tests

* fix(test): flaky tests

* fix(tests): flaky set test

* fix(tests): remove RunnerUtils

* fix(tests): fix flaky

* feat(test): rework runner tests to remove the queue usage

* feat(test): fix a flaky and remove parallelism from mysql test suit

* fix(tests): flaky tests

* clean(tests): unwanted test

* add debug exec when fail

* feat(tests): add thread to mysql thread pool

* fix(test): flaky and disable a test

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
Co-authored-by: Roman Acevedo <roman.acevedo62@gmail.com>
2025-09-24 08:18:02 +02:00
108 changed files with 1573 additions and 1955 deletions

View File

@@ -37,7 +37,7 @@ plugins {
id "com.vanniktech.maven.publish" version "0.34.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.3" apply false
id "org.owasp.dependencycheck" version "12.1.5" apply false
}
idea {

View File

@@ -2,19 +2,27 @@ package io.kestra.cli.commands.servers;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.contexts.KestraContext;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
abstract public class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
@Slf4j
public abstract class AbstractServerCommand extends AbstractCommand implements ServerCommandInterface {
@CommandLine.Option(names = {"--port"}, description = "The port to bind")
Integer serverPort;
@Override
public Integer call() throws Exception {
log.info("Machine information: {} available cpu(s), {}MB max memory, Java version {}", Runtime.getRuntime().availableProcessors(), maxMemoryInMB(), Runtime.version());
this.shutdownHook(true, () -> KestraContext.getContext().shutdown());
return super.call();
}
private long maxMemoryInMB() {
return Runtime.getRuntime().maxMemory() / 1024 / 1024;
}
protected static int defaultWorkerThread() {
return Runtime.getRuntime().availableProcessors() * 8;
}

View File

@@ -18,6 +18,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junitpioneer.jupiter.RetryingTest;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
@@ -94,7 +95,7 @@ class FileChangedEventListenerTest {
);
}
@Test
@RetryingTest(2)
void testWithPluginDefault() throws IOException, TimeoutException {
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
// remove the flow if it already exists

View File

@@ -84,7 +84,7 @@ dependencies {
testImplementation "org.testcontainers:testcontainers:1.21.3"
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
testImplementation "org.bouncycastle:bcpkix-jdk18on:1.81"
testImplementation "org.bouncycastle:bcpkix-jdk18on"
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -2,12 +2,13 @@ package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.NotEmpty;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
public record Label(@NotEmpty String key, @NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system.";
// system labels
@@ -41,7 +42,7 @@ public record Label(@NotNull String key, @NotNull String value) {
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && label.key() != null)
.filter(label -> label.value() != null && !label.value().isEmpty() && label.key() != null && !label.key().isEmpty())
// using an accumulator in case labels with the same key exists: the second is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
@@ -56,6 +57,7 @@ public record Label(@NotNull String key, @NotNull String value) {
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.filter(getEntryNotEmptyPredicate())
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
@@ -70,6 +72,7 @@ public record Label(@NotNull String key, @NotNull String value) {
if (map == null || map.isEmpty()) return List.of();
return map.entrySet()
.stream()
.filter(getEntryNotEmptyPredicate())
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.toList();
}
@@ -88,4 +91,14 @@ public record Label(@NotNull String key, @NotNull String value) {
}
return map;
}
/**
* Provides predicate for not empty entries.
*
* @return The non-empty filter
*/
public static Predicate<Map.Entry<String, String>> getEntryNotEmptyPredicate() {
return entry -> entry.getKey() != null && !entry.getKey().isEmpty() &&
entry.getValue() != null && !entry.getValue().isEmpty();
}
}

View File

@@ -865,20 +865,18 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param e the exception raise
* @return new taskRun with updated attempt with logs
*/
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun,
TaskRunAttempt lastAttempt, Exception e) {
private FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun, TaskRunAttempt lastAttempt, Exception e) {
TaskRun failed = taskRun
.withAttempts(
Stream
.concat(
taskRun.getAttempts().stream().limit(taskRun.getAttempts().size() - 1),
Stream.of(lastAttempt.getState().isFailed() ? lastAttempt : lastAttempt.withState(State.Type.FAILED))
)
.toList()
);
return new FailedTaskRunWithLog(
taskRun
.withAttempts(
Stream
.concat(
taskRun.getAttempts().stream().limit(taskRun.getAttempts().size() - 1),
Stream.of(lastAttempt
.withState(State.Type.FAILED))
)
.toList()
)
.withState(State.Type.FAILED),
failed.getState().isFailed() ? failed : failed.withState(State.Type.FAILED),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(taskRun, kind))
);
}

View File

@@ -62,6 +62,7 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
@Valid
List<Label> labels;
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)

View File

@@ -5,10 +5,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
@@ -29,6 +26,7 @@ import org.apache.commons.lang3.stream.Streams;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import static io.kestra.core.trace.Tracer.throwCallable;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -153,17 +151,24 @@ public final class ExecutableUtils {
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
.orElseThrow(() -> {
String msg = "Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'";
runContext.logger().error(msg);
return new IllegalStateException(msg);
});
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
String msg = "Cannot execute a flow which is disabled";
runContext.logger().error(msg);
throw new IllegalStateException(msg);
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
String msg = "Cannot execute an invalid flow: " + fwe.getException();
runContext.logger().error(msg);
throw new IllegalStateException(msg);
}
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
if (labels != null) {
labels.forEach(throwConsumer(label -> newLabels.add(new Label(runContext.render(label.key()), runContext.render(label.value())))));
}
@@ -201,7 +206,20 @@ public final class ExecutableUtils {
.build()
)
.withScheduleDate(scheduleOnDate);
if(execution.getInputs().size()<inputs.size()) {
Map<String,Object>resolvedInputs=execution.getInputs();
for (var inputKey : inputs.keySet()) {
if (!resolvedInputs.containsKey(inputKey)) {
runContext.logger().warn(
"Input {} was provided by parent execution {} for subflow {}.{} but isn't declared at the subflow inputs",
inputKey,
currentExecution.getId(),
currentTask.subflowId().namespace(),
currentTask.subflowId().flowId()
);
}
}
}
// inject the traceparent into the new execution
propagator.ifPresent(pg -> pg.inject(Context.current(), execution, ExecutionTextMapSetter.INSTANCE));

View File

@@ -49,15 +49,7 @@ import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
@@ -231,6 +223,19 @@ public class FlowInputOutput {
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
})
.collect(HashMap::new, (m,v)-> m.put(v.getKey(), v.getValue()), HashMap::putAll);
if (resolved.size() < data.size()) {
RunContext runContext = runContextFactory.of(flow, execution);
for (var inputKey : data.keySet()) {
if (!resolved.containsKey(inputKey)) {
runContext.logger().warn(
"Input {} was provided for workflow {}.{} but isn't declared in the workflow inputs",
inputKey,
flow.getNamespace(),
flow.getId()
);
}
}
}
return MapUtils.flattenToNestedMap(resolved);
}
@@ -313,15 +318,15 @@ public class FlowInputOutput {
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
if (value == null && input.getDefaults() != null) {
value = resolveDefaultValue(input, runContext);
resolvable.isDefault(true);
}
// validate and parse input value
if (value == null) {
if (input.getRequired()) {
@@ -350,7 +355,7 @@ public class FlowInputOutput {
return resolvable.get();
}
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
@@ -367,7 +372,7 @@ public class FlowInputOutput {
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
};
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
@@ -376,7 +381,7 @@ public class FlowInputOutput {
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
@@ -453,7 +458,7 @@ public class FlowInputOutput {
if (data.getType() == null) {
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
}
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
return Optional.of(new AbstractMap.SimpleEntry<>(
@@ -530,17 +535,17 @@ public class FlowInputOutput {
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
}
}
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
@@ -583,9 +588,9 @@ public class FlowInputOutput {
}
public void isDefault(boolean isDefault) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
}
public void setInput(final Input<?> input) {
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
}

View File

@@ -500,7 +500,7 @@ public class FlowableUtils {
ArrayList<ResolvedTask> result = new ArrayList<>();
int index = 0;
int iteration = 0;
for (Object current : distinctValue) {
try {
String resolvedValue = current instanceof String stringValue ? stringValue : MAPPER.writeValueAsString(current);
@@ -508,7 +508,7 @@ public class FlowableUtils {
result.add(ResolvedTask.builder()
.task(task)
.value(resolvedValue)
.iteration(index++)
.iteration(iteration)
.parentId(parentTaskRun.getId())
.build()
);
@@ -516,6 +516,7 @@ public class FlowableUtils {
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
iteration++;
}
return result;

View File

@@ -30,6 +30,6 @@ public class TimestampMicroFilter extends AbstractDate implements Filter {
ZoneId zoneId = zoneId(timeZone);
ZonedDateTime date = convert(input, zoneId, existingFormat);
return String.valueOf(TimeUnit.SECONDS.toNanos(date.toEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(date.getNano()));
return String.valueOf(TimeUnit.SECONDS.toMicros(date.toEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(date.getNano()));
}
}

View File

@@ -46,16 +46,19 @@ public class VersionProvider {
this.date = loadTime(gitProperties);
this.version = loadVersion(buildProperties, gitProperties);
// check the version in the settings and update if needed, we did't use it would allow us to detect incompatible update later if needed
if (settingRepository.isPresent()) {
Optional<Setting> versionSetting = settingRepository.get().findByKey(Setting.INSTANCE_VERSION);
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(this.version)) {
settingRepository.get().save(Setting.builder()
.key(Setting.INSTANCE_VERSION)
.value(this.version)
.build()
);
}
// check the version in the settings and update if needed, we didn't use it would allow us to detect incompatible update later if needed
settingRepository.ifPresent(
settingRepositoryInterface -> persistVersion(settingRepositoryInterface, version));
}
private static synchronized void persistVersion(SettingRepositoryInterface settingRepositoryInterface, String version) {
Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_VERSION);
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(version)) {
settingRepositoryInterface.save(Setting.builder()
.key(Setting.INSTANCE_VERSION)
.value(version)
.build()
);
}
}

View File

@@ -155,6 +155,7 @@ public class Labels extends Task implements ExecutionUpdatableTask {
newLabels.putAll(labelsAsMap);
return execution.withLabels(newLabels.entrySet().stream()
.filter(Label.getEntryNotEmptyPredicate())
.map(entry -> new Label(
entry.getKey(),
entry.getValue()

View File

@@ -1,19 +1,32 @@
package io.kestra.core.models;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class LabelTest {
@Inject
private ModelValidator modelValidator;
@Test
void shouldGetNestedMapGivenDistinctLabels() {
Map<String, Object> result = Label.toNestedMap(List.of(
new Label(Label.USERNAME, "test"),
new Label(Label.CORRELATION_ID, "id"))
new Label(Label.CORRELATION_ID, "id"),
new Label("", "bar"),
new Label(null, "bar"),
new Label("foo", ""),
new Label("baz", null)
)
);
assertThat(result).isEqualTo(
@@ -34,6 +47,18 @@ class LabelTest {
);
}
@Test
void toNestedMapShouldIgnoreEmptyOrNull() {
Map<String, Object> result = Label.toNestedMap(List.of(
new Label("", "bar"),
new Label(null, "bar"),
new Label("foo", ""),
new Label("baz", null))
);
assertThat(result).isEmpty();
}
@Test
void shouldGetMapGivenDistinctLabels() {
Map<String, String> result = Label.toMap(List.of(
@@ -59,6 +84,18 @@ class LabelTest {
);
}
@Test
void toMapShouldIgnoreEmptyOrNull() {
Map<String, String> result = Label.toMap(List.of(
new Label("", "bar"),
new Label(null, "bar"),
new Label("foo", ""),
new Label("baz", null))
);
assertThat(result).isEmpty();
}
@Test
void shouldDuplicateLabelsWithKeyOrderKept() {
List<Label> result = Label.deduplicate(List.of(
@@ -73,4 +110,28 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id")
);
}
@Test
void deduplicateShouldIgnoreEmptyAndNull() {
List<Label> result = Label.deduplicate(List.of(
new Label("", "bar"),
new Label(null, "bar"),
new Label("foo", ""),
new Label("baz", null))
);
assertThat(result).isEmpty();
}
@Test
void shouldValidateEmpty() {
Optional<ConstraintViolationException> validLabelResult = modelValidator.isValid(new Label("foo", "bar"));
assertThat(validLabelResult.isPresent()).isFalse();
Optional<ConstraintViolationException> emptyValueLabelResult = modelValidator.isValid(new Label("foo", ""));
assertThat(emptyValueLabelResult.isPresent()).isTrue();
Optional<ConstraintViolationException> emptyKeyLabelResult = modelValidator.isValid(new Label("", "bar"));
assertThat(emptyKeyLabelResult.isPresent()).isTrue();
}
}

View File

@@ -7,12 +7,11 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueException;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.services.GraphService;
import io.kestra.core.utils.GraphUtils;
@@ -45,7 +44,7 @@ class FlowGraphTest {
private TriggerRepositoryInterface triggerRepositoryInterface;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
void simple() throws IllegalVariableEvaluationException, IOException {

View File

@@ -1,281 +0,0 @@
package io.kestra.core.repositories;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.Helpers;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@KestraTest
public abstract class AbstractLoadedFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;
protected static final String TENANT = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
private static final AtomicBoolean IS_INIT = new AtomicBoolean();
@BeforeEach
protected synchronized void init() throws IOException, URISyntaxException {
initFlows(repositoryLoader);
}
protected static synchronized void initFlows(LocalFlowRepositoryLoader repo) throws IOException, URISyntaxException {
if (!IS_INIT.get()){
TestsUtils.loads(TENANT, repo);
IS_INIT.set(true);
}
}
@Test
void findAll() {
List<Flow> save = flowRepository.findAll(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSource() {
List<FlowWithSource> save = flowRepository.findAllWithSource(TENANT);
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllForAllTenants() {
List<Flow> save = flowRepository.findAllForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findAllWithSourceForAllTenants() {
List<FlowWithSource> save = flowRepository.findAllWithSourceForAllTenants();
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 24);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefixWithSource() {
List<FlowWithSource> save = flowRepository.findByNamespacePrefixWithSource(TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationPartial() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT - 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating at MAX-1, it should return MAX-1")
.isEqualTo(Helpers.FLOWS_COUNT - 1);
}
@Test
void find_paginationGreaterThanExisting() {
assertThat(flowRepository.find(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(Pageable.from(1, (int) Helpers.FLOWS_COUNT + 1, Sort.UNSORTED), TENANT, null)
.size())
.describedAs("When paginating requesting a larger amount than existing, it should return existing MAX")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_prefixMatchingAllNamespaces() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.STARTS_WITH).value("io.kestra.tests").build()
)
).size())
.describedAs("When filtering on NAMESPACE START_WITH a pattern that match all, it should return all")
.isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void find_aSpecifiedNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests2").build()
)
).size()).isEqualTo(1L);
}
@Test
void find_aSpecificSubNamespace() {
assertThat(flowRepository.find(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
assertThat(flowRepository.findWithSource(
Pageable.UNPAGED,
TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests.minimal.bis").build()
)
).size())
.isEqualTo(1L);
}
@Test
void find_aSpecificLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(
Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("country", "FR")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_aSpecificFlowByNamespaceAndLabel() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key2", "value2")).build()
)
).size())
.isEqualTo(1);
}
@Test
void find_noResult_forAnUnknownNamespace() {
assertThat(
flowRepository.find(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
assertThat(
flowRepository.findWithSource(Pageable.UNPAGED, TENANT,
List.of(
QueryFilter.builder().field(QueryFilter.Field.NAMESPACE).operation(QueryFilter.Op.EQUALS).value("io.kestra.tests").build(),
QueryFilter.builder().field(QueryFilter.Field.LABELS).operation(QueryFilter.Op.EQUALS).value(Map.of("key1", "value2")).build()
)
).size())
.isEqualTo(0);
}
@Test
protected void findSpecialChars() {
ArrayListTotal<SearchResult<Flow>> save = flowRepository.findSourceCode(Pageable.unpaged(), "https://api.chucknorris.io", TENANT, null);
assertThat((long) save.size()).isEqualTo(2L);
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(9L);
}
@Test
void shouldReturnForGivenQueryWildCardFilters() {
List<QueryFilter> filters = List.of(
QueryFilter.builder().field(QueryFilter.Field.QUERY).operation(QueryFilter.Op.EQUALS).value("*").build()
);
ArrayListTotal<Flow> flows = flowRepository.find(Pageable.from(1, 10), TENANT, filters);
assertThat(flows.size()).isEqualTo(10);
assertThat(flows.getTotal()).isEqualTo(Helpers.FLOWS_COUNT);
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -29,15 +28,17 @@ import static org.assertj.core.api.Assertions.assertThat;
// must be per-class to allow calling once init() which took a lot of time
public abstract class AbstractRunnerTest {
public static final String TENANT_1 = "tenant1";
public static final String TENANT_2 = "tenant2";
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logsQueue;
@Inject
private RestartCaseTest restartCaseTest;
protected RestartCaseTest restartCaseTest;
@Inject
protected FlowTriggerCaseTest flowTriggerCaseTest;
@@ -49,13 +50,13 @@ public abstract class AbstractRunnerTest {
private PluginDefaultsCaseTest pluginDefaultsCaseTest;
@Inject
private FlowCaseTest flowCaseTest;
protected FlowCaseTest flowCaseTest;
@Inject
private WorkingDirectoryTest.Suite workingDirectoryTest;
@Inject
private PauseTest.Suite pauseTest;
protected PauseTest.Suite pauseTest;
@Inject
private SkipExecutionCaseTest skipExecutionCaseTest;
@@ -67,10 +68,10 @@ public abstract class AbstractRunnerTest {
protected LoopUntilCaseTest loopUntilTestCaseTest;
@Inject
private FlowConcurrencyCaseTest flowConcurrencyCaseTest;
protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
@Inject
private ScheduleDateCaseTest scheduleDateCaseTest;
protected ScheduleDateCaseTest scheduleDateCaseTest;
@Inject
protected FlowInputOutput flowIO;
@@ -79,7 +80,7 @@ public abstract class AbstractRunnerTest {
private SLATestCase slaTestCase;
@Inject
private ChangeStateTestCase changeStateTestCase;
protected ChangeStateTestCase changeStateTestCase;
@Inject
private AfterExecutionTestCase afterExecutionTestCase;
@@ -172,7 +173,7 @@ public abstract class AbstractRunnerTest {
@Test
@LoadFlows({"flows/valids/restart_local_errors.yaml"})
void restartFailedThenFailureWithLocalErrors() throws Exception {
protected void restartFailedThenFailureWithLocalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithLocalErrors();
}
@@ -195,12 +196,12 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/trigger-flow-listener-no-inputs.yaml",
@LoadFlows(value = {"flows/valids/trigger-flow-listener-no-inputs.yaml",
"flows/valids/trigger-flow-listener.yaml",
"flows/valids/trigger-flow-listener-namespace-condition.yaml",
"flows/valids/trigger-flow.yaml"})
"flows/valids/trigger-flow.yaml"}, tenantId = "listener-tenant")
void flowTrigger() throws Exception {
flowTriggerCaseTest.trigger();
flowTriggerCaseTest.trigger("listener-tenant");
}
@Test // flaky on CI but never fail locally
@@ -210,13 +211,11 @@ public abstract class AbstractRunnerTest {
flowTriggerCaseTest.triggerWithPause();
}
@FlakyTest
@Disabled
@Test
@LoadFlows({"flows/valids/trigger-flow-listener-with-concurrency-limit.yaml",
"flows/valids/trigger-flow-with-concurrency-limit.yaml"})
@LoadFlows(value = {"flows/valids/trigger-flow-listener-with-concurrency-limit.yaml",
"flows/valids/trigger-flow-with-concurrency-limit.yaml"}, tenantId = "trigger-tenant")
void flowTriggerWithConcurrencyLimit() throws Exception {
flowTriggerCaseTest.triggerWithConcurrencyLimit();
flowTriggerCaseTest.triggerWithConcurrencyLimit("trigger-tenant");
}
@Test
@@ -228,11 +227,11 @@ public abstract class AbstractRunnerTest {
}
@Test // Flaky on CI but never locally even with 100 repetitions
@LoadFlows({"flows/valids/trigger-flow-listener-namespace-condition.yaml",
@LoadFlows(value = {"flows/valids/trigger-flow-listener-namespace-condition.yaml",
"flows/valids/trigger-multiplecondition-flow-c.yaml",
"flows/valids/trigger-multiplecondition-flow-d.yaml"})
"flows/valids/trigger-multiplecondition-flow-d.yaml"}, tenantId = "condition-tenant")
void multipleConditionTriggerFailed() throws Exception {
multipleConditionTriggerCaseTest.failed();
multipleConditionTriggerCaseTest.failed("condition-tenant");
}
@Test
@@ -245,11 +244,11 @@ public abstract class AbstractRunnerTest {
@Disabled
@Test
@LoadFlows({"flows/valids/flow-trigger-preconditions-flow-listen.yaml",
@LoadFlows(value = {"flows/valids/flow-trigger-preconditions-flow-listen.yaml",
"flows/valids/flow-trigger-preconditions-flow-a.yaml",
"flows/valids/flow-trigger-preconditions-flow-b.yaml"})
"flows/valids/flow-trigger-preconditions-flow-b.yaml"}, tenantId = TENANT_1)
void flowTriggerPreconditionsMergeOutputs() throws Exception {
multipleConditionTriggerCaseTest.flowTriggerPreconditionsMergeOutputs();
multipleConditionTriggerCaseTest.flowTriggerPreconditionsMergeOutputs(TENANT_1);
}
@Test
@@ -279,19 +278,19 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/switch.yaml",
@LoadFlows(value = {"flows/valids/switch.yaml",
"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml"})
"flows/valids/task-flow-inherited-labels.yaml"}, tenantId = TENANT_1)
void flowWaitFailed() throws Exception {
flowCaseTest.waitFailed();
flowCaseTest.waitFailed(TENANT_1);
}
@Test
@LoadFlows({"flows/valids/switch.yaml",
@LoadFlows(value = {"flows/valids/switch.yaml",
"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml"})
"flows/valids/task-flow-inherited-labels.yaml"}, tenantId = TENANT_2)
public void invalidOutputs() throws Exception {
flowCaseTest.invalidOutputs();
flowCaseTest.invalidOutputs(TENANT_2);
}
@Test
@@ -301,9 +300,9 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows(value = {"flows/valids/working-directory.yaml"}, tenantId = "tenant1")
@LoadFlows(value = {"flows/valids/working-directory.yaml"}, tenantId = TENANT_1)
public void workerFailed() throws Exception {
workingDirectoryTest.failed("tenant1", runnerUtils);
workingDirectoryTest.failed(TENANT_1, runnerUtils);
}
@Test
@@ -354,7 +353,6 @@ public abstract class AbstractRunnerTest {
skipExecutionCaseTest.skipExecution();
}
@Disabled
@Test
@LoadFlows({"flows/valids/for-each-item-subflow.yaml",
"flows/valids/for-each-item.yaml"})
@@ -363,12 +361,11 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/for-each-item.yaml"})
@LoadFlows(value = {"flows/valids/for-each-item.yaml"}, tenantId = TENANT_1)
protected void forEachItemEmptyItems() throws Exception {
forEachItemCaseTest.forEachItemEmptyItems();
forEachItemCaseTest.forEachItemEmptyItems(TENANT_1);
}
@Disabled
@Test
@LoadFlows({"flows/valids/for-each-item-subflow-failed.yaml",
"flows/valids/for-each-item-failed.yaml"})
@@ -384,16 +381,16 @@ public abstract class AbstractRunnerTest {
}
@Test // flaky on CI but always pass locally even with 100 iterations
@LoadFlows({"flows/valids/restart-for-each-item.yaml", "flows/valids/restart-child.yaml"})
@LoadFlows(value = {"flows/valids/restart-for-each-item.yaml", "flows/valids/restart-child.yaml"}, tenantId = TENANT_1)
void restartForEachItem() throws Exception {
forEachItemCaseTest.restartForEachItem();
forEachItemCaseTest.restartForEachItem(TENANT_1);
}
@Test
@LoadFlows({"flows/valids/for-each-item-subflow.yaml",
"flows/valids/for-each-item-in-if.yaml"})
@LoadFlows(value = {"flows/valids/for-each-item-subflow.yaml",
"flows/valids/for-each-item-in-if.yaml"}, tenantId = TENANT_1)
protected void forEachItemInIf() throws Exception {
forEachItemCaseTest.forEachItemInIf();
forEachItemCaseTest.forEachItemInIf(TENANT_1);
}
@Test
@@ -434,12 +431,11 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
}
@Disabled
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
protected void concurrencyQueueRestarted() throws Exception {
@@ -453,9 +449,9 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"})
@LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow();
flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
}
@Test
@@ -510,9 +506,9 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/minimal.yaml"})
@LoadFlows(value = {"flows/valids/minimal.yaml"}, tenantId = TENANT_1)
void shouldScheduleOnDate() throws Exception {
scheduleDateCaseTest.shouldScheduleOnDate();
scheduleDateCaseTest.shouldScheduleOnDate(TENANT_1);
}
@Test
@@ -534,15 +530,15 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/sla-execution-condition.yaml"})
@LoadFlows(value = {"flows/valids/sla-execution-condition.yaml"}, tenantId = TENANT_1)
void executionConditionSLAShouldCancel() throws Exception {
slaTestCase.executionConditionSLAShouldCancel();
slaTestCase.executionConditionSLAShouldCancel(TENANT_1);
}
@Test
@LoadFlows({"flows/valids/sla-execution-condition.yaml"})
@LoadFlows(value = {"flows/valids/sla-execution-condition.yaml"}, tenantId = TENANT_2)
void executionConditionSLAShouldLabel() throws Exception {
slaTestCase.executionConditionSLAShouldLabel();
slaTestCase.executionConditionSLAShouldLabel(TENANT_2);
}
@Test
@@ -562,15 +558,15 @@ public abstract class AbstractRunnerTest {
}
@Test
@ExecuteFlow("flows/valids/failed-first.yaml")
@ExecuteFlow(value = "flows/valids/failed-first.yaml", tenantId = TENANT_1)
public void changeStateShouldEndsInSuccess(Execution execution) throws Exception {
changeStateTestCase.changeStateShouldEndsInSuccess(execution);
}
@Test
@LoadFlows({"flows/valids/failed-first.yaml", "flows/valids/subflow-parent-of-failed.yaml"})
@LoadFlows(value = {"flows/valids/failed-first.yaml", "flows/valids/subflow-parent-of-failed.yaml"}, tenantId = TENANT_2)
public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception {
changeStateTestCase.changeStateInSubflowShouldEndsParentFlowInSuccess();
changeStateTestCase.changeStateInSubflowShouldEndsParentFlowInSuccess(TENANT_2);
}
@Test

View File

@@ -3,25 +3,18 @@ package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@Singleton
public class ChangeStateTestCase {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private FlowRepositoryInterface flowRepository;
@@ -29,11 +22,7 @@ public class ChangeStateTestCase {
private ExecutionService executionService;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
public void changeStateShouldEndsInSuccess(Execution execution) throws Exception {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
@@ -41,73 +30,40 @@ public class ChangeStateTestCase {
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// await for the last execution
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();
Flux<Execution> receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if (execution.getId().equals(exec.getId()) && exec.getState().getCurrent() == State.Type.SUCCESS) {
lastExecution.set(exec);
latch.countDown();
}
});
Flow flow = flowRepository.findByExecution(execution);
Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);
Execution lastExecution = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), markedAs);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(lastExecution.get().getTaskRunList()).hasSize(2);
assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(lastExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(lastExecution.getTaskRunList()).hasSize(2);
assertThat(lastExecution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
public void changeStateInSubflowShouldEndsParentFlowInSuccess() throws Exception {
// await for the subflow execution
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();
Flux<Execution> receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if ("failed-first".equals(exec.getFlowId()) && exec.getState().getCurrent() == State.Type.FAILED) {
lastExecution.set(exec);
latch.countDown();
}
});
public void changeStateInSubflowShouldEndsParentFlowInSuccess(String tenantId) throws Exception {
// run the parent flow
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-of-failed");
Execution execution = runnerUtils.runOne(tenantId, NAMESPACE, "subflow-parent-of-failed");
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// assert on the subflow
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(lastExecution.get().getTaskRunList()).hasSize(1);
assertThat(lastExecution.get().getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// await for the parent execution
CountDownLatch parentLatch = new CountDownLatch(1);
AtomicReference<Execution> lastParentExecution = new AtomicReference<>();
receivedExecutions = TestsUtils.receive(executionQueue, either -> {
Execution exec = either.getLeft();
if (execution.getId().equals(exec.getId()) && exec.getState().isTerminated()) {
lastParentExecution.set(exec);
parentLatch.countDown();
}
});
Execution lastExecution = runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.FAILED), tenantId, NAMESPACE, "failed-first");
assertThat(lastExecution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(lastExecution.getTaskRunList()).hasSize(1);
assertThat(lastExecution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// restart the subflow
Flow flow = flowRepository.findByExecution(lastExecution.get());
Execution markedAs = executionService.markAs(lastExecution.get(), flow, lastExecution.get().getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);
Flow flow = flowRepository.findByExecution(lastExecution);
Execution markedAs = executionService.markAs(lastExecution, flow, lastExecution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
runnerUtils.emitAndAwaitExecution(e -> e.getState().isTerminated(), markedAs);
//We wait for the subflow execution to pass from failed to success
Execution lastParentExecution = runnerUtils.awaitFlowExecution(e ->
e.getTaskRunList().getFirst().getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "subflow-parent-of-failed");
// assert for the parent flow
assertThat(parentLatch.await(10, TimeUnit.SECONDS)).isTrue();
receivedExecutions.blockLast();
assertThat(lastParentExecution.get().getState().getCurrent()).isEqualTo(State.Type.FAILED); // FIXME should be success but it's FAILED on unit tests
assertThat(lastParentExecution.get().getTaskRunList()).hasSize(1);
assertThat(lastParentExecution.get().getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(lastParentExecution.getState().getCurrent()).isEqualTo(State.Type.FAILED); // FIXME should be success but it's FAILED on unit tests
assertThat(lastParentExecution.getTaskRunList()).hasSize(1);
assertThat(lastParentExecution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
}

View File

@@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class EmptyVariablesTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;

View File

@@ -58,7 +58,7 @@ class ExecutionServiceTest {
LogRepositoryInterface logRepository;
@Inject
RunnerUtils runnerUtils;
TestRunnerUtils runnerUtils;
@Test
@LoadFlows({"flows/valids/restart_last_failed.yaml"})

View File

@@ -3,20 +3,15 @@ package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.History;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.reporter.model.Count;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import java.io.File;
import java.io.FileInputStream;
@@ -26,24 +21,21 @@ import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowConcurrencyCaseTest {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private StorageInterface storageInterface;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;
@@ -51,400 +43,168 @@ public class FlowConcurrencyCaseTest {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private ExecutionService executionService;
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
public void flowConcurrencyCancel() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel");
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
CountDownLatch latch1 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution1);
}
public void flowConcurrencyFail() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-fail");
public void flowConcurrencyFail() throws TimeoutException, QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail");
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.FAILED);
CountDownLatch latch1 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution1);
}
public void flowConcurrencyQueue() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueue() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution1);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch3.countDown();
}
}
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertTrue(latch3.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueuePause() throws TimeoutException, QueueException, InterruptedException {
AtomicReference<String> firstExecutionId = new AtomicReference<>();
var firstExecutionResult = new AtomicReference<Execution>();
var secondExecutionResult = new AtomicReference<Execution>();
CountDownLatch firstExecutionLatch = new CountDownLatch(1);
CountDownLatch secondExecutionLatch = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (!"flow-concurrency-queue-pause".equals(e.getLeft().getFlowId())){
return;
}
String currentId = e.getLeft().getId();
Type currentState = e.getLeft().getState().getCurrent();
if (firstExecutionId.get() == null) {
firstExecutionId.set(currentId);
}
if (currentId.equals(firstExecutionId.get())) {
if (currentState == State.Type.SUCCESS) {
firstExecutionResult.set(e.getLeft());
firstExecutionLatch.countDown();
}
} else {
if (currentState == State.Type.SUCCESS) {
secondExecutionResult.set(e.getLeft());
secondExecutionLatch.countDown();
}
}
});
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-pause");
public void flowConcurrencyQueuePause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause");
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-pause", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
Execution firstExecutionResult = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution1);
assertThat(execution1.getState().isPaused()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
assertTrue(firstExecutionLatch.await(10, TimeUnit.SECONDS));
assertTrue(secondExecutionLatch.await(10, TimeUnit.SECONDS));
receive.blockLast();
assertThat(firstExecutionResult.get().getId()).isEqualTo(execution1.getId());
assertThat(firstExecutionResult.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(secondExecutionResult.get().getId()).isEqualTo(execution2.getId());
assertThat(secondExecutionResult.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(secondExecutionResult.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
assertThat(firstExecutionResult.getId()).isEqualTo(execution1.getId());
assertThat(firstExecutionResult.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(secondExecutionResult.getId()).isEqualTo(execution2.getId());
assertThat(secondExecutionResult.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(secondExecutionResult.getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyCancelPause() throws TimeoutException, QueueException, InterruptedException {
AtomicReference<String> firstExecutionId = new AtomicReference<>();
var firstExecutionResult = new AtomicReference<Execution>();
var secondExecutionResult = new AtomicReference<Execution>();
CountDownLatch firstExecLatch = new CountDownLatch(1);
CountDownLatch secondExecLatch = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (!"flow-concurrency-cancel-pause".equals(e.getLeft().getFlowId())){
return;
}
String currentId = e.getLeft().getId();
Type currentState = e.getLeft().getState().getCurrent();
if (firstExecutionId.get() == null) {
firstExecutionId.set(currentId);
}
if (currentId.equals(firstExecutionId.get())) {
if (currentState == State.Type.SUCCESS) {
firstExecutionResult.set(e.getLeft());
firstExecLatch.countDown();
}
} else {
if (currentState == State.Type.CANCELLED) {
secondExecutionResult.set(e.getLeft());
secondExecLatch.countDown();
}
}
});
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel-pause");
public void flowConcurrencyCancelPause() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause");
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel-pause", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
Execution firstExecutionResult = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution1);
assertThat(execution1.getState().isPaused()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
assertTrue(firstExecLatch.await(10, TimeUnit.SECONDS));
assertTrue(secondExecLatch.await(10, TimeUnit.SECONDS));
receive.blockLast();
assertThat(firstExecutionResult.get().getId()).isEqualTo(execution1.getId());
assertThat(firstExecutionResult.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(secondExecutionResult.get().getId()).isEqualTo(execution2.getId());
assertThat(secondExecutionResult.get().getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
assertThat(firstExecutionResult.getId()).isEqualTo(execution1.getId());
assertThat(firstExecutionResult.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(secondExecutionResult.getId()).isEqualTo(execution2.getId());
assertThat(secondExecutionResult.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
assertThat(secondExecutionResult.getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
}
public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
URI file = storageUpload();
public void flowConcurrencyWithForEachItem(String tenantId) throws QueueException, URISyntaxException, IOException {
URI file = storageUpload(tenantId);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
Execution forEachItem = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);
Set<String> executionIds = new HashSet<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
executionIds.add(e.getLeft().getId());
}
});
// wait a little to be sure there are not too many executions started
Thread.sleep(500);
assertThat(executionIds).hasSize(1);
receive.blockLast();
Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
Execution terminated = runnerUtils.awaitExecution(e -> e.getState().isTerminated(),forEachItem);
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
List<Execution> executions = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-queue");
assertThat(executions).extracting(e -> e.getState().getCurrent()).containsOnly(Type.SUCCESS);
assertThat(executions.stream()
.map(e -> e.getState().getHistories())
.flatMap(List::stream)
.map(History::getState)
.toList()).contains(Type.QUEUED);
}
public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE,
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(2);
AtomicReference<Execution> failedExecution = new AtomicReference<>();
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
failedExecution.set(e.getLeft());
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
latch3.countDown();
}
}
});
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertThat(failedExecution.get()).isNotNull();
// here the first fail and the second is now running.
// we restart the first one, it should be queued then fail again.
Execution restarted = executionService.restart(failedExecution.get(), null);
executionQueue.emit(restarted);
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
Execution restarted = executionService.restart(failedExecution, null);
Execution executionResult1 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted);
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
assertTrue(latch3.await(1, TimeUnit.MINUTES));
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
// it should have been queued after restarted
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
assertThat(executionResult1.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
assertThat(executionResult1.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
assertThat(executionResult2.getState().getCurrent()).isEqualTo(Type.FAILED);
assertThat(executionResult2.getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
public void flowConcurrencyQueueAfterExecution() throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
Execution executionResult1 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution1);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch3.countDown();
}
}
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertTrue(latch3.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencySubflow() throws TimeoutException, QueueException, InterruptedException {
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch canceledLatch = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
successLatch.countDown();
}
if (e.getLeft().getState().getCurrent() == Type.CANCELLED) {
canceledLatch.countDown();
}
}
public void flowConcurrencySubflow(String tenantId) throws TimeoutException, QueueException {
runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-subflow", null, null, Duration.ofSeconds(30));
runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.SUCCESS);
// assert we have one canceled subflow and one in success
assertTrue(canceledLatch.await(1, TimeUnit.MINUTES));
assertTrue(successLatch.await(1, TimeUnit.MINUTES));
receive.blockLast();
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
// run another execution to be sure that everything work (purge is correctly done)
CountDownLatch newSuccessLatch = new CountDownLatch(1);
Flux<Execution> secondReceive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
newSuccessLatch.countDown();
}
}
// FIXME we should fail if we receive the cancel execution again but on Kafka it happens
});
Execution execution3 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
// assert we have two successful subflow
assertTrue(newSuccessLatch.await(1, TimeUnit.MINUTES));
secondReceive.blockLast();
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
}
private URI storageUpload() throws URISyntaxException, IOException {
private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), content());
return storageInterface.put(
MAIN_TENANT,
tenantId,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)

View File

@@ -2,82 +2,61 @@ package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowTriggerCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
public static final String NAMESPACE = "io.kestra.tests.trigger";
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
public void trigger() throws InterruptedException, TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<Execution> flowListener = new AtomicReference<>();
AtomicReference<Execution> flowListenerNoInput = new AtomicReference<>();
AtomicReference<Execution> flowListenerNamespace = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (flowListenerNoInput.get() == null && execution.getFlowId().equals("trigger-flow-listener-no-inputs")) {
flowListenerNoInput.set(execution);
countDownLatch.countDown();
} else if (flowListener.get() == null && execution.getFlowId().equals("trigger-flow-listener")) {
flowListener.set(execution);
countDownLatch.countDown();
} else if (flowListenerNamespace.get() == null && execution.getFlowId().equals("trigger-flow-listener-namespace-condition")) {
flowListenerNamespace.set(execution);
countDownLatch.countDown();
}
}
});
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger", "trigger-flow");
public void trigger(String tenantId) throws InterruptedException, TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(tenantId, NAMESPACE, "trigger-flow");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertTrue(countDownLatch.await(15, TimeUnit.SECONDS), "remaining countdown: %s".formatted(countDownLatch.getCount()));
receive.blockLast();
Execution flowListenerNoInput = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE,
"trigger-flow-listener-no-inputs");
Execution flowListener = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE,
"trigger-flow-listener");
Execution flowListenerNamespace = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE,
"trigger-flow-listener-namespace-condition");
assertThat(flowListener.get().getTaskRunList().size()).isEqualTo(1);
assertThat(flowListener.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(flowListener.get().getTaskRunList().getFirst().getOutputs().get("value")).isEqualTo("childs: from parents: " + execution.getId());
assertThat(flowListener.get().getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(flowListener.get().getTrigger().getVariables().get("namespace")).isEqualTo("io.kestra.tests.trigger");
assertThat(flowListener.get().getTrigger().getVariables().get("flowId")).isEqualTo("trigger-flow");
assertThat(flowListenerNoInput.get().getTaskRunList().size()).isEqualTo(1);
assertThat(flowListenerNoInput.get().getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(flowListenerNoInput.get().getTrigger().getVariables().get("namespace")).isEqualTo("io.kestra.tests.trigger");
assertThat(flowListenerNoInput.get().getTrigger().getVariables().get("flowId")).isEqualTo("trigger-flow");
assertThat(flowListenerNoInput.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(flowListener.getTaskRunList().size()).isEqualTo(1);
assertThat(flowListener.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(flowListener.getTaskRunList().getFirst().getOutputs().get("value")).isEqualTo("childs: from parents: " + execution.getId());
assertThat(flowListener.getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(flowListener.getTrigger().getVariables().get("namespace")).isEqualTo(NAMESPACE);
assertThat(flowListener.getTrigger().getVariables().get("flowId")).isEqualTo("trigger-flow");
assertThat(flowListenerNamespace.get().getTaskRunList().size()).isEqualTo(1);
assertThat(flowListenerNamespace.get().getTrigger().getVariables().get("namespace")).isEqualTo("io.kestra.tests.trigger");
assertThat(flowListenerNoInput.getTaskRunList().size()).isEqualTo(1);
assertThat(flowListenerNoInput.getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(flowListenerNoInput.getTrigger().getVariables().get("namespace")).isEqualTo(NAMESPACE);
assertThat(flowListenerNoInput.getTrigger().getVariables().get("flowId")).isEqualTo("trigger-flow");
assertThat(flowListenerNoInput.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(flowListenerNamespace.getTaskRunList().size()).isEqualTo(1);
assertThat(flowListenerNamespace.getTrigger().getVariables().get("namespace")).isEqualTo(NAMESPACE);
// it will be triggered for 'trigger-flow' or any of the 'trigger-flow-listener*', so we only assert that it's one of them
assertThat(flowListenerNamespace.get().getTrigger().getVariables().get("flowId"))
assertThat(flowListenerNamespace.getTrigger().getVariables().get("flowId"))
.satisfiesAnyOf(
arg -> assertThat(arg).isEqualTo("trigger-flow"),
arg -> assertThat(arg).isEqualTo("trigger-flow-listener-no-inputs"),
@@ -85,56 +64,43 @@ public class FlowTriggerCaseTest {
);
}
public void triggerWithPause() throws InterruptedException, TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(4);
List<Execution> flowListeners = new ArrayList<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId().equals("trigger-flow-listener-with-pause")) {
flowListeners.add(execution);
countDownLatch.countDown();
}
});
public void triggerWithPause() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.pause", "trigger-flow-with-pause");
assertThat(execution.getTaskRunList().size()).isEqualTo(3);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertTrue(countDownLatch.await(15, TimeUnit.SECONDS));
receive.blockLast();
List<Execution> triggeredExec = runnerUtils.awaitFlowExecutionNumber(
4,
MAIN_TENANT,
"io.kestra.tests.trigger.pause",
"trigger-flow-listener-with-pause");
assertThat(flowListeners.size()).isEqualTo(4);
assertThat(flowListeners.get(0).getOutputs().get("status")).isEqualTo("RUNNING");
assertThat(flowListeners.get(1).getOutputs().get("status")).isEqualTo("PAUSED");
assertThat(flowListeners.get(2).getOutputs().get("status")).isEqualTo("RUNNING");
assertThat(flowListeners.get(3).getOutputs().get("status")).isEqualTo("SUCCESS");
assertThat(triggeredExec.size()).isEqualTo(4);
List<Execution> sortedExecs = triggeredExec.stream()
.sorted(Comparator.comparing(e -> e.getState().getEndDate().orElse(Instant.now())))
.toList();
assertThat(sortedExecs.get(0).getOutputs().get("status")).isEqualTo("RUNNING");
assertThat(sortedExecs.get(1).getOutputs().get("status")).isEqualTo("PAUSED");
assertThat(sortedExecs.get(2).getOutputs().get("status")).isEqualTo("RUNNING");
assertThat(sortedExecs.get(3).getOutputs().get("status")).isEqualTo("SUCCESS");
}
public void triggerWithConcurrencyLimit() throws QueueException, TimeoutException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Execution> flowListeners = new ArrayList<>();
public void triggerWithConcurrencyLimit(String tenantId) throws QueueException, TimeoutException {
Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, "io.kestra.tests.trigger.concurrency", "trigger-flow-with-concurrency-limit");
Execution execution2 = runnerUtils.runOne(tenantId, "io.kestra.tests.trigger.concurrency", "trigger-flow-with-concurrency-limit");
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId().equals("trigger-flow-listener-with-concurrency-limit")) {
flowListeners.add(execution);
countDownLatch.countDown();
}
});
List<Execution> triggeredExec = runnerUtils.awaitFlowExecutionNumber(
5,
tenantId,
"io.kestra.tests.trigger.concurrency",
"trigger-flow-listener-with-concurrency-limit");
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests.trigger.concurrency", "trigger-flow-with-concurrency-limit");
Execution execution2 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests.trigger.concurrency", "trigger-flow-with-concurrency-limit");
assertTrue(countDownLatch.await(15, TimeUnit.SECONDS));
receive.blockLast();
assertThat(flowListeners.size()).isEqualTo(5);
assertThat(flowListeners.stream().anyMatch(e -> e.getOutputs().get("status").equals("RUNNING") && e.getOutputs().get("executionId").equals(execution1.getId()))).isTrue();
assertThat(flowListeners.stream().anyMatch(e -> e.getOutputs().get("status").equals("SUCCESS") && e.getOutputs().get("executionId").equals(execution1.getId()))).isTrue();
assertThat(flowListeners.stream().anyMatch(e -> e.getOutputs().get("status").equals("QUEUED") && e.getOutputs().get("executionId").equals(execution2.getId()))).isTrue();
assertThat(flowListeners.stream().anyMatch(e -> e.getOutputs().get("status").equals("RUNNING") && e.getOutputs().get("executionId").equals(execution2.getId()))).isTrue();
assertThat(flowListeners.stream().anyMatch(e -> e.getOutputs().get("status").equals("SUCCESS") && e.getOutputs().get("executionId").equals(execution2.getId()))).isTrue();
assertThat(triggeredExec.size()).isEqualTo(5);
assertThat(triggeredExec.stream().anyMatch(e -> e.getOutputs().get("status").equals("RUNNING") && e.getOutputs().get("executionId").equals(execution1.getId()))).isTrue();
assertThat(triggeredExec.stream().anyMatch(e -> e.getOutputs().get("status").equals("SUCCESS") && e.getOutputs().get("executionId").equals(execution1.getId()))).isTrue();
assertThat(triggeredExec.stream().anyMatch(e -> e.getOutputs().get("status").equals("QUEUED") && e.getOutputs().get("executionId").equals(execution2.getId()))).isTrue();
assertThat(triggeredExec.stream().anyMatch(e -> e.getOutputs().get("status").equals("RUNNING") && e.getOutputs().get("executionId").equals(execution2.getId()))).isTrue();
assertThat(triggeredExec.stream().anyMatch(e -> e.getOutputs().get("status").equals("SUCCESS") && e.getOutputs().get("executionId").equals(execution2.getId()))).isTrue();
}
}

View File

@@ -48,7 +48,7 @@ public class InputsTest {
private QueueInterface<LogEntry> logQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
public static Map<String, Object> inputs = ImmutableMap.<String, Object>builder()
.put("string", "myString")

View File

@@ -24,7 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class ListenersTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
private LocalFlowRepositoryLoader repositoryLoader;

View File

@@ -1,243 +1,168 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.queues.QueueException;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import java.time.Duration;
import java.util.List;
import io.micronaut.data.model.Pageable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class MultipleConditionTriggerCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
public static final String NAMESPACE = "io.kestra.tests.trigger";
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
protected FlowRepositoryInterface flowRepository;
@Inject
protected ExecutionRepositoryInterface executionRepository;
@Inject
protected ApplicationContext applicationContext;
public void trigger() throws InterruptedException, TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(3);
ConcurrentHashMap<String, Execution> ended = new ConcurrentHashMap<>();
List<String> watchedExecutions = List.of("trigger-multiplecondition-flow-a",
"trigger-multiplecondition-flow-b",
"trigger-multiplecondition-listener"
);
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (watchedExecutions.contains(execution.getFlowId()) && execution.getState().getCurrent() == State.Type.SUCCESS) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
}
});
// first one
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger",
"trigger-multiplecondition-flow-a", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "trigger-multiplecondition-flow-a");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// wait a little to be sure that the trigger is not launching execution
Thread.sleep(1000);
assertThat(ended.size()).isEqualTo(1);
ArrayListTotal<Execution> flowBExecutions = executionRepository.findByFlowId(MAIN_TENANT,
NAMESPACE, "trigger-multiplecondition-flow-b", Pageable.UNPAGED);
ArrayListTotal<Execution> listenerExecutions = executionRepository.findByFlowId(MAIN_TENANT,
NAMESPACE, "trigger-multiplecondition-listener", Pageable.UNPAGED);
assertThat(flowBExecutions).isEmpty();
assertThat(listenerExecutions).isEmpty();
// second one
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger",
"trigger-multiplecondition-flow-b", Duration.ofSeconds(60));
execution = runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "trigger-multiplecondition-flow-b");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
receive.blockLast();
assertThat(ended.size()).isEqualTo(3);
Flow flow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.trigger",
"trigger-multiplecondition-listener").orElseThrow();
Execution triggerExecution = ended.entrySet()
.stream()
.filter(e -> e.getValue().getFlowId().equals(flow.getId()))
.findFirst()
.map(Map.Entry::getValue)
.orElseThrow();
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, NAMESPACE, "trigger-multiplecondition-listener");
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggerExecution.getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(triggerExecution.getTrigger().getVariables().get("namespace")).isEqualTo("io.kestra.tests.trigger");
assertThat(triggerExecution.getTrigger().getVariables().get("namespace")).isEqualTo(
NAMESPACE);
assertThat(triggerExecution.getTrigger().getVariables().get("flowId")).isEqualTo("trigger-multiplecondition-flow-b");
}
public void failed() throws InterruptedException, TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> listener = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("trigger-flow-listener-namespace-condition")
&& execution.getState().getCurrent().isTerminated()) {
listener.set(execution);
countDownLatch.countDown();
}
});
public void failed(String tenantId) throws InterruptedException, TimeoutException, QueueException {
// first one
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger",
"trigger-multiplecondition-flow-c", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne(tenantId, NAMESPACE,
"trigger-multiplecondition-flow-c");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
// wait a little to be sure that the trigger is not launching execution
Thread.sleep(1000);
assertThat(listener.get()).isNull();
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(tenantId, NAMESPACE,
"trigger-multiplecondition-flow-d", Pageable.UNPAGED);
assertThat(byFlowId).isEmpty();
// second one
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger",
"trigger-multiplecondition-flow-d", Duration.ofSeconds(60));
execution = runnerUtils.runOne(tenantId, NAMESPACE,
"trigger-multiplecondition-flow-d");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
tenantId, NAMESPACE, "trigger-flow-listener-namespace-condition");
// trigger was not done
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
receive.blockLast();
assertThat(listener.get()).isNotNull();
assertThat(listener.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
public void flowTriggerPreconditions()
throws InterruptedException, TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> flowTrigger = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId()
.equals("flow-trigger-preconditions-flow-listen")) {
flowTrigger.set(execution);
countDownLatch.countDown();
}
});
public void flowTriggerPreconditions() throws TimeoutException, QueueException {
// flowA
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-a", Duration.ofSeconds(60));
"flow-trigger-preconditions-flow-a");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// flowB: we trigger it two times, as flow-trigger-flow-preconditions-flow-listen is configured with resetOnSuccess: false it should be triggered two times
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-a", Duration.ofSeconds(60));
"flow-trigger-preconditions-flow-a");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-b", Duration.ofSeconds(60));
"flow-trigger-preconditions-flow-b");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
receive.blockLast();
assertThat(flowTrigger.get()).isNotNull();
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.preconditions", "flow-trigger-preconditions-flow-listen");
Execution triggerExecution = flowTrigger.get();
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggerExecution.getTrigger().getVariables().get("outputs")).isNotNull();
assertThat((Map<String, Object>) triggerExecution.getTrigger().getVariables().get("outputs")).containsEntry("some", "value");
}
public void flowTriggerPreconditionsMergeOutputs() throws QueueException, TimeoutException, InterruptedException {
public void flowTriggerPreconditionsMergeOutputs(String tenantId) throws QueueException, TimeoutException {
// we do the same as in flowTriggerPreconditions() but we trigger flows in the opposite order to be sure that outputs are merged
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> flowTrigger = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId()
.equals("flow-trigger-preconditions-flow-listen")) {
flowTrigger.set(execution);
countDownLatch.countDown();
}
});
// flowB
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-b", Duration.ofSeconds(60));
Execution execution = runnerUtils.runOne(tenantId, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-b");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// flowA
execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-a", Duration.ofSeconds(60));
execution = runnerUtils.runOne(tenantId, "io.kestra.tests.trigger.preconditions",
"flow-trigger-preconditions-flow-a");
assertThat(execution.getTaskRunList().size()).isEqualTo(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
receive.blockLast();
assertThat(flowTrigger.get()).isNotNull();
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
tenantId, "io.kestra.tests.trigger.preconditions", "flow-trigger-preconditions-flow-listen");
Execution triggerExecution = flowTrigger.get();
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggerExecution.getTrigger().getVariables().get("outputs")).isNotNull();
assertThat((Map<String, Object>) triggerExecution.getTrigger().getVariables().get("outputs")).containsEntry("some", "value");
}
public void flowTriggerOnPaused()
throws InterruptedException, TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> flowTrigger = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getState().getCurrent() == State.Type.SUCCESS && execution.getFlowId()
.equals("flow-trigger-paused-listen")) {
flowTrigger.set(execution);
countDownLatch.countDown();
}
});
public void flowTriggerOnPaused() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests.trigger.paused",
"flow-trigger-paused-flow", Duration.ofSeconds(60));
"flow-trigger-paused-flow");
assertThat(execution.getTaskRunList().size()).isEqualTo(2);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// trigger is done
assertTrue(countDownLatch.await(1, TimeUnit.SECONDS));
receive.blockLast();
assertThat(flowTrigger.get()).isNotNull();
Execution triggerExecution = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().equals(Type.SUCCESS),
MAIN_TENANT, "io.kestra.tests.trigger.paused", "flow-trigger-paused-listen");
Execution triggerExecution = flowTrigger.get();
assertThat(triggerExecution.getTaskRunList().size()).isEqualTo(1);
assertThat(triggerExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}

View File

@@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@Singleton
public class PluginDefaultsCaseTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
public void taskDefaults() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "plugin-defaults", Duration.ofSeconds(60));

View File

@@ -4,29 +4,19 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
@@ -35,38 +25,30 @@ public class RestartCaseTest {
private FlowRepositoryInterface flowRepository;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
private ExecutionService executionService;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
public void restartFailedThenSuccess() throws Exception {
Flow flow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests", "restart_last_failed").orElseThrow();
Execution firstExecution = runnerUtils.runOne(MAIN_TENANT, flow.getNamespace(), flow.getId(), Duration.ofSeconds(60));
Execution firstExecution = runnerUtils.runOne(MAIN_TENANT, flow.getNamespace(), flow.getId());
assertThat(firstExecution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(firstExecution.getTaskRunList()).hasSize(3);
assertThat(firstExecution.getTaskRunList().get(2).getState().getCurrent()).isEqualTo(State.Type.FAILED);
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS && execution.getId().equals(firstExecution.getId()),
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(3);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
executionQueue.emit(restartedExec);
}),
Duration.ofSeconds(60)
restartedExec
);
assertThat(finishedRestartedExecution).isNotNull();
@@ -93,19 +75,16 @@ public class RestartCaseTest {
assertThat(firstExecution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.FAILED && execution.getTaskRunList().getFirst().getAttempts().size() == 2,
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec);
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(1);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
}),
Duration.ofSeconds(60)
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(1);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
execution -> execution.getState().getCurrent() == State.Type.FAILED && execution.getTaskRunList().getFirst().getAttempts().size() == 2,
restartedExec
);
assertThat(finishedRestartedExecution).isNotNull();
@@ -128,19 +107,16 @@ public class RestartCaseTest {
assertThat(firstExecution.getTaskRunList().get(3).getState().getCurrent()).isEqualTo(State.Type.FAILED);
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.FAILED && execution.findTaskRunsByTaskId("failStep").stream().findFirst().get().getAttempts().size() == 2,
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec);
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
}),
Duration.ofSeconds(60)
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(4);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
execution -> execution.getState().getCurrent() == State.Type.FAILED && execution.findTaskRunsByTaskId("failStep").stream().findFirst().get().getAttempts().size() == 2,
restartedExec
);
assertThat(finishedRestartedExecution).isNotNull();
@@ -163,21 +139,19 @@ public class RestartCaseTest {
assertThat(firstExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
// wait
Execution restartedExec = executionService.replay(firstExecution, firstExecution.findTaskRunByTaskIdAndValue("2_end", List.of()).getId(), null);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getState().getHistories()).hasSize(4);
assertThat(restartedExec.getTaskRunList()).hasSize(20);
assertThat(restartedExec.getTaskRunList().get(19).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getId()).isNotEqualTo(firstExecution.getId());
assertThat(restartedExec.getTaskRunList().get(1).getId()).isNotEqualTo(firstExecution.getTaskRunList().get(1).getId());
Execution finishedRestartedExecution = runnerUtils.awaitChildExecution(
flow,
firstExecution,
throwRunnable(() -> {
Execution restartedExec = executionService.replay(firstExecution, firstExecution.findTaskRunByTaskIdAndValue("2_end", List.of()).getId(), null);
executionQueue.emit(restartedExec);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getState().getHistories()).hasSize(4);
assertThat(restartedExec.getTaskRunList()).hasSize(20);
assertThat(restartedExec.getTaskRunList().get(19).getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restartedExec.getId()).isNotEqualTo(firstExecution.getId());
assertThat(restartedExec.getTaskRunList().get(1).getId()).isNotEqualTo(firstExecution.getTaskRunList().get(1).getId());
}),
restartedExec,
Duration.ofSeconds(60)
);
@@ -195,71 +169,58 @@ public class RestartCaseTest {
Execution restart = executionService.restart(execution, null);
assertThat(restart.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution restartEnded = runnerUtils.awaitExecution(
Execution restartEnded = runnerUtils.emitAndAwaitExecution(
e -> e.getState().getCurrent() == State.Type.FAILED,
throwRunnable(() -> executionQueue.emit(restart)),
Duration.ofSeconds(120)
restart,
Duration.ofSeconds(60)
);
assertThat(restartEnded.getState().getCurrent()).isEqualTo(State.Type.FAILED);
Execution newRestart = executionService.restart(restartEnded, null);
restartEnded = runnerUtils.awaitExecution(
restartEnded = runnerUtils.emitAndAwaitExecution(
e -> e.getState().getCurrent() == State.Type.FAILED,
throwRunnable(() -> executionQueue.emit(newRestart)),
Duration.ofSeconds(120)
newRestart,
Duration.ofSeconds(60)
);
assertThat(restartEnded.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
public void restartSubflow() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux<Execution> receiveSubflows = TestsUtils.receive(executionQueue, either -> {
Execution subflowExecution = either.getLeft();
if (subflowExecution.getFlowId().equals("restart-child") && subflowExecution.getState().getCurrent().isFailed()) {
countDownLatch.countDown();
}
});
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "restart-parent");
assertThat(execution.getTaskRunList()).hasSize(3);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
// here we must have 1 failed subflows
assertTrue(countDownLatch.await(1, TimeUnit.MINUTES));
receiveSubflows.blockLast();
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().isFailed(), MAIN_TENANT, "io.kestra.tests", "restart-child");
// there is 3 values so we must restart it 3 times to end the 3 subflows
CountDownLatch successLatch = new CountDownLatch(3);
receiveSubflows = TestsUtils.receive(executionQueue, either -> {
Execution subflowExecution = either.getLeft();
if (subflowExecution.getFlowId().equals("restart-child") && subflowExecution.getState().getCurrent().isSuccess()) {
successLatch.countDown();
}
});
Execution restarted1 = executionService.restart(execution, null);
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getState().getCurrent() == State.Type.FAILED && e.getFlowId().equals("restart-parent"),
throwRunnable(() -> executionQueue.emit(restarted1)),
restarted1,
Duration.ofSeconds(10)
);
Execution restarted2 = executionService.restart(execution, null);
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getState().getCurrent() == State.Type.FAILED && e.getFlowId().equals("restart-parent"),
throwRunnable(() -> executionQueue.emit(restarted2)),
restarted2,
Duration.ofSeconds(10)
);
Execution restarted3 = executionService.restart(execution, null);
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS && e.getFlowId().equals("restart-parent"),
throwRunnable(() -> executionQueue.emit(restarted3)),
restarted3,
Duration.ofSeconds(10)
);
assertThat(execution.getTaskRunList()).hasSize(6);
assertTrue(successLatch.await(1, TimeUnit.MINUTES));
receiveSubflows.blockLast();
List<Execution> childExecutions = runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, "io.kestra.tests", "restart-child");
List<Execution> successfulRestart = childExecutions.stream()
.filter(e -> e.getState().getCurrent().equals(Type.SUCCESS)).toList();
assertThat(successfulRestart).hasSize(3);
}
public void restartFailedWithFinally() throws Exception {
@@ -272,18 +233,15 @@ public class RestartCaseTest {
assertThat(firstExecution.getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.FAILED);
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess() && execution.getId().equals(firstExecution.getId()),
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(2);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
executionQueue.emit(restartedExec);
}),
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(2);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess(),
restartedExec,
Duration.ofSeconds(60)
);
@@ -309,21 +267,18 @@ public class RestartCaseTest {
assertThat(firstExecution.getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.FAILED);
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess() && execution.getId().equals(firstExecution.getId()),
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(2);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();
assertThat(restartedExec.getId()).isEqualTo(firstExecution.getId());
assertThat(restartedExec.getParentId()).isNull();
assertThat(restartedExec.getTaskRunList().size()).isEqualTo(2);
assertThat(restartedExec.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
executionQueue.emit(restartedExec);
}),
Execution finishedRestartedExecution = runnerUtils.emitAndAwaitExecution(
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess(),
restartedExec,
Duration.ofSeconds(60)
);
assertThat(finishedRestartedExecution).isNotNull();
assertThat(finishedRestartedExecution.getId()).isEqualTo(firstExecution.getId());
assertThat(finishedRestartedExecution.getParentId()).isNull();

View File

@@ -98,7 +98,7 @@ class RunContextTest {
private FlowInputOutput flowIO;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
protected LocalFlowRepositoryLoader repositoryLoader;

View File

@@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@Singleton
public class SLATestCase {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
public void maxDurationSLAShouldFail() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "sla-max-duration-fail");
@@ -36,14 +36,14 @@ public class SLATestCase {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
public void executionConditionSLAShouldCancel() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "sla-execution-condition", null, (f, e) -> Map.of("string", "CANCEL"));
public void executionConditionSLAShouldCancel(String tenantId) throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(tenantId, "io.kestra.tests", "sla-execution-condition", null, (f, e) -> Map.of("string", "CANCEL"));
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
}
public void executionConditionSLAShouldLabel() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "sla-execution-condition", null, (f, e) -> Map.of("string", "LABEL"));
public void executionConditionSLAShouldLabel(String tenantId) throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(tenantId, "io.kestra.tests", "sla-execution-condition", null, (f, e) -> Map.of("string", "LABEL"));
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getLabels()).contains(new Label("sla", "violated"));

View File

@@ -3,54 +3,31 @@ package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.Type;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class ScheduleDateCaseTest {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
private TestRunnerUtils runnerUtils;
public void shouldScheduleOnDate() throws QueueException, InterruptedException {
public void shouldScheduleOnDate(String tenantId) throws QueueException {
ZonedDateTime scheduleOn = ZonedDateTime.now().plusSeconds(1);
Flow flow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests", "minimal").orElseThrow();
Flow flow = flowRepository.findById(tenantId, "io.kestra.tests", "minimal").orElseThrow();
Execution execution = Execution.newExecution(flow, null, null, Optional.of(scheduleOn));
this.executionQueue.emit(execution);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.CREATED);
assertThat(execution.getScheduleDate()).isEqualTo(scheduleOn.toInstant());
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.CREATED);
CountDownLatch latch1 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution.getId())) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution);
}
}

View File

@@ -32,7 +32,7 @@ public class SkipExecutionCaseTest {
protected QueueInterface<Execution> executionQueue;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private ExecutionRepositoryInterface executionRepository;

View File

@@ -30,7 +30,7 @@ public class TaskCacheTest {
static final AtomicInteger COUNTER = new AtomicInteger(0);
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@BeforeEach
void resetCounter() {

View File

@@ -33,7 +33,7 @@ public class TaskWithAllowFailureTest {
private FlowInputOutput flowIO;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@ExecuteFlow("flows/valids/task-allow-failure-runnable.yml")

View File

@@ -9,6 +9,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.storages.StorageInterface;
import jakarta.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -34,7 +35,7 @@ public class TaskWithAllowWarningTest {
private FlowInputOutput flowIO;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@ExecuteFlow("flows/valids/task-allow-warning-runnable.yml")
@@ -54,6 +55,7 @@ public class TaskWithAllowWarningTest {
}
@Test
@Disabled("This test does not test failing in subflow foreach as the subflow is not called, needs to be rework before reactivation")
@LoadFlows({"flows/valids/task-allow-warning-executable-foreachitem.yml"})
void executableTask_ForEachItem() throws TimeoutException, QueueException, URISyntaxException, IOException {
URI file = storageUpload();

View File

@@ -6,6 +6,8 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(startRunner = true)
@@ -31,4 +33,15 @@ class TaskWithRunIfTest {
assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
}
@Test
@ExecuteFlow("flows/valids/task-runif-executionupdating.yml")
void executionUpdatingTask(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
assertThat(execution.findTaskRunsByTaskId("unsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("setVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getVariables()).containsEntry("list", List.of(42));
}
}

View File

@@ -38,7 +38,7 @@ class TestSuiteTest {
protected QueueInterface<Execution> executionQueue;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
protected FlowRepositoryInterface flowRepository;

View File

@@ -14,10 +14,13 @@ import java.util.Date;
import java.util.Map;
import jakarta.inject.Inject;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@TestInstance(Lifecycle.PER_CLASS)
class DateFilterTest {
public static final ZonedDateTime NOW = ZonedDateTime.parse("2013-09-08T16:19:12.123456+01");
@@ -144,7 +147,7 @@ class DateFilterTest {
)
);
assertThat(render).isEqualTo("1378653552000123456");
assertThat(render).isEqualTo("1378653552123456");
}
@Test

View File

@@ -12,7 +12,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.annotation.MockBean;
@@ -41,7 +41,7 @@ public class SecretFunctionTest {
QueueInterface<LogEntry> logQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
private SecretService secretService;

View File

@@ -8,7 +8,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Map;
@@ -19,7 +19,7 @@ import org.junit.jupiter.api.Test;
public class FailTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows({"flows/valids/fail-on-switch.yaml"})

View File

@@ -5,7 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.Await;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class ResumeTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
private ExecutionRepositoryInterface executionRepository;

View File

@@ -6,7 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.executions.Execution;
@@ -14,7 +14,6 @@ import io.kestra.core.models.flows.State;
import java.util.concurrent.TimeoutException;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(startRunner = true)
@@ -22,7 +21,7 @@ class AllowFailureTest {
@Inject
private FlowInputOutput flowIO;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Test
@ExecuteFlow("flows/valids/allow-failure.yaml")

View File

@@ -7,7 +7,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.LoadFlows;
import jakarta.inject.Inject;
@@ -31,7 +31,7 @@ class CorrelationIdTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows({"flows/valids/subflow-parent.yaml",

View File

@@ -12,7 +12,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -33,7 +33,7 @@ public class DagTest {
ModelValidator modelValidator;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;

View File

@@ -4,6 +4,7 @@ import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.TestsUtils;
import org.junit.jupiter.api.Test;
import io.kestra.core.exceptions.InternalException;
@@ -13,7 +14,6 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import java.time.Duration;
import java.util.*;
@@ -34,7 +34,7 @@ public class EachSequentialTest {
QueueInterface<LogEntry> logQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@ExecuteFlow("flows/valids/each-sequential.yaml")
@@ -92,7 +92,7 @@ public class EachSequentialTest {
EachSequentialTest.eachNullTest(runnerUtils, logQueue);
}
public static void eachNullTest(RunnerUtils runnerUtils, QueueInterface<LogEntry> logQueue) throws TimeoutException, QueueException {
public static void eachNullTest(TestRunnerUtils runnerUtils, QueueInterface<LogEntry> logQueue) throws TimeoutException, QueueException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
Flux<LogEntry> receive = TestsUtils.receive(logQueue, either -> logs.add(either.getLeft()));

View File

@@ -6,9 +6,8 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -24,7 +23,7 @@ class FinallyTest {
public static final String NAMESPACE = "io.kestra.tests";
private static final String TENANT_ID = "tenant1";
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;

View File

@@ -4,96 +4,69 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@Singleton
public class FlowCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
QueueInterface<Execution> executionQueue;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
public void waitSuccess() throws Exception {
this.run("OK", State.Type.SUCCESS, State.Type.SUCCESS, 2, "default > amazing", true);
}
public void waitFailed() throws Exception {
this.run("THIRD", State.Type.FAILED, State.Type.FAILED, 4, "Error Trigger ! error-t1", true);
public void waitFailed(String tenantId) throws Exception {
this.run("THIRD", State.Type.FAILED, State.Type.FAILED, 4, "Error Trigger ! error-t1", true, tenantId);
}
public void invalidOutputs() throws Exception {
this.run("FIRST", State.Type.FAILED, State.Type.SUCCESS, 2, null, true);
public void invalidOutputs(String tenantId) throws Exception {
this.run("FIRST", State.Type.FAILED, State.Type.SUCCESS, 2, null, true, tenantId);
}
public void noLabels() throws Exception {
this.run("OK", State.Type.SUCCESS, State.Type.SUCCESS, 2, "default > amazing", false);
public void noLabels(String tenantId) throws Exception {
this.run("OK", State.Type.SUCCESS, State.Type.SUCCESS, 2, "default > amazing", false, tenantId);
}
public void oldTaskName() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("minimal") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"subflow-old-task-name"
);
countDownLatch.await(1, TimeUnit.MINUTES);
receive.blockLast();
Execution triggered = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().isTerminated(), MAIN_TENANT, "io.kestra.tests",
"minimal");
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("executionId")).isEqualTo(triggered.get().getId());
assertThat(triggered.get().getTrigger().getType()).isEqualTo("io.kestra.core.tasks.flows.Subflow");
assertThat(triggered.get().getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(triggered.get().getTrigger().getVariables().get("flowId")).isEqualTo(execution.getFlowId());
assertThat(triggered.get().getTrigger().getVariables().get("namespace")).isEqualTo(execution.getNamespace());
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("executionId")).isEqualTo(triggered.getId());
assertThat(triggered.getTrigger().getType()).isEqualTo("io.kestra.core.tasks.flows.Subflow");
assertThat(triggered.getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(triggered.getTrigger().getVariables().get("flowId")).isEqualTo(execution.getFlowId());
assertThat(triggered.getTrigger().getVariables().get("namespace")).isEqualTo(execution.getNamespace());
}
void run(String input, State.Type fromState, State.Type triggerState, int count, String outputs, boolean testInherited)
throws Exception {
run(input, fromState, triggerState, count, outputs, testInherited, MAIN_TENANT);
}
@SuppressWarnings({"ResultOfMethodCallIgnored", "unchecked"})
void run(String input, State.Type fromState, State.Type triggerState, int count, String outputs, boolean testInherited) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("switch") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
void run(String input, State.Type fromState, State.Type triggerState, int count, String outputs, boolean testInherited, String tenantId) throws Exception {
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
tenantId,
"io.kestra.tests",
testInherited ? "task-flow" : "task-flow-inherited-labels",
null,
@@ -102,8 +75,8 @@ public class FlowCaseTest {
testInherited ? List.of(new Label("mainFlowExecutionLabel", "execFoo")) : List.of()
);
countDownLatch.await(1, TimeUnit.MINUTES);
receive.blockLast();
Execution triggered = runnerUtils.awaitFlowExecution(
e -> e.getState().getCurrent().isTerminated(), tenantId, "io.kestra.tests", "switch");
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getTaskRunList().getFirst().getAttempts()).hasSize(1);
@@ -114,27 +87,27 @@ public class FlowCaseTest {
assertThat(((Map<String, String>) execution.getTaskRunList().getFirst().getOutputs().get("outputs")).get("extracted")).contains(outputs);
}
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("executionId")).isEqualTo(triggered.get().getId());
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("executionId")).isEqualTo(triggered.getId());
if (outputs != null) {
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("state")).isEqualTo(triggered.get().getState().getCurrent().name());
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("state")).isEqualTo(triggered.getState().getCurrent().name());
}
assertThat(triggered.get().getTrigger().getType()).isEqualTo("io.kestra.plugin.core.flow.Subflow");
assertThat(triggered.get().getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(triggered.get().getTrigger().getVariables().get("flowId")).isEqualTo(execution.getFlowId());
assertThat(triggered.get().getTrigger().getVariables().get("namespace")).isEqualTo(execution.getNamespace());
assertThat(triggered.getTrigger().getType()).isEqualTo("io.kestra.plugin.core.flow.Subflow");
assertThat(triggered.getTrigger().getVariables().get("executionId")).isEqualTo(execution.getId());
assertThat(triggered.getTrigger().getVariables().get("flowId")).isEqualTo(execution.getFlowId());
assertThat(triggered.getTrigger().getVariables().get("namespace")).isEqualTo(execution.getNamespace());
assertThat(triggered.get().getTaskRunList()).hasSize(count);
assertThat(triggered.get().getState().getCurrent()).isEqualTo(triggerState);
assertThat(triggered.getTaskRunList()).hasSize(count);
assertThat(triggered.getState().getCurrent()).isEqualTo(triggerState);
if (testInherited) {
assertThat(triggered.get().getLabels().size()).isEqualTo(6);
assertThat(triggered.get().getLabels()).contains(new Label(Label.CORRELATION_ID, execution.getId()), new Label("mainFlowExecutionLabel", "execFoo"), new Label("mainFlowLabel", "flowFoo"), new Label("launchTaskLabel", "launchFoo"), new Label("switchFlowLabel", "switchFoo"), new Label("overriding", "child"));
assertThat(triggered.getLabels().size()).isEqualTo(6);
assertThat(triggered.getLabels()).contains(new Label(Label.CORRELATION_ID, execution.getId()), new Label("mainFlowExecutionLabel", "execFoo"), new Label("mainFlowLabel", "flowFoo"), new Label("launchTaskLabel", "launchFoo"), new Label("switchFlowLabel", "switchFoo"), new Label("overriding", "child"));
} else {
assertThat(triggered.get().getLabels().size()).isEqualTo(4);
assertThat(triggered.get().getLabels()).contains(new Label(Label.CORRELATION_ID, execution.getId()), new Label("launchTaskLabel", "launchFoo"), new Label("switchFlowLabel", "switchFoo"), new Label("overriding", "child"));
assertThat(triggered.get().getLabels()).doesNotContain(new Label("inherited", "label"));
assertThat(triggered.getLabels().size()).isEqualTo(4);
assertThat(triggered.getLabels()).contains(new Label(Label.CORRELATION_ID, execution.getId()), new Label("launchTaskLabel", "launchFoo"), new Label("switchFlowLabel", "switchFoo"), new Label("overriding", "child"));
assertThat(triggered.getLabels()).doesNotContain(new Label("inherited", "label"));
}
}
}

View File

@@ -5,9 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import org.junit.jupiter.api.Test;
import jakarta.inject.Inject;
import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@KestraTest(startRunner = true)
class FlowTest {
@Inject
@@ -24,25 +22,25 @@ class FlowTest {
@Test
@LoadFlows(value = {"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml",
"flows/valids/switch.yaml"})
"flows/valids/switch.yaml"}, tenantId = "tenant1")
void waitFailed() throws Exception {
flowCaseTest.waitFailed();
flowCaseTest.waitFailed("tenant1");
}
@Test
@LoadFlows({"flows/valids/task-flow.yaml",
@LoadFlows(value = {"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml",
"flows/valids/switch.yaml"})
"flows/valids/switch.yaml"}, tenantId = "tenant2")
void invalidOutputs() throws Exception {
flowCaseTest.invalidOutputs();
flowCaseTest.invalidOutputs("tenant2");
}
@Test
@LoadFlows({"flows/valids/task-flow.yaml",
@LoadFlows(value = {"flows/valids/task-flow.yaml",
"flows/valids/task-flow-inherited-labels.yaml",
"flows/valids/switch.yaml"})
"flows/valids/switch.yaml"}, tenantId = "tenant3")
void noLabels() throws Exception {
flowCaseTest.noLabels();
flowCaseTest.noLabels("tenant3");
}
@Test

View File

@@ -4,20 +4,17 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import java.io.BufferedReader;
import java.io.File;
@@ -31,34 +28,25 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.models.flows.State.Type.FAILED;
import static io.kestra.core.models.flows.State.Type.SUCCESS;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
@Singleton
public class ForEachItemCaseTest {
static final String TEST_NAMESPACE = "io.kestra.tests";
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
@Inject
private StorageInterface storageInterface;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;
@@ -66,28 +54,19 @@ public class ForEachItemCaseTest {
@Inject
private ExecutionService executionService;
@Inject
private ExecutionRepositoryInterface executionRepository;
@SuppressWarnings("unchecked")
public void forEachItem() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
URI file = storageUpload();
public void forEachItem() throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = storageUpload(MAIN_TENANT);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
receive.blockLast();
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(26, MAIN_TENANT, TEST_NAMESPACE, "for-each-item-subflow");
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(4);
@@ -103,19 +82,20 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("SUCCESS")).isEqualTo(26);
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(1);
Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
Execution triggered = triggeredExecs.getLast();
assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow");
assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.getTaskRunList()).hasSize(1);
Optional<Label> correlationId = triggered.getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent()).isTrue();
assertThat(correlationId.get().value()).isEqualTo(execution.getId());
}
public void forEachItemEmptyItems() throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = emptyItems();
public void forEachItemEmptyItems(String tenantId) throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = emptyItems(tenantId);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item", null,
Execution execution = runnerUtils.runOne(tenantId, TEST_NAMESPACE, "for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
@@ -127,21 +107,8 @@ public class ForEachItemCaseTest {
}
@SuppressWarnings("unchecked")
public void forEachItemNoWait() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow-sleep")) {
if (execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
}
});
URI file = storageUpload();
public void forEachItemNoWait() throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = storageUpload(MAIN_TENANT);
Map<String, Object> inputs = Map.of("file", file.toString());
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-no-wait", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
@@ -149,7 +116,9 @@ public class ForEachItemCaseTest {
// assert that not all subflows ran (depending on the speed of execution, there can be some)
// be careful that it's racy.
assertThat(countDownLatch.getCount()).isGreaterThan(0L);
ArrayListTotal<Execution> subFlowExecs = executionRepository.findByFlowId(MAIN_TENANT,
TEST_NAMESPACE, "for-each-item-subflow-sleep", Pageable.UNPAGED);
assertThat(subFlowExecs.size()).isLessThanOrEqualTo(26);
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(4);
@@ -165,38 +134,27 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("SUCCESS")).isEqualTo(26);
// wait for the 26 flows to ends
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).as("Remaining count was " + countDownLatch.getCount()).isTrue();
receive.blockLast();
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(26, MAIN_TENANT, TEST_NAMESPACE, "for-each-item-subflow-sleep");
Execution triggered = triggeredExecs.getLast();
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow-sleep");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-no-wait/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(2);
assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow-sleep");
assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-no-wait/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.getTaskRunList()).hasSize(2);
}
@SuppressWarnings("unchecked")
public void forEachItemFailed() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow-failed") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
URI file = storageUpload();
public void forEachItemFailed() throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = storageUpload(MAIN_TENANT);
Map<String, Object> inputs = Map.of("file", file.toString());
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-failed", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(60));
// we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
receive.blockLast();
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(26, MAIN_TENANT, TEST_NAMESPACE, "for-each-item-subflow-failed");
Execution triggered = triggeredExecs.getLast();
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(3);
@@ -212,34 +170,23 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("FAILED")).isEqualTo(26);
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(FAILED);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow-failed");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-failed/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(1);
assertThat(triggered.getState().getCurrent()).isEqualTo(FAILED);
assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow-failed");
assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-failed/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.getTaskRunList()).hasSize(1);
}
@SuppressWarnings("unchecked")
public void forEachItemWithSubflowOutputs() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-outputs-subflow") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
URI file = storageUpload();
public void forEachItemWithSubflowOutputs() throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = storageUpload(MAIN_TENANT);
Map<String, Object> inputs = Map.of("file", file.toString());
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-outputs", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
receive.blockLast();
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(26, MAIN_TENANT, TEST_NAMESPACE, "for-each-item-outputs-subflow");
Execution triggered = triggeredExecs.getLast();
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(5);
@@ -256,10 +203,10 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("SUCCESS")).isEqualTo(26);
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-outputs-subflow");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-outputs/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(1);
assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.getFlowId()).isEqualTo("for-each-item-outputs-subflow");
assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-outputs/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.getTaskRunList()).hasSize(1);
// asserts for subflow merged outputs
Map<String, Object> mergeTaskOutputs = execution.getTaskRunList().get(3).getOutputs();
@@ -272,84 +219,39 @@ public class ForEachItemCaseTest {
}
}
public void restartForEachItem() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(6);
Flux<Execution> receiveSubflows = TestsUtils.receive(executionQueue, either -> {
Execution subflowExecution = either.getLeft();
if (subflowExecution.getFlowId().equals("restart-child") && subflowExecution.getState().getCurrent().isFailed()) {
countDownLatch.countDown();
}
});
URI file = storageUpload();
public void restartForEachItem(String tenantId) throws Exception {
URI file = storageUpload(tenantId);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 20);
final Execution failedExecution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "restart-for-each-item", null,
final Execution failedExecution = runnerUtils.runOne(tenantId, TEST_NAMESPACE, "restart-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
assertThat(failedExecution.getTaskRunList()).hasSize(3);
assertThat(failedExecution.getState().getCurrent()).isEqualTo(FAILED);
// here we must have 1 failed subflows
assertTrue(countDownLatch.await(1, TimeUnit.MINUTES), "first run of flow should have FAILED");
receiveSubflows.blockLast();
Await.until(
() -> "first FAILED run of flow should have been persisted",
() -> getPersistedExecution(MAIN_TENANT, failedExecution.getId())
.map(exec -> exec.getState().getCurrent() == FAILED)
.orElse(false),
Duration.of(100, TimeUnit.MILLISECONDS.toChronoUnit()),
Duration.of(10, TimeUnit.SECONDS.toChronoUnit())
);
CountDownLatch successLatch = new CountDownLatch(6);
receiveSubflows = TestsUtils.receive(executionQueue, either -> {
Execution subflowExecution = either.getLeft();
if (subflowExecution.getFlowId().equals("restart-child") && subflowExecution.getState().getCurrent().isSuccess()) {
successLatch.countDown();
}
});
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(6, tenantId, TEST_NAMESPACE, "restart-child");
assertThat(triggeredExecs).extracting(e -> e.getState().getCurrent()).containsOnly(FAILED);
Execution restarted = executionService.restart(failedExecution, null);
final Execution successExecution = runnerUtils.awaitExecution(
final Execution successExecution = runnerUtils.emitAndAwaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS && e.getFlowId().equals("restart-for-each-item"),
throwRunnable(() -> executionQueue.emit(restarted)),
Duration.ofSeconds(20)
restarted
);
assertThat(successExecution.getTaskRunList()).hasSize(4);
assertTrue(successLatch.await(1, TimeUnit.MINUTES), "second run of flow should have SUCCESS");
receiveSubflows.blockLast();
triggeredExecs = runnerUtils.awaitFlowExecutionNumber(6, tenantId, TEST_NAMESPACE, "restart-child");
assertThat(triggeredExecs).extracting(e -> e.getState().getCurrent()).containsOnly(SUCCESS);
}
private Optional<Execution> getPersistedExecution(String tenant, String executionId) {
try {
return Optional.of(executionService.getExecution(tenant, executionId, false));
} catch (NoSuchElementException e) {
return Optional.empty();
}
}
public void forEachItemInIf() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
URI file = storageUpload();
public void forEachItemInIf(String tenantId) throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = storageUpload(tenantId);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-in-if", null,
Execution execution = runnerUtils.runOne(tenantId, TEST_NAMESPACE, "for-each-item-in-if", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// we should have triggered 26 subflows
assertTrue(countDownLatch.await(20, TimeUnit.SECONDS), "Remaining countdown: %s".formatted(countDownLatch.getCount()));
receive.blockLast();
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(26, tenantId, TEST_NAMESPACE, "for-each-item-subflow");
Execution triggered = triggeredExecs.getLast();
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(5);
@@ -363,36 +265,25 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("SUCCESS")).isEqualTo(26);
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-in-if/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(1);
Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow");
assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-in-if/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.getTaskRunList()).hasSize(1);
Optional<Label> correlationId = triggered.getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent()).isTrue();
assertThat(correlationId.get().value()).isEqualTo(execution.getId());
}
public void forEachItemWithAfterExecution() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow-after-execution") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
URI file = storageUpload();
public void forEachItemWithAfterExecution() throws TimeoutException, URISyntaxException, IOException, QueueException {
URI file = storageUpload(MAIN_TENANT);
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-after-execution", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
receive.blockLast();
List<Execution> triggeredExecs = runnerUtils.awaitFlowExecutionNumber(26, MAIN_TENANT, TEST_NAMESPACE, "for-each-item-subflow-after-execution");
Execution triggered = triggeredExecs.getLast();
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(5);
@@ -408,33 +299,33 @@ public class ForEachItemCaseTest {
assertThat(iterations.get("SUCCESS")).isEqualTo(26);
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow-after-execution");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-after-execution/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(2);
Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(triggered.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.getFlowId()).isEqualTo("for-each-item-subflow-after-execution");
assertThat((String) triggered.getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-after-execution/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.getTaskRunList()).hasSize(2);
Optional<Label> correlationId = triggered.getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent()).isTrue();
assertThat(correlationId.get().value()).isEqualTo(execution.getId());
}
private URI storageUpload() throws URISyntaxException, IOException {
private URI storageUpload(String tenantId) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), content());
return storageInterface.put(
MAIN_TENANT,
tenantId,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)
);
}
private URI emptyItems() throws URISyntaxException, IOException {
private URI emptyItems(String tenantId) throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
return storageInterface.put(
MAIN_TENANT,
tenantId,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)

View File

@@ -2,12 +2,16 @@ package io.kestra.plugin.core.flow;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.List;
@KestraTest(startRunner = true)
class ForEachTest {
@@ -60,4 +64,13 @@ class ForEachTest {
void nested(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
@Test
@ExecuteFlow("flows/valids/foreach-iteration.yaml")
void iteration(Execution execution) throws InternalException {
List<TaskRun> seconds = execution.findTaskRunsByTaskId("second");
assertThat(seconds).hasSize(2);
assertThat(seconds.get(0).getIteration()).isEqualTo(0);
assertThat(seconds.get(1).getIteration()).isEqualTo(1);
}
}

View File

@@ -6,7 +6,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -23,7 +23,7 @@ class IfTest {
private static final String TENANT_ID = "true";
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows(value = {"flows/valids/if-condition.yaml"}, tenantId = TENANT_ID)

View File

@@ -3,10 +3,9 @@ package io.kestra.plugin.core.flow;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -16,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class LoopUntilCaseTest {
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
public void waitfor() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "waitfor");

View File

@@ -10,7 +10,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeoutException;
@KestraTest(startRunner = true)
class ParallelTest {
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;

View File

@@ -8,10 +8,8 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.http.MediaType;
@@ -23,7 +21,6 @@ import io.micronaut.http.server.netty.multipart.NettyCompletedFileUpload;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.multipart.*;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Disabled;
@@ -40,7 +37,6 @@ import java.util.Map;
import java.util.concurrent.TimeoutException;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -48,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class PauseTest {
@Inject
RunnerUtils runnerUtils;
TestRunnerUtils runnerUtils;
@Inject
Suite suite;
@@ -156,11 +152,7 @@ public class PauseTest {
@Inject
StorageInterface storageInterface;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
public void run(RunnerUtils runnerUtils) throws Exception {
public void run(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause-test", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
@@ -176,16 +168,15 @@ public class PauseTest {
State.Type.RUNNING
);
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> executionQueue.emit(restarted)),
Duration.ofSeconds(5)
restarted
);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
public void runDelay(RunnerUtils runnerUtils) throws Exception {
public void runDelay(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause-delay", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
@@ -193,9 +184,9 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(1);
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
e ->
e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
execution
);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).isEqualTo(1L);
@@ -203,7 +194,7 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(3);
}
public void runDurationFromInput(RunnerUtils runnerUtils) throws Exception {
public void runDurationFromInput(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause-duration-from-input", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
@@ -211,9 +202,9 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(1);
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
e ->
e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
execution
);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).isEqualTo(1L);
@@ -221,14 +212,14 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(3);
}
public void runParallelDelay(RunnerUtils runnerUtils) throws TimeoutException, QueueException {
public void runParallelDelay(TestRunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "each-parallel-pause", Duration.ofSeconds(30));
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(7);
}
public void runTimeout(RunnerUtils runnerUtils) throws Exception {
public void runTimeout(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause-timeout", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
@@ -237,8 +228,7 @@ public class PauseTest {
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.FAILED,
() -> {},
Duration.ofSeconds(5)
execution
);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).as("Task runs were: " + execution.getTaskRunList().toString()).isEqualTo(1L);
@@ -247,7 +237,7 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(1);
}
public void runTimeoutAllowFailure(RunnerUtils runnerUtils) throws Exception {
public void runTimeoutAllowFailure(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause-timeout-allow-failure", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
@@ -256,8 +246,7 @@ public class PauseTest {
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.WARNING,
() -> {},
Duration.ofSeconds(5)
execution
);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).as("Task runs were: " + execution.getTaskRunList().toString()).isEqualTo(1L);
@@ -266,7 +255,7 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(3);
}
public void runEmptyTasks(RunnerUtils runnerUtils) throws Exception {
public void runEmptyTasks(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause_no_tasks", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
@@ -282,17 +271,16 @@ public class PauseTest {
State.Type.RUNNING
);
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> executionQueue.emit(restarted)),
Duration.ofSeconds(10)
restarted
);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
@SuppressWarnings("unchecked")
public void runOnResume(RunnerUtils runnerUtils) throws Exception {
public void runOnResume(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause_on_resume", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
@@ -315,10 +303,9 @@ public class PauseTest {
null
).block();
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> executionQueue.emit(restarted)),
Duration.ofSeconds(10)
restarted
);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
@@ -329,7 +316,7 @@ public class PauseTest {
assertThat(CharStreams.toString(new InputStreamReader(storageInterface.get(MAIN_TENANT, null, URI.create((String) outputs.get("data")))))).isEqualTo(executionId);
}
public void runOnResumeMissingInputs(String tenantId, RunnerUtils runnerUtils) throws Exception {
public void runOnResumeMissingInputs(String tenantId, TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(tenantId, "io.kestra.tests", "pause_on_resume", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository.findByExecution(execution);
@@ -344,7 +331,7 @@ public class PauseTest {
}
@SuppressWarnings("unchecked")
public void runOnResumeOptionalInputs(RunnerUtils runnerUtils) throws Exception {
public void runOnResumeOptionalInputs(TestRunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause_on_resume_optional", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
Flow flow = flowRepository.findByExecution(execution);
@@ -353,10 +340,9 @@ public class PauseTest {
Execution restarted = executionService.resume(execution, flow, State.Type.RUNNING, Pause.Resumed.now());
execution = runnerUtils.awaitExecution(
execution = runnerUtils.emitAndAwaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> executionQueue.emit(restarted)),
Duration.ofSeconds(10)
restarted
);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
@@ -365,7 +351,7 @@ public class PauseTest {
assertThat(outputs.get("asked")).isEqualTo("MISSING");
}
public void runDurationWithBehavior(String tenantId, RunnerUtils runnerUtils, Pause.Behavior behavior) throws Exception {
public void runDurationWithBehavior(String tenantId, TestRunnerUtils runnerUtils, Pause.Behavior behavior) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(tenantId, "io.kestra.tests", "pause-behavior", null, (unused, _unused) -> Map.of("behavior", behavior), Duration.ofSeconds(30));
String executionId = execution.getId();
@@ -374,8 +360,7 @@ public class PauseTest {
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent().isTerminated(),
() -> {},
Duration.ofSeconds(5)
execution
);
State.Type finalState = behavior == Pause.Behavior.RESUME ? State.Type.SUCCESS : behavior.mapToState();

View File

@@ -1,27 +1,22 @@
package io.kestra.plugin.core.flow;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
@@ -29,11 +24,11 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RetryCaseTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;
protected TestRunnerUtils runnerUtils;
@Inject
protected RunnerUtils runnerUtils;
private ExecutionRepositoryInterface executionRepository;
@Inject
private FlowRepositoryInterface flowRepository;
public void retrySuccess(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.WARNING);
@@ -72,117 +67,53 @@ public class RetryCaseTest {
}
public void retryNewExecutionTaskDuration() throws TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-task-duration") && execution.getState().getCurrent().isTerminated()) {
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
countDownLatch.countDown();
}
});
runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"retry-new-execution-task-duration",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
receive.blockLast();
assertThat(stateHistory.get()).containsExactlyInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED);
public void retryNewExecutionTaskDuration(String tenant) throws TimeoutException, QueueException {
var flow = flowRepository
.findById(tenant, "io.kestra.tests", "retry-new-execution-task-duration")
.orElseThrow();
runAndAssertThereWasTwoRetriesAndFinishedFailed(flow);
}
public void retryNewExecutionTaskAttempts() throws TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-task-attempts") && execution.getState().getCurrent().isTerminated()) {
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
countDownLatch.countDown();
}
});
runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"retry-new-execution-task-attempts",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
receive.blockLast();
assertThat(stateHistory.get()).containsExactlyInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED);
public void retryNewExecutionTaskAttempts(String tenant) throws TimeoutException, QueueException {
var flow = flowRepository
.findById(tenant, "io.kestra.tests", "retry-new-execution-task-attempts")
.orElseThrow();
runAndAssertThereWasTwoRetriesAndFinishedFailed(flow);
}
public void retryNewExecutionFlowDuration() throws TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-flow-duration") && execution.getState().getCurrent().isTerminated()) {
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
countDownLatch.countDown();
}
});
runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"retry-new-execution-flow-duration",
null,
null
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
receive.blockLast();
assertThat(stateHistory.get()).containsExactlyInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED);
public void retryNewExecutionFlowDuration(String tenant) throws TimeoutException, QueueException {
var flow = flowRepository
.findById(tenant, "io.kestra.tests", "retry-new-execution-flow-duration")
.orElseThrow();
runAndAssertThereWasTwoRetriesAndFinishedFailed(flow);
}
public void retryNewExecutionFlowAttempts() throws TimeoutException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(3);
AtomicReference<List<State.Type>> stateHistory = new AtomicReference<>(new ArrayList<>());
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("retry-new-execution-flow-attempts") && execution.getState().getCurrent().isTerminated()) {
List<State.Type> stateHistoryList = stateHistory.get();
stateHistoryList.add(execution.getState().getCurrent());
stateHistory.set(stateHistoryList);
countDownLatch.countDown();
}
});
public void retryNewExecutionFlowAttempts(String tenant) throws TimeoutException, QueueException {
var flow = flowRepository
.findById(tenant, "io.kestra.tests", "retry-new-execution-flow-attempts")
.orElseThrow();
runAndAssertThereWasTwoRetriesAndFinishedFailed(flow);
}
private void runAndAssertThereWasTwoRetriesAndFinishedFailed(Flow flow) throws TimeoutException, QueueException {
runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"retry-new-execution-flow-attempts",
null,
null
Execution.newExecution(flow, null),
flow,
Duration.ofSeconds(10)
);
Await.until(() -> countDownLatch.getCount() == 0, Duration.ofSeconds(2), Duration.ofMinutes(1));
receive.blockLast();
assertThat(stateHistory.get()).containsExactlyInAnyOrder(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED);
Await.until(
() -> "flow should have ended in Failed state",
() -> executionRepository.findLatestForStates(flow.getTenantId(), flow.getNamespace(), flow.getId(), List.of(State.Type.FAILED)).isPresent(),
Duration.ofMillis(100),
Duration.ofSeconds(10)
);
var executions = executionRepository.findByFlowId(flow.getTenantId(), flow.getNamespace(), flow.getId(), Pageable.UNPAGED);
assertThat(executions.stream().map(e -> e.getState().getCurrent())).contains(State.Type.RETRIED, State.Type.RETRIED, State.Type.FAILED);
}
public void retryFailedTaskDuration(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getTaskRunList().getFirst().attemptNumber()).isEqualTo(3);
assertThat(execution.getTaskRunList().getFirst().attemptNumber()).isGreaterThanOrEqualTo(2);
}
public void retryFailedTaskAttempts(Execution execution) {

View File

@@ -10,7 +10,7 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
@@ -21,7 +21,7 @@ import org.junit.jupiter.api.Test;
class RuntimeLabelsTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows({"flows/valids/labels-update-task.yml"})
@@ -160,4 +160,25 @@ class RuntimeLabelsTest {
new Label("fromListKey", "value2")
);
}
@Test
@LoadFlows({"flows/valids/labels-update-task-empty.yml"})
void updateIgnoresEmpty() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"labels-update-task-empty",
null,
(flow, createdExecution) -> Map.of(),
null,
List.of()
);
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getLabels()).containsExactly(
new Label(Label.CORRELATION_ID, execution.getId())
);
}
}

View File

@@ -10,7 +10,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import java.util.List;
@@ -24,7 +24,7 @@ class StateTest {
public static final String FLOW_ID = "state";
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@SuppressWarnings("unchecked")
@Test

View File

@@ -9,7 +9,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
@@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class SubflowRunnerTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Inject
private ExecutionRepositoryInterface executionRepository;

View File

@@ -9,7 +9,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
@@ -18,7 +18,7 @@ import org.junit.jupiter.api.Test;
class SwitchTest {
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows(value = {"flows/valids/switch.yaml"}, tenantId = "switch")

View File

@@ -11,7 +11,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.plugin.core.log.Log;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.annotation.Property;
@@ -45,7 +45,7 @@ public class TemplateTest {
private FlowInputOutput flowIO;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
public static final io.kestra.core.models.templates.Template TEMPLATE_1 = io.kestra.core.models.templates.Template.builder()
.id("template")
@@ -54,7 +54,7 @@ public class TemplateTest {
.tasks(Collections.singletonList(Log.builder().id("test").type(Log.class.getName()).message("{{ parent.outputs.args['my-forward'] }}").build())).build();
public static void withTemplate(
RunnerUtils runnerUtils,
TestRunnerUtils runnerUtils,
TemplateRepositoryInterface templateRepository,
QueueInterface<LogEntry> logQueue,
FlowInputOutput flowIO
@@ -90,7 +90,7 @@ public class TemplateTest {
}
public static void withFailedTemplate(RunnerUtils runnerUtils, QueueInterface<LogEntry> logQueue) throws TimeoutException, QueueException {
public static void withFailedTemplate(TestRunnerUtils runnerUtils, QueueInterface<LogEntry> logQueue) throws TimeoutException, QueueException {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
Flux<LogEntry> receive = TestsUtils.receive(logQueue, either -> logs.add(either.getLeft()));

View File

@@ -11,7 +11,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -37,7 +37,7 @@ class TimeoutTest {
private QueueInterface<LogEntry> workerTaskLogQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@RetryingTest(5) // Flaky on CI but never locally even with 100 repetitions
void timeout() throws TimeoutException, QueueException {

View File

@@ -12,7 +12,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -31,7 +31,7 @@ class VariablesTest {
QueueInterface<LogEntry> workerTaskLogQueue;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@ExecuteFlow("flows/valids/variables.yaml")

View File

@@ -16,7 +16,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
@KestraTest(startRunner = true)
public class WorkingDirectoryTest {
@@ -43,7 +42,7 @@ public class WorkingDirectoryTest {
RunContextFactory runContextFactory;
@Inject
RunnerUtils runnerUtils;
TestRunnerUtils runnerUtils;
@Test
@LoadFlows({"flows/valids/working-directory.yaml"})
@@ -122,7 +121,7 @@ public class WorkingDirectoryTest {
@Inject
StorageInterface storageInterface;
public void success(RunnerUtils runnerUtils) throws TimeoutException, QueueException {
public void success(TestRunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory", null,
(f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60)
);
@@ -132,7 +131,7 @@ public class WorkingDirectoryTest {
assertThat((String) execution.getTaskRunList().get(3).getOutputs().get("value")).startsWith("kestra://");
}
public void failed(String tenantId, RunnerUtils runnerUtils) throws TimeoutException, QueueException {
public void failed(String tenantId, TestRunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(tenantId, "io.kestra.tests", "working-directory", null,
(f, e) -> ImmutableMap.of("failed", "true"), Duration.ofSeconds(60)
);
@@ -142,7 +141,7 @@ public class WorkingDirectoryTest {
assertThat(execution.findTaskRunsByTaskId("error-t1")).hasSize(1);
}
public void each(RunnerUtils runnerUtils) throws TimeoutException, QueueException {
public void each(TestRunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-each", Duration.ofSeconds(60));
assertThat(execution.getTaskRunList()).hasSize(8);
@@ -151,7 +150,7 @@ public class WorkingDirectoryTest {
}
@SuppressWarnings("unchecked")
public void outputFiles(String tenantId, RunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
public void outputFiles(String tenantId, TestRunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
Execution execution = runnerUtils.runOne(tenantId, "io.kestra.tests", "working-directory-outputs");
@@ -178,7 +177,7 @@ public class WorkingDirectoryTest {
}
@SuppressWarnings("unchecked")
public void inputFiles(RunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
public void inputFiles(TestRunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-inputs");
@@ -204,7 +203,7 @@ public class WorkingDirectoryTest {
}
@SuppressWarnings({"unchecked", "OptionalGetWithoutIsPresent"})
public void cache(RunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
public void cache(TestRunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
// make sure the cache didn't exist
StorageContext storageContext = StorageContext.forFlow(Flow
.builder()
@@ -247,7 +246,7 @@ public class WorkingDirectoryTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
public void taskRun(RunnerUtils runnerUtils) throws TimeoutException, InternalException, QueueException {
public void taskRun(TestRunnerUtils runnerUtils) throws TimeoutException, InternalException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-taskrun");
assertThat(execution.getTaskRunList()).hasSize(3);
@@ -255,7 +254,7 @@ public class WorkingDirectoryTest {
assertThat(((String) execution.findTaskRunByTaskIdAndValue("log-taskrun", List.of("1")).getOutputs().get("value"))).contains("1");
}
public void taskRunNested(RunnerUtils runnerUtils) throws TimeoutException, InternalException, QueueException {
public void taskRunNested(TestRunnerUtils runnerUtils) throws TimeoutException, InternalException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-taskrun-nested");
assertThat(execution.getTaskRunList()).hasSize(6);
@@ -263,7 +262,7 @@ public class WorkingDirectoryTest {
assertThat(((String) execution.findTaskRunByTaskIdAndValue("log-workerparent", List.of("1")).getOutputs().get("value"))).contains("{\"taskrun\":{\"value\":\"1\"}}");
}
public void namespaceFiles(RunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
public void namespaceFiles(TestRunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
put("/test/a/b/c/1.txt", "first");
put("/a/b/c/2.txt", "second");
put("/a/b/3.txt", "third");
@@ -279,7 +278,7 @@ public class WorkingDirectoryTest {
assertThat(execution.findTaskRunsByTaskId("t3").getFirst().getOutputs().get("value")).isEqualTo("third");
}
public void namespaceFilesWithNamespaces(RunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
public void namespaceFilesWithNamespaces(TestRunnerUtils runnerUtils) throws TimeoutException, IOException, QueueException {
//fist namespace
put("/test/a/b/c/1.txt", "first in first namespace", "io.test.first");
put("/a/b/c/2.txt", "second in first namespace", "io.test.first");
@@ -304,7 +303,7 @@ public class WorkingDirectoryTest {
}
@SuppressWarnings("unchecked")
public void encryption(RunnerUtils runnerUtils, RunContextFactory runContextFactory) throws TimeoutException, GeneralSecurityException, QueueException {
public void encryption(TestRunnerUtils runnerUtils, RunContextFactory runContextFactory) throws TimeoutException, GeneralSecurityException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-taskrun-encrypted");
assertThat(execution.getTaskRunList()).hasSize(3);
@@ -316,7 +315,7 @@ public class WorkingDirectoryTest {
assertThat(execution.findTaskRunsByTaskId("decrypted").getFirst().getOutputs().get("value")).isEqualTo("Hello World");
}
public void invalidRunIf(RunnerUtils runnerUtils) throws TimeoutException, QueueException {
public void invalidRunIf(TestRunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-invalid-runif", null,
(f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60)
);

View File

@@ -11,6 +11,7 @@ import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVStoreException;
import io.kestra.core.storages.kv.KVValue;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
@@ -169,6 +170,7 @@ class SetTest {
@Test
void shouldFailGivenExistingKeyAndOverwriteFalse() throws Exception {
// Given
String key = IdUtils.create();
Set set = Set.builder()
.id(Set.class.getSimpleName())
.type(Set.class.getName())
@@ -179,16 +181,22 @@ class SetTest {
var value = Map.of("date", Instant.now().truncatedTo(ChronoUnit.MILLIS), "int", 1, "string", "string");
final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, Map.of(
"key", "existing_key",
"key", key,
"value", value
));
// When - Then
//set key a first:
runContext.namespaceKv(runContext.flowInfo().namespace()).put("existing_key", new KVValueAndMetadata(new KVMetadata("unused", (Instant)null), value));
runContext.namespaceKv(runContext.flowInfo().namespace()).put(key, new KVValueAndMetadata(new KVMetadata("unused", (Instant)null), value));
//fail because key is already set
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> set.run(runContext));
assertThat(exception.getMessage()).isEqualTo("Cannot set value for key 'existing_key'. Key already exists and `overwrite` is set to `false`.");
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> Set.builder()
.id(Set.class.getSimpleName())
.type(Set.class.getName())
.key(new Property<>("{{ inputs.key }}"))
.value(new Property<>("{{ inputs.value }}"))
.overwrite(Property.ofValue(false))
.build().run(runContext));
assertThat(exception.getMessage()).isEqualTo("Cannot set value for key '%s'. Key already exists and `overwrite` is set to `false`.".formatted(key));
}
@Test

View File

@@ -5,7 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import jakarta.inject.Inject;
import java.time.temporal.ChronoUnit;
import java.util.stream.Stream;
@@ -29,7 +29,7 @@ class PurgeLogsTest {
private LogRepositoryInterface logRepository;
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Test
@LoadFlows("flows/valids/purge_logs_no_arguments.yaml")

View File

@@ -3,7 +3,6 @@ package io.kestra.plugin.core.storage;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.repositories.AbstractLoadedFlowRepositoryTest;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.junit.annotations.KestraTest;
@@ -47,7 +46,7 @@ class LocalFilesTest {
@Test
void run() throws Exception {
String tenant = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var runContext = runContextFactory.of("namesapce", tenant, Map.of("toto", "tata"));
var storageFile = internalFiles(tenant);
@@ -76,7 +75,7 @@ class LocalFilesTest {
@Test
void recursive() throws Exception {
String tenant = TestsUtils.randomTenant(AbstractLoadedFlowRepositoryTest.class.getSimpleName());
String tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
var runContext = runContextFactory.of("namesapce", tenant, Map.of("toto", "tata"));
var storageFile = internalFiles(tenant);

View File

@@ -168,12 +168,12 @@ class FlowTest {
Optional<Execution> evaluate = flowTrigger.evaluate(multipleConditionStorage, runContextFactory.of(), flow, execution);
assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(6);
assertThat(evaluate.get().getLabels()).hasSize(5);
assertThat(evaluate.get().getLabels()).contains(new Label("flow-label-1", "flow-label-1"));
assertThat(evaluate.get().getLabels()).contains(new Label("flow-label-2", "flow-label-2"));
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-1", "trigger-label-1"));
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-2", "trigger-label-2"));
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-3", ""));
assertThat(evaluate.get().getLabels()).doesNotContain(new Label("trigger-label-3", ""));
assertThat(evaluate.get().getLabels()).contains(new Label(Label.CORRELATION_ID, "correlationId"));
assertThat(evaluate.get().getTrigger()).extracting(ExecutionTrigger::getVariables).hasFieldOrProperty("executionLabels");
assertThat(evaluate.get().getTrigger().getVariables().get("executionLabels")).isEqualTo(Map.of("execution-label", "execution"));

View File

@@ -169,7 +169,7 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-1", "trigger-label-1"));
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-2", "trigger-label-2"));
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-3", ""));
assertThat(evaluate.get().getLabels()).doesNotContain(new Label("trigger-label-3", ""));
}
@Test

View File

@@ -11,7 +11,7 @@ import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.scheduler.AbstractScheduler;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.TestsUtils;
@@ -38,7 +38,7 @@ class ToggleTest {
private AbstractScheduler scheduler;
@Inject
private RunnerUtils runnerUtils;
private TestRunnerUtils runnerUtils;
@Test
@LoadFlows({"flows/valids/trigger-toggle.yaml"})

View File

@@ -0,0 +1,16 @@
id: foreach-iteration
namespace: io.kestra.tests
tasks:
- id: foreach
type: io.kestra.plugin.core.flow.ForEach
values: [1, 2]
tasks:
- id: first
type: io.kestra.plugin.core.output.OutputValues
values:
iteration : "{{ taskrun.iteration }}"
- id: second
type: io.kestra.plugin.core.output.OutputValues
values:
iteration : "{{ taskrun.iteration }}"

View File

@@ -0,0 +1,14 @@
id: labels-update-task-empty
namespace: io.kestra.tests
tasks:
- id: from-string
type: io.kestra.plugin.core.execution.Labels
labels: "{ \"fromStringKey\": \"\", \"\": \"value2\" }"
- id: from-list
type: io.kestra.plugin.core.execution.Labels
labels:
- key: "fromListKey"
value: ""
- key: ""
value: "value2"

View File

@@ -7,6 +7,6 @@ tasks:
retry:
behavior: RETRY_FAILED_TASK
type: constant
maxDuration: PT6S
maxDuration: PT7.5S
interval: PT2S

View File

@@ -0,0 +1,35 @@
id: task-runif-executionupdating
namespace: io.kestra.tests
variables:
list: []
tasks:
- id: output
type: io.kestra.plugin.core.output.OutputValues
values:
taskrun_data: 1
- id: unsetVariables
type: io.kestra.plugin.core.execution.UnsetVariables
runIf: "true"
variables:
- list
- id: setVariables
type: io.kestra.plugin.core.execution.SetVariables
runIf: "{{ outputs.output['values']['taskrun_data'] == 1 }}"
variables:
list: [42]
- id: skipSetVariables
type: io.kestra.plugin.core.execution.SetVariables
runIf: "false"
variables:
list: [1]
- id: skipUnsetVariables
type: io.kestra.plugin.core.execution.UnsetVariables
runIf: "{{ outputs.output['values']['taskrun_data'] == 2 }}"
variables:
- list

View File

@@ -7,5 +7,11 @@ concurrency:
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT0.5S
- id: log
type: io.kestra.plugin.core.log.Log
message: "we are between sleeps"
- id: sleep_2
type: io.kestra.plugin.core.flow.Sleep
duration: PT0.5S

View File

@@ -1072,6 +1072,17 @@ public class ExecutorService {
var executionUpdatingTask = (ExecutionUpdatableTask) workerTask.getTask();
try {
// handle runIf
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
executor.withExecution(
executor
.getExecution()
.withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED)),
"handleExecutionUpdatingTaskSkipped"
);
return false;
}
executor.withExecution(
executionUpdatingTask.update(executor.getExecution(), workerTask.getRunContext())
.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING)),

View File

@@ -1,33 +0,0 @@
package io.kestra.repository.h2;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public class H2LoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
@Test
@Override
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
// FIXME since the big task renaming, H2 return 6 instead of 2
// as no core change this is a test artefact, or a latent bug in H2.
assertThat((long) search.size()).isEqualTo(6L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -7,18 +7,13 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class H2FlowListenersTest extends FlowListenersTest {
@Inject
JdbcTestUtils jdbcTestUtils;
@Inject
FlowRepositoryInterface flowRepository;
@@ -34,10 +29,4 @@ class H2FlowListenersTest extends FlowListenersTest {
// we don't inject FlowListeners to remove a flaky test
this.suite(new FlowListeners(flowRepository, flowQueue, pluginDefaultService));
}
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,30 +0,0 @@
package io.kestra.repository.mysql;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public class MysqlLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -2,27 +2,16 @@ package io.kestra.runner.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.FlowListenersTest;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class MysqlFlowListenersTest extends FlowListenersTest {
@Inject
FlowListeners flowListenersService;
@Inject
JdbcTestUtils jdbcTestUtils;
@Test
public void all() throws TimeoutException {
this.suite(flowListenersService);
}
@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
}

View File

@@ -1,7 +1,10 @@
package io.kestra.runner.mysql;
import io.kestra.jdbc.runner.JdbcRunnerRetryTest;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
@TestInstance(Lifecycle.PER_CLASS)
public class MysqlRunnerRetryTest extends JdbcRunnerRetryTest {
}

View File

@@ -1,7 +1,33 @@
package io.kestra.runner.mysql;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.jdbc.runner.JdbcRunnerTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
public class MysqlRunnerTest extends JdbcRunnerTest {
@Disabled("We have a bug here in the queue where no FAILED event is sent, so the state store is not cleaned")
@Test
@Override
@LoadFlows({"flows/valids/restart-with-finally.yaml"})
protected void restartFailedWithFinally() throws Exception {
restartCaseTest.restartFailedWithFinally();
}
@Disabled("Should fail the second time, but is success")
@Test
@Override
@LoadFlows({"flows/valids/restart_local_errors.yaml"})
protected void restartFailedThenFailureWithLocalErrors() throws Exception {
restartCaseTest.restartFailedThenFailureWithLocalErrors();
}
@Disabled("Is success, but is not terminated")
@Test
@Override
@LoadFlows({"flows/valids/restart-with-after-execution.yaml"})
protected void restartFailedWithAfterExecution() throws Exception {
restartCaseTest.restartFailedWithAfterExecution();
}
}

View File

@@ -5,6 +5,11 @@ datasources:
username: kestra
password: k3str4
dialect: MYSQL
maximumPoolSize: 32
minimumIdle: 8
connectionTimeout: 30000
idleTimeout: 600000
maxLifetime: 1800000
flyway:
datasources:

View File

@@ -1,7 +0,0 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.repository.AbstractJdbcLoadedFlowRepositoryTest;
public class PstogresLoadedFlowRepositoryTest extends AbstractJdbcLoadedFlowRepositoryTest {
}

View File

@@ -682,9 +682,8 @@ public class JdbcExecutor implements ExecutorInterface {
);
} catch (QueueException e) {
try {
this.executionQueue.emit(
message.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
);
Execution failedExecution = fail(message, e);
this.executionQueue.emit(failedExecution);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", message.getId(), ex);
}
@@ -701,6 +700,16 @@ public class JdbcExecutor implements ExecutorInterface {
}
}
private Execution fail(Execution message, Exception e) {
var failedExecution = message.failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecution.getLogs());
} catch (QueueException ex) {
// fail silently
}
return failedExecution.getExecution().getState().isFailed() ? failedExecution.getExecution() : failedExecution.getExecution().withState(State.Type.FAILED);
}
private void workerTaskResultQueue(Either<WorkerTaskResult, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a worker task result: {}", either.getRight().getMessage(), either.getRight());
@@ -1178,8 +1187,9 @@ public class JdbcExecutor implements ExecutorInterface {
// If we cannot add the new worker task result to the execution, we fail it
executionRepository.lock(executor.getExecution().getId(), pair -> {
Execution execution = pair.getLeft();
Execution failedExecution = fail(execution, e);
try {
this.executionQueue.emit(execution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED));
this.executionQueue.emit(failedExecution);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", execution.getId(), ex);
}

View File

@@ -1,31 +0,0 @@
package io.kestra.jdbc.repository;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.models.SearchResult;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.AbstractLoadedFlowRepositoryTest;
import io.micronaut.data.model.Pageable;
import io.micronaut.data.model.Sort;
import java.util.List;
import org.junit.jupiter.api.Test;
public abstract class AbstractJdbcLoadedFlowRepositoryTest extends AbstractLoadedFlowRepositoryTest {
@Test
public void findSourceCode() {
List<SearchResult<Flow>> search = flowRepository.findSourceCode(Pageable.from(1, 10, Sort.UNSORTED), "io.kestra.plugin.core.condition.MultipleCondition", TENANT, null);
assertThat((long) search.size()).isEqualTo(2L);
SearchResult<Flow> flow = search
.stream()
.filter(flowSearchResult -> flowSearchResult.getModel()
.getId()
.equals("trigger-multiplecondition-listener"))
.findFirst()
.orElseThrow();
assertThat(flow.getFragments().getFirst()).contains("condition.MultipleCondition[/mark]");
}
}

View File

@@ -7,12 +7,12 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.plugin.core.flow.RetryCaseTest;
import jakarta.inject.Inject;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import java.util.concurrent.TimeoutException;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
public abstract class JdbcRunnerRetryTest {
@Inject
@@ -55,27 +55,27 @@ public abstract class JdbcRunnerRetryTest {
}
@Test
@LoadFlows({"flows/valids/retry-new-execution-task-duration.yml"})
@LoadFlows(value = {"flows/valids/retry-new-execution-task-duration.yml"}, tenantId = "retrynewexecutiontaskdurationtenant")
void retryNewExecutionTaskDuration() throws TimeoutException, QueueException {
retryCaseTest.retryNewExecutionTaskDuration();
retryCaseTest.retryNewExecutionTaskDuration("retrynewexecutiontaskdurationtenant");
}
@Test
@LoadFlows({"flows/valids/retry-new-execution-task-attempts.yml"})
@LoadFlows(value = {"flows/valids/retry-new-execution-task-attempts.yml"}, tenantId = "retrynewexecutiontaskattempts")
void retryNewExecutionTaskAttempts() throws TimeoutException, QueueException {
retryCaseTest.retryNewExecutionTaskAttempts();
retryCaseTest.retryNewExecutionTaskAttempts("retrynewexecutiontaskattempts");
}
@Test
@LoadFlows({"flows/valids/retry-new-execution-flow-duration.yml"})
@LoadFlows(value = {"flows/valids/retry-new-execution-flow-duration.yml"}, tenantId = "retrynewexecutionflowdurationtenant")
void retryNewExecutionFlowDuration() throws TimeoutException, QueueException {
retryCaseTest.retryNewExecutionFlowDuration();
retryCaseTest.retryNewExecutionFlowDuration("retrynewexecutionflowdurationtenant");
}
@Test
@LoadFlows({"flows/valids/retry-new-execution-flow-attempts.yml"})
@LoadFlows(value = {"flows/valids/retry-new-execution-flow-attempts.yml"}, tenantId = "retrynewexecutionflowattemptstenant")
void retryNewExecutionFlowAttempts() throws TimeoutException, QueueException {
retryCaseTest.retryNewExecutionFlowAttempts();
retryCaseTest.retryNewExecutionFlowAttempts("retrynewexecutionflowattemptstenant");
}
@Test

View File

@@ -9,6 +9,9 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.AbstractRunnerTest;
import io.kestra.core.runners.InputsTest;
import io.kestra.core.utils.TestsUtils;
import io.kestra.jdbc.JdbcTestUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.event.Level;
@@ -29,6 +32,15 @@ public abstract class JdbcRunnerTest extends AbstractRunnerTest {
public static final String NAMESPACE = "io.kestra.tests";
@Inject
private JdbcTestUtils jdbcTestUtils;
@BeforeAll
public void init(){
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}
@Test
@LoadFlows({"flows/valids/waitfor-child-task-warning.yaml"})
void waitForChildTaskWarning() throws Exception {
@@ -55,14 +67,10 @@ public abstract class JdbcRunnerTest extends AbstractRunnerTest {
assertThat(execution.getTaskRunList().size()).isGreaterThanOrEqualTo(6); // the exact number is test-run-dependent.
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
// To avoid flooding the database with big messages, we re-init it
// jdbcTestUtils.drop(); TODO
// jdbcTestUtils.migrate();
}
@Test
@LoadFlows({"flows/valids/inputs-large.yaml"})
@LoadFlows(value = {"flows/valids/inputs-large.yaml"}, tenantId = TENANT_1)
void queueMessageTooLarge() {
char[] chars = new char[1100000];
Arrays.fill(chars, 'a');
@@ -71,7 +79,7 @@ public abstract class JdbcRunnerTest extends AbstractRunnerTest {
inputs.put("string", new String(chars));
var exception = assertThrows(QueueException.class, () -> runnerUtils.runOne(
MAIN_TENANT,
TENANT_1,
NAMESPACE,
"inputs-large",
null,

View File

@@ -7,7 +7,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.TemplateRepositoryInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.plugin.core.flow.TemplateTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -20,7 +20,7 @@ import org.junitpioneer.jupiter.RetryingTest;
public abstract class JdbcTemplateRunnerTest {
@Inject
protected RunnerUtils runnerUtils;
protected TestRunnerUtils runnerUtils;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)

View File

@@ -13,7 +13,7 @@ dependencies {
// versions for libraries with multiple module but no BOM
def slf4jVersion = "2.0.17"
def protobufVersion = "3.25.5" // Orc still uses 3.25.5 see https://github.com/apache/orc/blob/main/java/pom.xml
def bouncycastleVersion = "1.81"
def bouncycastleVersion = "1.82"
def mavenResolverVersion = "2.0.10"
def jollydayVersion = "1.5.6"
def jsonschemaVersion = "4.38.0"
@@ -35,7 +35,7 @@ dependencies {
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
api platform('com.google.cloud:libraries-bom:26.68.0')
api platform("com.azure:azure-sdk-bom:1.2.38")
api platform('software.amazon.awssdk:bom:2.33.11')
api platform('software.amazon.awssdk:bom:2.34.2')
api platform("dev.langchain4j:langchain4j-bom:$langchain4jVersion")
api platform("dev.langchain4j:langchain4j-community-bom:$langchain4jCommunityVersion")
@@ -65,8 +65,8 @@ dependencies {
// http5 client
api("org.apache.httpcomponents.client5:httpclient5:5.5")
api("org.apache.httpcomponents.core5:httpcore5:5.3.5")
api("org.apache.httpcomponents.core5:httpcore5-h2:5.3.5")
api("org.apache.httpcomponents.core5:httpcore5:5.3.6")
api("org.apache.httpcomponents.core5:httpcore5-h2:5.3.6")
api("com.fasterxml.uuid:java-uuid-generator:$jugVersion")
// issue with the Docker lib having a too old version for the k8s extension
@@ -75,17 +75,17 @@ dependencies {
api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "org.apache.kafka:kafka-streams:$kafkaVersion"
// AWS CRT is not included in the AWS BOM but needed for the S3 Transfer manager
api 'software.amazon.awssdk.crt:aws-crt:0.38.13'
api 'software.amazon.awssdk.crt:aws-crt:0.39.0'
// we need at least 0.14, it could be removed when Micronaut contains a recent only version in their BOM
api "io.micrometer:micrometer-core:1.15.4"
// We need at least 6.17, it could be removed when Micronaut contains a recent only version in their BOM
api "io.micronaut.openapi:micronaut-openapi-bom:6.18.0"
api "io.micronaut.openapi:micronaut-openapi-bom:6.18.1"
// Other libs
api("org.projectlombok:lombok:1.18.40")
api("org.projectlombok:lombok:1.18.42")
api("org.codehaus.janino:janino:3.1.12")
api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.1'
api group: 'org.apache.logging.log4j', name: 'log4j-to-slf4j', version: '2.25.2'
api group: 'org.slf4j', name: 'jul-to-slf4j', version: slf4jVersion
api group: 'org.slf4j', name: 'jcl-over-slf4j', version: slf4jVersion
api group: 'org.fusesource.jansi', name: 'jansi', version: '2.4.2'
@@ -101,7 +101,7 @@ dependencies {
api group: 'org.apache.maven.resolver', name: 'maven-resolver-connector-basic', version: mavenResolverVersion
api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-file', version: mavenResolverVersion
api group: 'org.apache.maven.resolver', name: 'maven-resolver-transport-apache', version: mavenResolverVersion
api 'com.github.oshi:oshi-core:6.8.3'
api 'com.github.oshi:oshi-core:6.9.0'
api 'io.pebbletemplates:pebble:3.2.4'
api group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '1.7.0'
api group: 'de.focus-shift', name: 'jollyday-core', version: jollydayVersion
@@ -124,9 +124,9 @@ dependencies {
api 'org.jsoup:jsoup:1.21.2'
api "org.xhtmlrenderer:flying-saucer-core:$flyingSaucerVersion"
api "org.xhtmlrenderer:flying-saucer-pdf:$flyingSaucerVersion"
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.4'
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.5'
api group: 'jakarta.annotation', name: 'jakarta.annotation-api', version: '3.0.0'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.4'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.5'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
api group: 'de.siegmar', name: 'fastcsv', version: '4.0.0'
// Json Diff

View File

@@ -14,6 +14,7 @@ import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.ExecutionKilledTrigger;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
@@ -37,6 +38,7 @@ import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -70,6 +72,7 @@ public abstract class AbstractScheduler implements Scheduler {
private final QueueInterface<WorkerJob> workerJobQueue;
private final QueueInterface<WorkerTriggerResult> workerTriggerResultQueue;
private final QueueInterface<ExecutionKilled> executionKilledQueue;
private final QueueInterface<LogEntry> logQueue;
@SuppressWarnings("rawtypes")
private final Optional<QueueInterface> clusterEventQueue;
protected final FlowListenersInterface flowListeners;
@@ -124,6 +127,7 @@ public abstract class AbstractScheduler implements Scheduler {
this.executionKilledQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.KILL_NAMED));
this.workerTriggerResultQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERTRIGGERRESULT_NAMED));
this.clusterEventQueue = applicationContext.findBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.CLUSTER_EVENT_NAMED));
this.logQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.WORKERTASKLOG_NAMED));
this.flowListeners = flowListeners;
this.runContextFactory = applicationContext.getBean(RunContextFactory.class);
this.runContextInitializer = applicationContext.getBean(RunContextInitializer.class);
@@ -767,7 +771,7 @@ public abstract class AbstractScheduler implements Scheduler {
this.executionEventPublisher.publishEvent(new CrudEvent<>(newExecution, CrudEventType.CREATE));
} catch (QueueException e) {
try {
Execution failedExecution = newExecution.failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED);
Execution failedExecution = fail(newExecution, e);
this.executionQueue.emit(failedExecution);
this.executionEventPublisher.publishEvent(new CrudEvent<>(failedExecution, CrudEventType.CREATE));
} catch (QueueException ex) {
@@ -776,6 +780,16 @@ public abstract class AbstractScheduler implements Scheduler {
}
}
private Execution fail(Execution message, Exception e) {
var failedExecution = message.failedExecutionFromExecutor(e);
try {
logQueue.emitAsync(failedExecution.getLogs());
} catch (QueueException ex) {
// fail silently
}
return failedExecution.getExecution().getState().isFailed() ? failedExecution.getExecution() : failedExecution.getExecution().withState(State.Type.FAILED);
}
private void executionMonitor() {
try {
// Retrieve triggers with non-null execution_id from all corresponding virtual nodes

View File

@@ -12,5 +12,5 @@ dependencies {
api 'org.hamcrest:hamcrest:3.0'
api 'org.hamcrest:hamcrest-library:3.0'
api 'org.mockito:mockito-junit-jupiter'
api 'org.assertj:assertj-core:3.27.4'
api 'org.assertj:assertj-core:3.27.6'
}

View File

@@ -8,7 +8,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.TestRunnerUtils;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
@@ -59,7 +59,7 @@ public class FlowExecutorExtension implements AfterEachCallback, ParameterResolv
TestsUtils.loads(tenantId, repositoryLoader, Objects.requireNonNull(url));
Flow flow = YamlParser.parse(Paths.get(url.toURI()).toFile(), Flow.class);
RunnerUtils runnerUtils = context.getBean(RunnerUtils.class);
TestRunnerUtils runnerUtils = context.getBean(TestRunnerUtils.class);
return runnerUtils.runOne(tenantId, flow.getNamespace(), flow.getId(), Duration.parse(executeFlow.timeout()));
}

View File

@@ -5,11 +5,13 @@ import static io.kestra.core.junit.extensions.ExtensionUtils.loadFile;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
@@ -56,6 +58,7 @@ public class FlowLoaderExtension implements BeforeEachCallback, AfterEachCallbac
public void afterEach(ExtensionContext extensionContext) throws URISyntaxException {
LoadFlows loadFlows = getLoadFlows(extensionContext);
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
ExecutionRepositoryInterface executionRepository = applicationContext.getBean(ExecutionRepositoryInterface.class);
Set<String> flowIds = new HashSet<>();
for (String path : loadFlows.value()) {
@@ -66,7 +69,11 @@ public class FlowLoaderExtension implements BeforeEachCallback, AfterEachCallbac
flowRepository.findAllForAllTenants().stream()
.filter(flow -> flowIds.contains(flow.getId()))
.filter(flow -> loadFlows.tenantId().equals(flow.getTenantId()))
.forEach(flow -> flowRepository.delete(FlowWithSource.of(flow, "unused")));
.forEach(flow -> {
flowRepository.delete(FlowWithSource.of(flow, "unused"));
executionRepository.findByFlowId(loadFlows.tenantId(), flow.getNamespace(), flow.getId(), Pageable.UNPAGED)
.forEach(executionRepository::delete);
});
}
private static LoadFlows getLoadFlows(ExtensionContext extensionContext) {

View File

@@ -0,0 +1,320 @@
package io.kestra.core.runners;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.utils.Await;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import static io.kestra.core.utils.TestsUtils.stringify;
@Singleton
public class TestRunnerUtils {
public static final Duration DEFAULT_MAX_WAIT_DURATION = Duration.ofSeconds(15);
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
private ExecutionService executionService;
public Execution runOne(String tenantId, String namespace, String flowId)
throws TimeoutException, QueueException {
return this.runOne(tenantId, namespace, flowId, null, null, null, null);
}
public Execution runOne(String tenantId, String namespace, String flowId, Integer revision)
throws TimeoutException, QueueException {
return this.runOne(tenantId, namespace, flowId, revision, null, null, null);
}
public Execution runOne(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs)
throws TimeoutException, QueueException {
return this.runOne(tenantId, namespace, flowId, revision, inputs, null, null);
}
public Execution runOne(String tenantId, String namespace, String flowId, Duration duration)
throws TimeoutException, QueueException {
return this.runOne(tenantId, namespace, flowId, null, null, duration, null);
}
public Execution runOne(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration)
throws TimeoutException, QueueException {
return this.runOne(tenantId, namespace, flowId, revision, inputs, duration, null);
}
public Execution runOne(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels)
throws TimeoutException, QueueException {
return this.runOne(
flowRepository
.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration,
labels);
}
public Execution runOne(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs)
throws TimeoutException, QueueException {
return this.runOne(flow, inputs, null, null);
}
public Execution runOne(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration)
throws TimeoutException, QueueException {
return this.runOne(flow, inputs, duration, null);
}
public Execution runOne(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration, List<Label> labels)
throws TimeoutException, QueueException {
if (duration == null) {
duration = Duration.ofSeconds(15);
}
Execution execution = Execution.newExecution(flow, inputs, labels, Optional.empty());
return runOne(execution, flow, duration);
}
public Execution runOne(Execution execution, Flow flow, Duration duration)
throws TimeoutException, QueueException {
return this.emitAndAwaitExecution(isTerminatedExecution(execution, flow), execution, duration);
}
public Execution runOneUntilPaused(String tenantId, String namespace, String flowId)
throws QueueException {
return this.runOneUntilPaused(tenantId, namespace, flowId, null, null, null);
}
public Execution runOneUntilPaused(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration)
throws QueueException {
return this.runOneUntilPaused(
flowRepository
.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration
);
}
public Execution runOneUntilPaused(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration)
throws QueueException {
if (duration == null) {
duration = DEFAULT_MAX_WAIT_DURATION;
}
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
return this.emitAndAwaitExecution(isPausedExecution(execution), execution, duration);
}
public Execution runOneUntilRunning(String tenantId, String namespace, String flowId)
throws QueueException {
return this.runOneUntilRunning(tenantId, namespace, flowId, null, null, null);
}
public Execution runOneUntilRunning(String tenantId, String namespace, String flowId, Integer revision, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration)
throws QueueException {
return this.runOneUntilRunning(
flowRepository
.findById(tenantId, namespace, flowId, revision != null ? Optional.of(revision) : Optional.empty())
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'")),
inputs,
duration
);
}
public Execution runOneUntilRunning(Flow flow, BiFunction<FlowInterface, Execution, Map<String, Object>> inputs, Duration duration)
throws QueueException {
if (duration == null) {
duration = DEFAULT_MAX_WAIT_DURATION;
}
Execution execution = Execution.newExecution(flow, inputs, null, Optional.empty());
return this.emitAndAwaitExecution(isRunningExecution(execution), execution, duration);
}
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution) throws QueueException {
return emitAndAwaitExecution(predicate, execution, Duration.ofSeconds(20));
}
public Execution emitAndAwaitExecution(Predicate<Execution> predicate, Execution execution, Duration duration)
throws QueueException {
this.executionQueue.emit(execution);
return awaitExecution(predicate, execution, duration);
}
public Execution awaitExecution(Predicate<Execution> predicate, Execution execution) {
return awaitExecution(predicate, execution, Duration.ofSeconds(20));
}
public Execution awaitExecution(Predicate<Execution> predicate, Execution execution, Duration duration) {
AtomicReference<Execution> receive = new AtomicReference<>();
try {
if (duration == null){
duration = Duration.ofSeconds(20);
}
Await.until(() -> {
testExecution(predicate, receive, execution);
return receive.get() != null;
}, Duration.ofMillis(10), duration);
} catch (TimeoutException e) {
Optional<Execution> byId = executionRepository.findById(execution.getTenantId(), execution.getId());
if (byId.isPresent()) {
Execution exec = byId.get();
throw new RuntimeException("Execution %s is currently at the status %s which is not the awaited one, full execution object:\n%s".formatted(exec.getId(), exec.getState().getCurrent(), stringify(exec)));
} else {
throw new RuntimeException("Execution %s doesn't exist in the database".formatted(execution.getId()));
}
}
return receive.get();
}
private void testExecution(Predicate<Execution> predicate, AtomicReference<Execution> receive, Execution execution){
Optional<Execution> exec = executionRepository.findById(execution.getTenantId(), execution.getId());
if (exec.isPresent() && predicate.test(exec.get())) {
receive.set(exec.get());
}
}
/**
* This method will return the last created execution
* @param predicate
* @param tenantId
* @param namespace
* @param flowId
* @return
*/
public Execution awaitFlowExecution(Predicate<Execution> predicate, String tenantId, String namespace, String flowId) {
return awaitFlowExecution(predicate, tenantId, namespace, flowId, null);
}
public Execution awaitFlowExecution(Predicate<Execution> predicate, String tenantId, String namespace, String flowId, Duration duration) {
AtomicReference<Execution> receive = new AtomicReference<>();
try {
if (duration == null){
duration = Duration.ofSeconds(20);
}
Await.until(() -> {
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
tenantId, namespace, flowId, Pageable.UNPAGED);
if (!byFlowId.isEmpty()) {
Execution first = byFlowId.stream()
.sorted(Comparator.comparing(e -> e.getMetadata().getOriginalCreatedDate()))
.toList().getLast();
testExecution(predicate, receive, first);
return receive.get() != null;
}
return false;
}, Duration.ofMillis(50), duration);
} catch (TimeoutException e) {
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
tenantId, namespace, flowId, Pageable.UNPAGED);
if (!byFlowId.isEmpty()) {
Execution exec = byFlowId.getLast();
throw new RuntimeException("Execution %s is currently at the status %s which is not the awaited one".formatted(exec.getId(), exec.getState().getCurrent()));
} else {
throw new RuntimeException("No execution for flow %s exist in the database".formatted(flowId));
}
}
return receive.get();
}
public List<Execution> awaitFlowExecutionNumber(int number, String tenantId, String namespace, String flowId) {
return awaitFlowExecutionNumber(number, tenantId, namespace, flowId, null);
}
public List<Execution> awaitFlowExecutionNumber(int number, String tenantId, String namespace, String flowId, Duration duration) {
AtomicReference<List<Execution>> receive = new AtomicReference<>();
Flow flow = flowRepository
.findById(tenantId, namespace, flowId, Optional.empty())
.orElseThrow(
() -> new IllegalArgumentException("Unable to find flow '" + flowId + "'"));
try {
if (duration == null){
duration = Duration.ofSeconds(20);
}
Await.until(() -> {
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
tenantId, namespace, flowId, Pageable.UNPAGED);
if (byFlowId.size() == number
&& byFlowId.stream()
.filter(e -> executionService.isTerminated(flow, e))
.toList().size() == number) {
receive.set(byFlowId);
return true;
}
return false;
}, Duration.ofMillis(50), duration);
} catch (TimeoutException e) {
ArrayListTotal<Execution> byFlowId = executionRepository.findByFlowId(
tenantId, namespace, flowId, Pageable.UNPAGED);
if (!byFlowId.isEmpty()) {
throw new RuntimeException("%d Execution found for flow %s, but %d where awaited".formatted(byFlowId.size(), flowId, number));
} else {
throw new RuntimeException("No execution for flow %s exist in the database".formatted(flowId));
}
}
return receive.get();
}
@VisibleForTesting
public Execution awaitChildExecution(Flow flow, Execution parentExecution, Execution execution, Duration duration)
throws QueueException {
return this.emitAndAwaitExecution(isTerminatedChildExecution(parentExecution, flow), execution, duration);
}
private Predicate<Execution> isTerminatedExecution(Execution execution, Flow flow) {
return e -> e.getId().equals(execution.getId()) && executionService.isTerminated(flow, e);
}
private Predicate<Execution> isPausedExecution(Execution execution) {
return e -> e.getId().equals(execution.getId()) && e.getState().isPaused() && e.getTaskRunList() != null && e.getTaskRunList().stream().anyMatch(t -> t.getState().isPaused());
}
private Predicate<Execution> isRunningExecution(Execution execution) {
return e -> e.getId().equals(execution.getId()) && e.getState().isRunning() && e.getTaskRunList() != null && e.getTaskRunList().stream().anyMatch(t -> t.getState().isRunning());
}
private Predicate<Execution> isTerminatedChildExecution(Execution parentExecution, Flow flow) {
return e -> e.getParentId() != null && e.getParentId().equals(parentExecution.getId()) && executionService.isTerminated(flow, e);
}
}

View File

@@ -504,7 +504,7 @@
import YAML_CHART from "../dashboard/assets/executions_timeseries_chart.yaml?raw";
import Utils from "../../utils/utils";
import {filterLabels} from "./utils"
import {filterValidLabels} from "./utils"
import {useExecutionsStore} from "../../stores/executions";
import {useAuthStore} from "override/stores/auth";
import {useFlowStore} from "../../stores/flow";
@@ -1055,9 +1055,9 @@
);
},
setLabels() {
const filtered = filterLabels(this.executionLabels)
const filtered = filterValidLabels(this.executionLabels)
if(filtered.error) {
if (filtered.error) {
this.$toast().error(this.$t("wrong labels"))
return;
}

View File

@@ -55,7 +55,7 @@
import LabelInput from "../../components/labels/LabelInput.vue";
import {State} from "@kestra-io/ui-libs"
import {filterLabels} from "./utils"
import {filterValidLabels} from "./utils"
import permission from "../../models/permission";
import action from "../../models/action";
import {useAuthStore} from "override/stores/auth"
@@ -78,10 +78,11 @@
},
methods: {
setLabels() {
let filtered = filterLabels(this.executionLabels)
const filtered = filterValidLabels(this.executionLabels)
if(filtered.error) {
filtered.labels = filtered.labels.filter(obj => !(obj.key === null && obj.value === null));
if (filtered.error) {
this.$toast().error(this.$t("wrong labels"))
return;
}
this.isOpen = false;

View File

@@ -8,7 +8,7 @@ interface FilterResult {
error?: boolean;
}
export const filterLabels = (labels: Label[]): FilterResult => {
const invalid = labels.some(label => label.key === null || label.value === null || label.key === "" || label.value === "");
return invalid ? {labels, error: true} : {labels};
export const filterValidLabels = (labels: Label[]): FilterResult => {
const validLabels = labels.filter(label => label.key !== null && label.value !== null && label.key !== "" && label.value !== "");
return validLabels.length === labels.length ? {labels} : {labels: validLabels, error: true};
};

View File

@@ -351,13 +351,15 @@
const dataTableRef = useTemplateRef<typeof DataTable>("dataTable");
const {queryWithFilter, onPageChanged, onRowDoubleClick, onSort} = useDataTableActions({dblClickRouteName: "flows/update"});
function selectionMapper(element: {id: string; namespace: string; disabled: boolean}): {id: string; namespace: string; enabled: boolean} {
function selectionMapper({id, namespace, disabled}: {id: string; namespace: string; disabled: boolean}) {
return {
id: element.id,
namespace: element.namespace,
enabled: !element.disabled,
id,
namespace,
enabled: !disabled,
};
}
const {selection, queryBulkAction, handleSelectionChange, toggleAllUnselected, toggleAllSelection} = useSelectTableActions({
dataTableRef,
selectionMapper

View File

@@ -242,7 +242,7 @@
return this.kvs?.filter(kv =>
!this.searchQuery ||
kv.key.toLowerCase().includes(this.searchQuery.toLowerCase()) ||
kv.description.toLowerCase().includes(this.searchQuery.toLowerCase())
kv.description?.toLowerCase().includes(this.searchQuery.toLowerCase())
);
},
kvModalTitle() {

View File

@@ -11,10 +11,9 @@
hideToggle
>
<template #header>
<el-button @click="collapsed = onToggleCollapse(!collapsed)" class="collapseButton" :size="collapsed ? 'small':undefined">
<ChevronRight v-if="collapsed" />
<ChevronLeft v-else />
</el-button>
<SidebarToggleButton
@toggle="collapsed = onToggleCollapse(!collapsed)"
/>
<div class="logo">
<component :is="props.showLink ? 'router-link' : 'div'" :to="{name: 'home'}">
<span class="img" />
@@ -41,14 +40,14 @@
import {SidebarMenu} from "vue-sidebar-menu";
import ChevronLeft from "vue-material-design-icons/ChevronLeft.vue";
import ChevronRight from "vue-material-design-icons/ChevronRight.vue";
import StarOutline from "vue-material-design-icons/StarOutline.vue";
import Environment from "./Environment.vue";
import BookmarkLinkList from "./BookmarkLinkList.vue";
import {useBookmarksStore} from "../../stores/bookmarks";
import type {MenuItem} from "override/components/useLeftMenu";
import {useLayoutStore} from "../../stores/layout";
import SidebarToggleButton from "./SidebarToggleButton.vue";
const props = withDefaults(defineProps<{
@@ -63,9 +62,11 @@
const $route = useRoute()
const {t} = useI18n({useScope: "global"});
const layoutStore = useLayoutStore();
function onToggleCollapse(folded) {
collapsed.value = folded;
localStorage.setItem("menuCollapsed", folded ? "true" : "false");
layoutStore.setSideMenuCollapsed(folded);
$emit("menu-collapse", folded);
return folded;

View File

@@ -0,0 +1,43 @@
<template>
<el-button
class="collapseButton sidebar-toggle"
@click="$emit('toggle')"
>
<svg
width="12"
height="12"
viewBox="0 0 12 12"
fill="none"
xmlns="http://www.w3.org/2000/svg"
>
<path
fill-rule="evenodd"
clip-rule="evenodd"
d="M11.8554 10.9932C11.8567 11.4542 11.4841 11.8289 11.0231 11.8301L1.02524 11.858C0.564312 11.8593 0.189613 11.4867 0.188327 11.0258L0.160404 1.01728C0.159118 0.556349 0.531732 0.181649 0.99266 0.180363L10.9906 0.152469C11.4515 0.151183 11.8262 0.523797 11.8275 0.984726L11.8554 10.9932ZM11.0318 11.0054L5.18316 11.0217L5.15511 0.967535L11.0037 0.951218L11.0318 11.0054ZM4.31027 11.023L0.975876 11.0323L0.947825 0.978203L4.28221 0.9689L4.31027 11.023Z"
fill="currentColor"
/>
</svg>
</el-button>
</template>
<script setup lang="ts">
defineEmits<{
(e: "toggle"): void;
}>();
</script>
<style lang="scss" scoped>
.sidebar-toggle {
border: none;
color: var(--ks-text-secondary);
&:hover {
color: var(--ks-content-link);
}
html.dark & {
color: var(--ks-text-secondary);
}
}
</style>

Some files were not shown because too many files have changed in this diff Show More