Compare commits

..

1 Commits

Author SHA1 Message Date
Loïc Mathieu
911cd47fe6 feat(flow): EmbeddedFlow task
Adds an EmbeddedFlow that allows embedding subflow tasks into a parent task.

Fixes #6518
2025-04-14 17:02:21 +02:00
276 changed files with 2279 additions and 2124 deletions

View File

@@ -62,7 +62,7 @@ jobs:
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
run: ./gradlew testClasses -x :ui:assembleFrontend
run: ./gradlew testClasses -x :ui:installFrontend -x :ui:assembleFrontend
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)

View File

@@ -8,9 +8,6 @@ on:
env:
JAVA_VERSION: '21'
permissions:
contents: read
jobs:
dependency-check:
name: Dependency Check
@@ -60,10 +57,6 @@ jobs:
develop-image-check:
name: Image Check (develop)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
@@ -90,25 +83,13 @@ jobs:
uses: aquasecurity/trivy-action@0.30.0
with:
image-ref: kestra/kestra:develop
format: 'template'
template: '@/contrib/sarif.tpl'
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
format: table
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
scanners: vuln
latest-image-check:
name: Image Check (latest)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
@@ -137,11 +118,4 @@ jobs:
image-ref: kestra/kestra:latest
format: table
skip-dirs: /app/plugins
scanners: vuln
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
scanners: vuln

View File

@@ -31,8 +31,6 @@ jobs:
steps:
- uses: actions/checkout@v4
name: Checkout - Current ref
with:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main

View File

@@ -129,7 +129,7 @@ class FlowCreateOrUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("1 flow(s)");
}
}

View File

@@ -24,7 +24,7 @@ class FlowDotCommandTest {
};
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("\"root.date\"[shape=box];");
}
}

View File

@@ -22,7 +22,7 @@ class FlowExpandCommandTest {
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +

View File

@@ -55,7 +55,7 @@ class FlowExportCommandTest {
};
PicocliRunner.call(FlowExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/flows.zip");
assertThat(file.exists()).isTrue();
assertThat(file.exists()).isEqualTo(true);
ZipFile zipFile = new ZipFile(file);
// When launching the test in a suite, there are 4 flows but when launching individually there are only 3

View File

@@ -169,7 +169,7 @@ class FlowUpdatesCommandTest {
};
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("1 flow(s)");
}
}

View File

@@ -22,7 +22,7 @@ class FlowValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
}
}
@@ -39,7 +39,7 @@ class FlowValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");

View File

@@ -19,7 +19,7 @@ class FlowNamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(FlowNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra flow namespace");
}
}

View File

@@ -162,7 +162,7 @@ class FlowNamespaceUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("1 flow(s)");
}
}

View File

@@ -19,7 +19,7 @@ class NamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(NamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra namespace");
}
}

View File

@@ -19,7 +19,7 @@ class NamespaceFilesCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(NamespaceFilesCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra namespace files");
}
}

View File

@@ -19,7 +19,7 @@ class KvCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra namespace kv");
}
}

View File

@@ -46,7 +46,7 @@ class PluginDocCommandTest {
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
var directory = files.getFirst().toFile();
assertThat(directory.isDirectory()).isTrue();
assertThat(directory.isDirectory()).isEqualTo(true);
assertThat(directory.listFiles().length).isEqualTo(3);
var readme = directory.toPath().resolve("index.md");

View File

@@ -42,7 +42,7 @@ class ReindexCommandTest {
"flow",
};
Integer call = PicocliRunner.call(ReindexCommand.class, ctx, reindexArgs);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
// in local it reindex 3 flows and in CI 4 for an unknown reason
assertThat(out.toString()).contains("Successfully reindex");
}

View File

@@ -19,7 +19,7 @@ class DatabaseCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(DatabaseCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra sys database");
}
}

View File

@@ -20,7 +20,7 @@ class StateStoreCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}

View File

@@ -53,7 +53,7 @@ class StateStoreMigrateCommandTest {
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isEqualTo(true);
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
@@ -67,9 +67,9 @@ class StateStoreMigrateCommandTest {
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isEqualTo(false);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
}
}
}

View File

@@ -54,7 +54,7 @@ class TemplateExportCommandTest {
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
assertThat(file.exists()).isEqualTo(true);
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);

View File

@@ -13,7 +13,7 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
public class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");

View File

@@ -19,7 +19,7 @@ class TemplateNamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(call).isEqualTo(0);
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}

View File

@@ -27,7 +27,7 @@ class DeleteConfigurationApplicationListenersTest {
);
try (ApplicationContext ctx = ApplicationContext.run(mapPropertySource, Environment.CLI, Environment.TEST)) {
assertThat(tempFile.exists()).isFalse();
assertThat(tempFile.exists()).isEqualTo(false);
}
}
}

View File

@@ -91,8 +91,6 @@ public class MetricRegistry {
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION = "Trigger lock duration waiting for an execution to be terminated";
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";

View File

@@ -39,6 +39,30 @@ import java.util.Optional;
@NoArgsConstructor
@JsonDeserialize
public class GenericFlow extends AbstractFlow implements HasUID {
private String id;
private String namespace;
private Integer revision;
private List<Input<?>> inputs;
private Map<String, Object> variables;
@Builder.Default
private boolean disabled = false;
@Builder.Default
private boolean deleted = false;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
private List<Label> labels;
private String tenantId;
private String source;
private List<SLA> sla;
@@ -60,6 +84,7 @@ public class GenericFlow extends AbstractFlow implements HasUID {
* @return a new {@link GenericFlow}
* @throws DeserializationException if source cannot be deserialized.
*/
@VisibleForTesting
public static GenericFlow of(final FlowInterface flow) throws DeserializationException {
return fromYaml(flow.getTenantId(), flow.sourceOrGenerateIfNull());
}

View File

@@ -69,7 +69,7 @@ public class State {
public State withState(Type state) {
if (this.current == state) {
log.warn("Can't change state, already {}", current);
log.warn("Can't change state, already " + current);
return this;
}

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.namespaces;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -12,7 +11,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@ToString
@EqualsAndHashCode
@Schema(name = "NamespaceLight")
public class Namespace implements NamespaceInterface {
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")

View File

@@ -2,41 +2,38 @@ package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
import static io.kestra.core.utils.Rethrow.throwConsumer;
abstract public class PluginUtilsService {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
private static final Pattern PATTERN = Pattern.compile("^::(\\{.*})::$");
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public static Map<String, String> createOutputFiles(
@@ -55,12 +52,12 @@ abstract public class PluginUtilsService {
) throws IOException {
List<String> outputs = new ArrayList<>();
if (outputFiles != null && !outputFiles.isEmpty()) {
if (outputFiles != null && outputFiles.size() > 0) {
outputs.addAll(outputFiles);
}
Map<String, String> result = new HashMap<>();
if (!outputs.isEmpty()) {
if (outputs.size() > 0) {
outputs
.forEach(throwConsumer(s -> {
PluginUtilsService.validFilename(s);
@@ -171,27 +168,64 @@ abstract public class PluginUtilsService {
}
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
Matcher m = PATTERN.matcher(line);
Map<String, Object> outputs = new HashMap<>();
try {
Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
if (matches.isPresent()) {
TaskLogMatch taskLogMatch = matches.get();
outputs.putAll(taskLogMatch.outputs());
} else if (isStdErr) {
if (m.find()) {
try {
BashCommand<?> bashCommand = MAPPER.readValue(m.group(1), BashCommand.class);
if (bashCommand.getOutputs() != null) {
outputs.putAll(bashCommand.getOutputs());
}
if (bashCommand.getMetrics() != null) {
bashCommand.getMetrics().forEach(runContext::metric);
}
if (bashCommand.getLogs() != null) {
bashCommand.getLogs().forEach(logLine -> {
try {
LoggingEventBuilder builder = runContext
.logger()
.atLevel(logLine.getLevel())
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, customInstant);
builder.log(logLine.getMessage());
} catch (Exception e) {
logger.warn("Invalid log '{}'", m.group(1), e);
}
});
}
}
catch (JsonProcessingException e) {
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
}
} else {
if (isStdErr) {
runContext.logger().error(line);
} else {
runContext.logger().info(line);
}
} catch (IOException e) {
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
}
return outputs;
}
@NoArgsConstructor
@Data
public static class BashCommand <T> {
private Map<String, Object> outputs;
private List<AbstractMetricEntry<T>> metrics;
private List<LogLine> logs;
}
@NoArgsConstructor
@Data
public static class LogLine {
private Level level;
private String message;
}
/**
* This helper method will allow gathering the execution information from a task parameters:
* - If executionId is null, it is fetched from the runContext variables (a.k.a. current execution).

View File

@@ -1,111 +0,0 @@
package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
/**
* Service for matching and capturing structured data from task execution logs.
* <p>
* Example log format that may be matched:
* <pre>{@code
* ::{"outputs":{"key":"value"}}::
* }</pre>
*/
@Singleton
public class TaskLogLineMatcher {
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
protected static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
/**
* Attempts to match and extract structured data from a given log line.
* <p>
* If the line contains recognized patterns (e.g., JSON-encoded output markers),
* a {@link TaskLogMatch} is returned encapsulating the extracted data.
* </p>
*
* @param logLine the raw log line.
* @param logger the logger
* @param runContext the {@link RunContext}
* @return an {@link Optional} containing the {@link TaskLogMatch} if a match was found,
* otherwise {@link Optional#empty()}
*/
public Optional<TaskLogMatch> matches(String logLine, Logger logger, RunContext runContext, Instant instant) throws IOException {
Optional<String> matches = matches(logLine);
if (matches.isEmpty()) {
return Optional.empty();
}
TaskLogMatch match = MAPPER.readValue(matches.get(), TaskLogLineMatcher.TaskLogMatch.class);
return Optional.of(handle(logger, runContext, instant, match, matches.get()));
}
protected TaskLogMatch handle(Logger logger, RunContext runContext, Instant instant, TaskLogMatch match, String data) {
if (match.metrics() != null) {
match.metrics().forEach(runContext::metric);
}
if (match.logs() != null) {
match.logs().forEach(it -> {
try {
LoggingEventBuilder builder = runContext
.logger()
.atLevel(it.level())
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, instant);
builder.log(it.message());
} catch (Exception e) {
logger.warn("Invalid log '{}'",data, e);
}
});
}
return match;
}
protected Optional<String> matches(String logLine) {
Matcher m = LOG_DATA_SYNTAX.matcher(logLine);
return m.find() ? Optional.ofNullable(m.group(1)) : Optional.empty();
}
/**
* Represents the result of log line match.
*
* @param outputs a map of extracted output key-value pairs
* @param metrics a list of captured metric entries, typically used for reporting or monitoring
* @param logs additional log lines derived from the matched line, if any
*/
public record TaskLogMatch(
Map<String, Object> outputs,
List<AbstractMetricEntry<?>> metrics,
List<LogLine> logs
) {
@Override
public Map<String, Object> outputs() {
return Optional.ofNullable(outputs).orElse(Map.of());
}
}
public record LogLine(
Level level,
String message
) {
}
}

View File

@@ -140,28 +140,11 @@ public final class ExecutableUtils {
}
}
String tenantId = currentExecution.getTenantId();
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
currentExecution.getTenantId(),
subflowNamespace,
subflowId,
subflowRevision,
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
FlowInterface flow = getSubflow(tenantId, subflowNamespace, subflowId, subflowRevision, flowExecutorInterface, currentFlow);
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
if (labels != null) {
@@ -223,6 +206,35 @@ public final class ExecutableUtils {
.toList();
}
public static FlowInterface getSubflow(String tenantId,
String subflowNamespace,
String subflowId,
Optional<Integer> subflowRevision,
FlowExecutorInterface flowExecutorInterface,
FlowInterface currentFlow) {
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
tenantId,
subflowNamespace,
subflowId,
subflowRevision,
tenantId,
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}
if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}
return flow;
}
private static List<Label> systemLabels(Execution execution) {
return Streams.of(execution.getLabels())
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))

View File

@@ -615,19 +615,16 @@ public class ExecutorService {
Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());
if (task instanceof Pause pauseTask) {
if (pauseTask.getPauseDuration() != null || pauseTask.getTimeout() != null) {
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
Duration duration = runContext.render(pauseTask.getPauseDuration()).as(Duration.class).orElse(null);
Duration delay = runContext.render(pauseTask.getDelay()).as(Duration.class).orElse(null);
Duration timeout = runContext.render(pauseTask.getTimeout()).as(Duration.class).orElse(null);
Pause.Behavior behavior = runContext.render(pauseTask.getBehavior()).as(Pause.Behavior.class).orElse(Pause.Behavior.RESUME);
if (duration != null || timeout != null) { // rendering can lead to null, so we must re-check here
// if duration is set, we use it, and we use the Pause behavior as a state
// if no duration, we use the standard timeout property and use FAILED as the target state
if (delay != null || timeout != null) { // rendering can lead to null, so we must re-check here
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(delay != null ? delay : timeout))
.state(delay != null ? State.Type.RUNNING : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}

View File

@@ -660,9 +660,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
});
});
metricRegistry
.timer(MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION, MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION)
.record(Duration.between(now, ZonedDateTime.now()));
}
private List<FlowWithSource> getFlowsWithDefaults() {

View File

@@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.validations.ManualConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import org.apache.commons.io.FilenameUtils;
@@ -51,7 +50,7 @@ public final class YamlParser {
}
private static <T> String type(Class<T> cls) {
return FlowInterface.class.isAssignableFrom(cls) ? "flow" : cls.getSimpleName().toLowerCase();
return cls.getSimpleName().toLowerCase();
}
public static <T> T parse(File file, Class<T> cls) throws ConstraintViolationException {

View File

@@ -344,7 +344,7 @@ public class ExecutionService {
}
// if it's a Pause task with no subtask, we terminate the task
if (task instanceof Pause pauseTask && ListUtils.isEmpty(pauseTask.getTasks())) {
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
} else if (newState == State.Type.KILLING) {
@@ -365,12 +365,11 @@ public class ExecutionService {
}
if (newExecution.getTaskRunList().stream().anyMatch(t -> t.getState().getCurrent() == State.Type.PAUSED)) {
// there are still some tasks paused, this can occur with parallel pause
// there is still some tasks paused, this can occur with parallel pause
return newExecution;
}
// we need to cancel immediately or the executor will process the next task if it's restarted.
return newState == State.Type.CANCELLED ? newExecution.withState(State.Type.CANCELLED) : newExecution.withState(State.Type.RESTARTED);
return newExecution
.withState(State.Type.RESTARTED);
}
public Execution markWithTaskRunAs(final Execution execution, String taskRunId, State.Type newState, Boolean markParents) throws Exception {
@@ -656,7 +655,7 @@ public class ExecutionService {
*
* @return the execution in a KILLING state if not already terminated
*/
public Execution kill(Execution execution, FlowInterface flow) {
public Execution kill(Execution execution, Flow flow) {
if (execution.getState().getCurrent() == State.Type.KILLING || execution.getState().isTerminated()) {
return execution;
}

View File

@@ -24,6 +24,7 @@ import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.flow.EmbeddedSubflow;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
@@ -197,8 +198,14 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
String cause = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
logService.get().logExecution(flow, logger, Level.WARN, "Unable to inject plugin defaults. Cause: '{}'", cause);
logger.warn(
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
e.getMessage(),
e
);
return readWithoutDefaultsOrThrow(flow);
}
}
@@ -303,7 +310,7 @@ public class PluginDefaultService {
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
} catch (Exception e) {
if (safe) {
logService.get().logExecution(flow, log, Level.WARN, "Unable to inject plugin default versions. Cause: {}", e.getMessage());
logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
result = FlowWithException.from(flow, e);
// deleted is not part of the original 'source'
@@ -375,11 +382,16 @@ public class PluginDefaultService {
.build();
if (tenant != null) {
// This is a hack to set the tenant in template tasks.
// When using the Template task, we need the tenant to fetch the Template from the database.
// This is a hack to set the tenant in Template and EmbeddedSubflow tasks.
// When using the Template or EmbeddedSubflow task, we need the tenant to fetch them from the database.
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
// So we save it at flow creation/updating time.
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
full.allTasksWithChilds().stream()
.filter(task -> task instanceof Template)
.forEach(task -> ((Template) task).setTenantId(tenant));
full.allTasksWithChilds().stream()
.filter(task -> task instanceof EmbeddedSubflow)
.forEach(task -> ((EmbeddedSubflow) task).setTenantId(tenant));
}
return full;

View File

@@ -19,6 +19,7 @@ import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.condition.*;
import io.kestra.plugin.core.flow.ChildFlowInterface;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -161,11 +162,9 @@ public class FlowTopologyService {
return parent
.allTasksWithChilds()
.stream()
.filter(t -> t instanceof ExecutableTask)
.map(t -> (ExecutableTask<?>) t)
.anyMatch(t ->
t.subflowId() != null && t.subflowId().namespace().equals(child.getNamespace()) && t.subflowId().flowId().equals(child.getId())
);
.filter(t -> t instanceof ChildFlowInterface)
.map(t -> (ChildFlowInterface) t)
.anyMatch(t -> Objects.equals(t.getFlowId(), child.getId()) && Objects.equals(t.getNamespace(), child.getNamespace()));
} catch (Exception e) {
log.warn("Failed to detect flow task on namespace:'{}', flowId:'{}'", parent.getNamespace(), parent.getId(), e);
return false;

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.kestra.plugin.core.flow.ChildFlowInterface;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull;
@@ -69,9 +70,9 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
}
value.allTasksWithChilds()
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
.stream().filter(task -> task instanceof ChildFlowInterface childFlow
&& value.getId().equals(childFlow.getFlowId())
&& value.getNamespace().equals(childFlow.getNamespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
// input unique name

View File

@@ -22,7 +22,7 @@ import org.slf4j.event.Level;
@Getter
@NoArgsConstructor
@Schema(
title = "Log a message in the task logs (Deprecated).",
title = "Log a message in the task logs.",
description = "This task is deprecated, please use the `io.kestra.plugin.core.log.Log` task instead.",
deprecated = true
)

View File

@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Getter
@NoArgsConstructor
@Schema(
title = "Assert some conditions to control task output data.",
title = "Assert some conditions.",
description = "Used to control outputs data emitted from previous task on this execution."
)
@Plugin(

View File

@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
@Getter
@NoArgsConstructor
@Schema(
title = "List execution counts for a list of flows.",
title = "List execution counts for a list of flow.",
description = "This can be used to send an alert if a condition is met about execution counts."
)
@Plugin(

View File

@@ -30,7 +30,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "Terminate an execution in the state defined by the property state.",
title = "Exit the execution: terminate it in the state defined by the property `state`.",
description = "Note that if this execution has running tasks, for example in a parallel branch, the tasks will not be terminated except if `state` is set to `KILLED`."
)
@Plugin(

View File

@@ -23,7 +23,7 @@ import lombok.experimental.SuperBuilder;
@Getter
@NoArgsConstructor
@Schema(
title = "Intentionally fail the execution.",
title = "Fail the execution.",
description = "Used to fail the execution, for example, on a switch branch or on some conditions based on the execution context."
)
@Plugin(

View File

@@ -37,8 +37,7 @@ import java.util.Map;
@Getter
@NoArgsConstructor
@Schema(
title = "Resume a paused execution.",
description = "By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
title = "Resume a paused execution. By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
)
@Plugin(
examples = {

View File

@@ -36,7 +36,7 @@ import java.util.stream.Stream;
@NoArgsConstructor
@DagTaskValidation
@Schema(
title = "Create a DAG of tasks without explicitly specifying the order in which the tasks must run.",
title = "Create a directed acyclic graph (DAG) of tasks without explicitly specifying the order in which the tasks need to run.",
description = "List your tasks and their dependencies, and Kestra will figure out the execution sequence.\n" +
"Each task can only depend on other tasks from the DAG task.\n" +
"For technical reasons, low-code interaction via UI forms is disabled for now when using this task."

View File

@@ -32,7 +32,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "For each value in the list, execute one or more tasks in parallel (Deprecated).",
title = "For each value in the list, execute one or more tasks in parallel.",
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
"The list of `tasks` will be executed for each item in parallel. " +
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`.\n" +

View File

@@ -34,7 +34,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "For each value in the list, execute one or more tasks sequentially (Deprecated).",
title = "For each value in the list, execute one or more tasks sequentially.",
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
"The list of `tasks` will be executed for each item sequentially. " +
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`. \n\n" +

View File

@@ -0,0 +1,304 @@
package io.kestra.plugin.core.flow;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.*;
import io.kestra.core.services.PluginDefaultService;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Embeds subflow tasks into this flow."
)
@Plugin(
    examples = {
        @Example(
            title = "Embeds subflow tasks.",
            full = true,
            code = """
                id: parent_flow
                namespace: company.team

                tasks:
                  - id: embed_subflow
                    type: io.kestra.plugin.core.flow.EmbeddedSubflow
                    namespace: company.team
                    flowId: subflow
                """
        )
    }
)
public class EmbeddedSubflow extends Task implements FlowableTask<EmbeddedSubflow.Output>, ChildFlowInterface {
    static final String PLUGIN_FLOW_OUTPUTS_ENABLED = "outputs.enabled";

    // Tenant of the parent flow; used to scope subflow lookup so a flow cannot embed a flow from another tenant.
    @Hidden
    @Setter // we have no other option here as we need to update the task inside the flow when creating it
    private String tenantId;

    @NotEmpty
    @Schema(
        title = "The namespace of the subflow to be embedded."
    )
    @PluginProperty
    private String namespace;

    @NotNull
    @Schema(
        title = "The identifier of the subflow to be embedded."
    )
    @PluginProperty
    private String flowId;

    @Schema(
        title = "The revision of the subflow to be embedded.",
        description = "By default, the last, i.e. the most recent, revision of the subflow is embedded."
    )
    @PluginProperty
    @Min(value = 1)
    private Integer revision;

    @Schema(
        title = "The inputs to pass to the subflow to be embedded."
    )
    @PluginProperty(dynamic = true)
    private Map<String, Object> inputs;

    /**
     * Returns the error branch of the embedded subflow, or an empty list when the subflow
     * cannot be resolved yet (e.g. at validation time, before namespace/flowId are set).
     */
    @Override
    @JsonIgnore
    public List<Task> getErrors() {
        return fetchSubflow()
            .map(Flow::getErrors)
            .orElse(Collections.emptyList());
    }

    /**
     * Returns the `finally` branch of the embedded subflow, or an empty list when the subflow
     * cannot be resolved yet (e.g. at validation time, before namespace/flowId are set).
     */
    @Override
    @JsonIgnore
    public List<Task> getFinally() {
        return fetchSubflow()
            .map(Flow::getFinally)
            .orElse(Collections.emptyList());
    }

    /**
     * Builds the graph of the embedded subflow as a sequential cluster nested under this task.
     * Returns {@code null} when the subflow cannot be resolved (validation time).
     */
    @Override
    public AbstractGraph tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
        Optional<Flow> maybeSubflow = fetchSubflow();
        if (maybeSubflow.isEmpty()) {
            return null;
        }
        Flow subflow = maybeSubflow.get();

        GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
        GraphUtils.sequential(
            subGraph,
            subflow.getTasks(),
            subflow.getErrors(),
            subflow.getFinally(),
            taskRun,
            execution
        );
        return subGraph;
    }

    /**
     * Lists every task of the embedded subflow: main tasks, error tasks and `finally` tasks
     * (kept consistent with {@link #tasksTree} and {@link #resolveNexts} which handle all three branches).
     * Returns an empty list when the subflow cannot be resolved (validation time).
     */
    @Override
    public List<Task> allChildTasks() {
        Optional<Flow> maybeSubflow = fetchSubflow();
        if (maybeSubflow.isEmpty()) {
            return Collections.emptyList();
        }
        Flow subflow = maybeSubflow.get();

        return Stream.of(subflow.getTasks(), subflow.getErrors(), subflow.getFinally())
            .filter(Objects::nonNull)
            .flatMap(List::stream)
            .toList();
    }

    @Override
    public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        Flow subflow = fetchSubflow(runContext);
        return FlowableUtils.resolveTasks(subflow.getTasks(), parentTaskRun);
    }

    @Override
    public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
        return FlowableUtils.resolveSequentialNexts(
            execution,
            this.childTasks(runContext, parentTaskRun),
            FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
            FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
            parentTaskRun
        );
    }

    /**
     * Renders the subflow outputs declared by the embedded subflow and exposes them as this task's outputs.
     * When the subflow declares typed outputs, they are validated/typed via {@link FlowInputOutput}.
     */
    @Override
    public Output outputs(RunContext runContext) throws Exception {
        final Output.OutputBuilder builder = Output.builder();

        Flow subflow = fetchSubflow(runContext);
        final Optional<Map<String, Object>> subflowOutputs = Optional
            .ofNullable(subflow.getOutputs())
            .map(outputs -> outputs
                .stream()
                .collect(Collectors.toMap(
                    io.kestra.core.models.flows.Output::getId,
                    io.kestra.core.models.flows.Output::getValue)
                )
            );

        if (subflowOutputs.isPresent() && runContext.getVariables().get("outputs") != null) {
            Map<String, Object> outputs = runContext.render(subflowOutputs.get());
            // FlowInputOutput is not injectable in a task: resolve it from the application context (hack).
            FlowInputOutput flowInputOutput = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowInputOutput.class);
            if (subflow.getOutputs() != null && flowInputOutput != null) {
                // to be able to use FILE Input, we need the execution info, so we create a fake execution with what's needed here
                RunContext.FlowInfo flowInfo = runContext.flowInfo();
                String executionId = (String) ((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
                Execution fake = Execution.builder()
                    .id(executionId)
                    .tenantId(flowInfo.tenantId())
                    .namespace(flowInfo.namespace())
                    .flowId(flowInfo.id())
                    .build();
                outputs = flowInputOutput.typedOutputs(subflow, fake, outputs);
            }
            builder.outputs(outputs);
        }

        return builder.build();
    }

    /**
     * Fetches the embedded subflow without a {@link RunContext}.
     * This method should only be used when {@link #fetchSubflow(RunContext)} cannot be used,
     * as it performs no cross-flow/namespace access checks.
     *
     * @return the subflow with plugin defaults injected, or empty when namespace/flowId are not yet set
     * @throws IllegalArgumentException when the subflow does not exist
     * @throws IllegalStateException    when the subflow is disabled or invalid
     */
    private Optional<Flow> fetchSubflow() {
        // at validation time, namespace and flowId may not yet be set, in this case let's return an optional to not fail
        if (namespace == null || flowId == null) {
            return Optional.empty();
        }

        ApplicationContext applicationContext = ContextHelper.context();
        FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
        FlowInterface subflow = flowExecutor.findById(tenantId, namespace, flowId, Optional.ofNullable(revision))
            .orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + namespace + "." + flowId));
        if (subflow.isDisabled()) {
            throw new IllegalStateException("Cannot execute a flow which is disabled");
        }
        if (subflow instanceof FlowWithException fwe) {
            throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
        }

        PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
        try {
            return Optional.of(pluginDefaultService.injectAllDefaults(subflow, true));
        } catch (FlowProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the embedded subflow using the current {@link RunContext}.
     * This method is preferred over {@link #fetchSubflow()} as it checks the current flow,
     * the subflow, the allowed namespaces, and that required subflow inputs are provided.
     *
     * @throws IllegalArgumentException when the tenant differs, the flow cannot be found,
     *                                  or a required subflow input is missing
     */
    private Flow fetchSubflow(RunContext runContext) {
        // we check that the task tenant is the current tenant to avoid accessing flows from another tenant
        if (!Objects.equals(tenantId, runContext.flowInfo().tenantId())) {
            throw new IllegalArgumentException("Cannot embed a flow from a different tenant");
        }

        ApplicationContext applicationContext = ContextHelper.context();
        FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
        RunContext.FlowInfo flowInfo = runContext.flowInfo();
        FlowInterface flow = flowExecutor.findById(flowInfo.tenantId(), flowInfo.namespace(), flowInfo.id(), Optional.of(flowInfo.revision()))
            .orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + flowInfo.namespace() + "." + flowInfo.id()));
        FlowInterface subflow = ExecutableUtils.getSubflow(tenantId, namespace, flowId, Optional.ofNullable(revision), flowExecutor, flow);

        // check that every required subflow input is provided; `inputs` is optional and may be null
        if (!ListUtils.isEmpty(subflow.getInputs())) {
            Optional<Input<?>> missing = subflow.getInputs().stream()
                .filter(input -> input.getRequired() && (inputs == null || !inputs.containsKey(input.getId())))
                .findFirst();
            if (missing.isPresent()) {
                throw new IllegalArgumentException("Missing required input " + missing.get().getId());
            }
        }

        PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
        return pluginDefaultService.injectAllDefaults(subflow, runContext.logger());
    }

    /**
     * Ugly hack to provide the ApplicationContext on {{@link #allChildTasks }} &amp; {{@link #tasksTree }}
     * We need to inject a way to fetch embedded subflows ...
     */
    @Singleton
    static class ContextHelper {
        @Inject
        private ApplicationContext applicationContext;

        private static ApplicationContext context;

        static ApplicationContext context() {
            return ContextHelper.context;
        }

        @EventListener
        void onStartup(final StartupEvent event) {
            ContextHelper.context = this.applicationContext;
        }
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The extracted outputs from the embedded subflow."
        )
        private final Map<String, Object> outputs;
    }
}

View File

@@ -22,10 +22,8 @@ import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -42,9 +40,9 @@ import java.util.stream.Stream;
@Getter
@NoArgsConstructor
@Schema(
title = "Pause the current execution and wait for approval (either by humans or other automated processes).",
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after the `pauseDuration`."
title = "Pause the current execution and wait for a manual approval (either by humans or other automated processes).",
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after a timeout."
)
@Plugin(
examples = {
@@ -131,24 +129,6 @@ import java.util.stream.Stream;
type: io.kestra.plugin.core.log.Log
message: Status is {{ outputs.wait_for_approval.onResume.reason }}. Process finished with {{ outputs.approve.body }}
"""
),
@Example(
title = "Pause the execution and set the execution in WARNING if it has not been resumed after 5 minutes",
full = true,
code = """
id: pause_warn
namespace: company.team
tasks:
- id: pause
type: io.kestra.plugin.core.flow.Pause
pauseDuration: PT5M
behavior: WARN
- id: post_resume
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }} started on {{ taskrun.startDate }} after the Pause"
"""
)
},
aliases = "io.kestra.core.tasks.flows.Pause"
@@ -156,38 +136,17 @@ import java.util.stream.Stream;
public class Pause extends Task implements FlowableTask<Pause.Output> {
@Schema(
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
description = "**Deprecated**: use `pauseDuration` instead.",
description = "The delay is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
implementation = Duration.class
)
@Deprecated
private Property<Duration> delay;
@Deprecated
public void setDelay(Property<Duration> delay) {
this.delay = delay;
this.pauseDuration = delay;
}
@Schema(
title = "Duration of the pause. If not set the task will wait forever to be manually resumed except if a timeout is set, in this case, the timeout will be honored.",
description = "The duration is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no pauseDuration and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
implementation = Duration.class
)
private Property<Duration> pauseDuration;
@Schema(
title = "Pause behavior, by default RESUME. What happens when a pause task reach its duration.",
description = """
Tasks that are resumed before the duration (for example, from the UI) will not use the behavior property but will always success.
Possible values are:
- RESUME: continue with the execution
- WARN: ends the Pause task in WARNING and continue with the execution
- FAIL: fail the Pause task
- CANCEL: cancel the execution"""
)
@NotNull
@Builder.Default
private Property<Behavior> behavior = Property.of(Behavior.RESUME);
private Property<Duration> timeout;
@Valid
@Schema(
@@ -271,34 +230,17 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
parentTaskRun.getState().getHistories().stream().noneMatch(history -> history.getState() == State.Type.PAUSED);
}
// This method is only called when there are subtasks
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (this.needPause(parentTaskRun)) {
return Optional.of(State.Type.PAUSED);
}
Behavior behavior = runContext.render(this.behavior).as(Behavior.class).orElse(Behavior.RESUME);
return switch (behavior) {
case Behavior.RESUME -> {
// yield SUCCESS or the final flowable task state
if (ListUtils.isEmpty(this.tasks)) {
yield Optional.of(State.Type.SUCCESS);
} else {
yield FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
}
}
case Behavior.WARN -> {
// yield WARNING or the final flowable task state, if the flowable ends in SUCCESS, yield WARNING
if (ListUtils.isEmpty(this.tasks)) {
yield Optional.of(State.Type.WARNING);
} else {
Optional<State.Type> finalState = FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
yield finalState.map(state -> state == State.Type.SUCCESS ? State.Type.WARNING : state);
}
}
case Behavior.CANCEL ,Behavior.FAIL -> throw new IllegalArgumentException("The " + behavior + " cannot be handled at this stage, this is certainly a bug!");
};
if (this.tasks == null || this.tasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
}
public Map<String, Object> generateOutputs(Map<String, Object> inputs) {
@@ -314,21 +256,4 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
public static class Output implements io.kestra.core.models.tasks.Output {
private Map<String, Object> onResume;
}
public enum Behavior {
RESUME(State.Type.RUNNING),
WARN(State.Type.WARNING),
CANCEL(State.Type.CANCELLED),
FAIL(State.Type.FAILED);
private final State.Type executionState;
Behavior(State.Type executionState) {
this.executionState = executionState;
}
public State.Type mapToState() {
return this.executionState;
}
}
}

View File

@@ -31,7 +31,7 @@ import java.util.stream.Stream;
@Getter
@NoArgsConstructor
@Schema(
title = "Run tasks sequentially in the order they are defined.",
title = "Run tasks sequentially, one after the other, in the order they are defined.",
description = "Used to visually group tasks."
)
@Plugin(

View File

@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
@Getter
@NoArgsConstructor
@Schema(
title = "Sleep task, wait for a specified duration before proceeding."
title = "A task that sleeps for a specified duration before proceeding."
)
@Plugin(
examples = {

View File

@@ -49,8 +49,8 @@ import java.util.stream.Collectors;
@Getter
@NoArgsConstructor
@Schema(
title = "Create a subflow execution.",
description = "Subflows offer a modular way to reuse workflow logic by calling other flows just like calling a function in a programming language. Restarting a parent flow will restart any subflows that has previously been executed."
title = "Create a subflow execution. Subflows offer a modular way to reuse workflow logic by calling other flows just like calling a function in a programming language.",
description = "Restarting a parent flow will restart any subflows that has previously been executed."
)
@Plugin(
examples = {

View File

@@ -43,7 +43,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
@Getter
@NoArgsConstructor
@Schema(
title = "Run tasks conditionally based on a given value.",
title = "Run tasks conditionally, i.e. decide which branch of tasks should be executed based on a given value.",
description = "This task runs a set of tasks based on a given value.\n" +
"The value is evaluated at runtime and compared to the list of cases.\n" +
"If the value matches a case, the corresponding tasks are executed.\n" +

View File

@@ -53,7 +53,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@NoArgsConstructor
@Slf4j
@Schema(
title = "Include a reusable template inside a flow (Deprecated)."
title = "Include a reusable template inside a flow."
)
@Deprecated
@Plugin(

View File

@@ -36,8 +36,8 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
@Getter
@NoArgsConstructor
@Schema(
title = "Download a file from an HTTP server.",
description = "This task connects to a HTTP server and copies a file to Kestra's internal storage."
title = "Download a file from a HTTP server.",
description = "This task connects to a HTTP server and copy a file to Kestra's internal storage."
)
@Plugin(
examples = {

View File

@@ -32,11 +32,11 @@ import java.util.OptionalInt;
@Getter
@NoArgsConstructor
@Schema(
title = "Make an HTTP API request to a specified URL and store the response as an output.",
title = "Make an HTTP API request to a specified URL and store the response as output.",
description = """
This task makes an API call to a specified URL of an HTTP server and stores the response as an output.
This task makes an API call to a specified URL of an HTTP server and stores the response as output.
By default, the maximum length of the response is limited to 10MB, but it can be increased to at most 2GB by using the `options.maxContentLength` property.
Note that the response is added as an output of the task. If you need to process large API payloads, we recommend using the `Download` task instead."""
Note that the response is added as output to the task. If you need to process large API payloads, we recommend using the `Download` task instead."""
)
@Plugin(
examples = {

View File

@@ -21,7 +21,7 @@ import java.util.NoSuchElementException;
@Getter
@NoArgsConstructor
@Schema(
title = "Delete a KV pair."
title = "Deletes a KV pair."
)
@Plugin(
examples = {

View File

@@ -29,7 +29,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "Retrieve a value of a KV pair by a key."
title = "Gets value linked to a key."
)
@Plugin(
examples = {

View File

@@ -26,7 +26,7 @@ import java.util.function.Predicate;
@Getter
@NoArgsConstructor
@Schema(
title = "Fetch all keys matching a given KV pair prefix."
title = "Gets keys matching a given prefix."
)
@Plugin(
examples = {

View File

@@ -27,7 +27,7 @@ import java.util.List;
@Getter
@NoArgsConstructor
@Schema(
title = "Purge flow execution logs and trigger-related logs.",
title = "Purge flow execution and trigger logs.",
description = "This task can be used to purge flow execution and trigger logs for all flows, for a specific namespace, or for a specific flow."
)
@Plugin(

View File

@@ -24,7 +24,7 @@ import java.util.List;
@Getter
@NoArgsConstructor
@Schema(
title = "Publish Kestra metrics within an execution.",
title = "Publish metrics.",
description = "This task is useful to easily publish metrics for a flow."
)
@Plugin(
@@ -32,25 +32,20 @@ import java.util.List;
@Example(
full = true,
code = """
id: publish_metrics
id: publish_metric
namespace: company.team
tasks:
- id: random
type: io.kestra.plugin.core.output.OutputValues
values:
metric: "{{randomInt(0, 10)}}"
- id: metric
type: io.kestra.plugin.core.metric.Publish
metrics:
- type: timer
name: duration
value: PT10M
tags:
flow: "{{flow.id}}"
project: kestra
- type: counter
name: number
value: 42
tags:
flow: "{{flow.id}}"
project: kestra
- name: myMetric
type: counter
value: "{{outputs.random.values.metric}}"
"""
)
}

View File

@@ -17,7 +17,7 @@ import java.io.FileNotFoundException;
@Getter
@NoArgsConstructor
@Schema(
title = "Delete a state from the state store (Deprecated)."
title = "Delete a state from the state store."
)
@Plugin(
examples = {

View File

@@ -18,7 +18,7 @@ import java.util.Map;
@Getter
@NoArgsConstructor
@Schema(
title = "Get a state from the state store (Deprecated)."
title = "Get a state from the state store."
)
@Plugin(
examples = {

View File

@@ -20,7 +20,7 @@ import java.util.Map;
@Getter
@NoArgsConstructor
@Schema(
title = "Set a state in the state store (Deprecated).",
title = "Set a state in the state store.",
description = "Values will be merged: \n" +
"* If you provide a new key, the new key will be added.\n" +
"* If you provide an existing key, the previous key will be overwrite.\n" +

View File

@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
@Getter
@NoArgsConstructor
@Schema(
title = "Concat files from Kestras internal storage."
title = "Concat files from the internal storage."
)
@Plugin(
examples = {

View File

@@ -23,7 +23,7 @@ import java.util.NoSuchElementException;
@Getter
@NoArgsConstructor
@Schema(
title = "Delete a file from Kestra's internal storage."
title = "Delete a file from Kestra's internal storage."
)
@Plugin(
examples = {

View File

@@ -23,8 +23,8 @@ import java.util.Map;
@Getter
@NoArgsConstructor
@Schema(
title = "Create temporary files (Deprecated).",
description = "This task is deprecated and replaced by `inputFiles` property available in all script tasks and in the [WorkingDirectory](https://kestra.io/plugins/core/tasks/io.kestra.plugin.core.flow.workingdirectory) task. Check the [migration guide](https://kestra.io/docs/migration-guide/0.17.0/local-files) for more details. This task suffers from multiple limitations e.g. it cannot be skipped, so setting `disabled: true` will have no effect. Overall, the WorkingDirectory task is more flexible and should be used instead of this task. This task will be removed in a future version of Kestra."
title = "This task is deprecated and replaced by `inputFiles` property available in all script tasks and in the [WorkingDirectory](https://kestra.io/plugins/core/tasks/io.kestra.plugin.core.flow.workingdirectory) task. Check the [migration guide](https://kestra.io/docs/migration-guide/0.17.0/local-files) for more details. ",
description = "This task was intended to be used along with the `WorkingDirectory` task to create temporary files. This task suffers from multiple limitations e.g. it cannot be skipped, so setting `disabled: true` will have no effect. Overall, the WorkingDirectory task is more flexible and should be used instead of this task. This task will be removed in a future version of Kestra."
)
@Deprecated
@Plugin(examples = {

View File

@@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets;
@Getter
@NoArgsConstructor
@Schema(
title = "Reverse a file from Kestra's internal storage, last line first."
title = "Reverse a file from the Kestra's internal storage, last line first."
)
@Plugin(
examples = {

View File

@@ -22,7 +22,7 @@ import java.net.URI;
@Getter
@NoArgsConstructor
@Schema(
title = "Get the size of a file from Kestra's internal storage."
title = "Get the size of a file from the Kestra's internal storage."
)
@Plugin(
examples = {

View File

@@ -23,7 +23,7 @@ import java.util.List;
@Getter
@NoArgsConstructor
@Schema(
title = "Split a file from Kestra's internal storage into multiple files."
title = "Split a file from the Kestra's internal storage into multiple files."
)
@Plugin(
examples = {

View File

@@ -24,7 +24,7 @@ import static io.kestra.core.utils.Rethrow.throwSupplier;
@Getter
@NoArgsConstructor
@Schema(
title = "Write data to a file in Kestras internal storage.",
title = "Write data to a file in the internal storage.",
description = "Use the Write task to store outputs as files internally and then reference the stored file for processing further down your flow."
)
@Plugin(

View File

@@ -23,7 +23,7 @@ import lombok.experimental.SuperBuilder;
@Getter
@NoArgsConstructor
@Schema(
title = "Templatize task properties using Kestras Pebble templating.",
title = "Templatize a task.",
description = "This task's `spec` property allows you to fully templatize all task properties using Kestra's Pebble templating. This way, all task properties and their values can be dynamically rendered based on your custom inputs, variables, and outputs from other tasks."
)
@Plugin(

View File

@@ -31,13 +31,13 @@ import jakarta.validation.constraints.Size;
@Getter
@NoArgsConstructor
@Schema(
title = "Execute a flow from an API call triggered by a webhook.",
title = "Trigger a flow from a webhook.",
description = """
Webhook trigger allows you to create a unique URL that you can use to trigger a Kestra flow execution based on events in another application such as GitHub or Amazon EventBridge. In order to use that URL, you have to add a secret key that will secure your webhook URL.
The URL will then follow the following format: `https://{your_hostname}/api/v1/executions/webhook/{namespace}/{flowId}/{key}`. Replace the templated values according to your workflow setup.
The webhook URL accepts `GET`, `POST`, and `PUT` requests.
The webhook URL accepts `GET`, `POST` and `PUT` requests.
You can access the request body and headers sent by another application using the following template variables:
- `{{ trigger.body }}`
@@ -48,7 +48,7 @@ import jakarta.validation.constraints.Size;
- 200 if the webhook triggers an execution.
- 204 if the webhook cannot trigger an execution due to a lack of matching event conditions sent by other application.
A webhook trigger can have conditions, but it doesn't support conditions of type `MultipleCondition`."""
A webhook trigger can have conditions but it doesn't support conditions of type `MultipleCondition`."""
)
@Plugin(
examples = {

View File

@@ -71,7 +71,7 @@ class ClassPluginDocumentationTest {
// map
Map<String, Object> childInputMap = (Map<String, Object>) childInput.get("map");
assertThat((String) (childInputMap).get("type")).isEqualTo("object");
assertThat((Boolean) (childInputMap).get("$dynamic")).isTrue();
assertThat((Boolean) (childInputMap).get("$dynamic")).isEqualTo(true);
assertThat(((Map<String, String>) (childInputMap).get("additionalProperties")).get("type")).isEqualTo("number");
// output
@@ -151,18 +151,18 @@ class ClassPluginDocumentationTest {
List<Map<String, Object>> anyOf = (List<Map<String, Object>>) number.get("anyOf");
assertThat(anyOf).hasSize(2);
assertThat(anyOf.getFirst().get("type")).isEqualTo("integer");
assertThat((Boolean) anyOf.getFirst().get("$dynamic")).isTrue();
assertThat(anyOf.getFirst().get("$dynamic")).isEqualTo(true);
assertThat(anyOf.get(1).get("type")).isEqualTo("string");
// assertThat(anyOf.get(1).get("pattern"), is(".*{{.*}}.*"));
Map<String, Object> withDefault = (Map<String, Object>) properties.get("withDefault");
assertThat(withDefault.get("type")).isEqualTo("string");
assertThat(withDefault.get("default")).isEqualTo("Default Value");
assertThat((Boolean) withDefault.get("$dynamic")).isTrue();
assertThat(withDefault.get("$dynamic")).isEqualTo(true);
Map<String, Object> internalStorageURI = (Map<String, Object>) properties.get("uri");
assertThat(internalStorageURI.get("type")).isEqualTo("string");
assertThat((Boolean) internalStorageURI.get("$internalStorageURI")).isTrue();
assertThat(internalStorageURI.get("$internalStorageURI")).isEqualTo(true);
}));
}
}

View File

@@ -29,7 +29,7 @@ class ExecutionTest {
.withState(State.Type.RUNNING)
))
.build()
)).isTrue();
)).isEqualTo(true);
}
@Test
@@ -44,7 +44,7 @@ class ExecutionTest {
assertThat(execution.hasTaskRunJoinable(TASK_RUN
.state(new State())
.build()
)).isFalse();
)).isEqualTo(false);
}
@Test
@@ -61,7 +61,7 @@ class ExecutionTest {
assertThat(execution.hasTaskRunJoinable(TASK_RUN
.state(new State(State.Type.RUNNING, new State()))
.build()
)).isFalse();
)).isEqualTo(false);
}
@Test
@@ -81,7 +81,7 @@ class ExecutionTest {
.withState(State.Type.RUNNING)
))
.build()
)).isFalse();
)).isEqualTo(false);
}
@Test
@@ -102,7 +102,7 @@ class ExecutionTest {
.withState(State.Type.SUCCESS)
))
.build()
)).isTrue();
)).isEqualTo(true);
}
@Test
@@ -125,7 +125,7 @@ class ExecutionTest {
.withState(State.Type.RUNNING)
))
.build()
)).isTrue();
)).isEqualTo(true);
}
@Test

View File

@@ -33,7 +33,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/duplicate.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).contains("Duplicate task id with name [date, listen]");
@@ -45,7 +45,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/duplicate-inputs.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).contains("Duplicate input with name [first_input]");
@@ -56,7 +56,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/duplicate-parallel.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).contains("Duplicate task id with name [t3]");
@@ -68,7 +68,7 @@ class FlowTest {
Flow updated = this.parse("flows/invalids/duplicate.yaml");
Optional<ConstraintViolationException> validate = flow.validateUpdate(updated);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).contains("Illegal flow id update");
@@ -80,7 +80,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/switch-invalid.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).contains("impossible: No task defined, neither cases or default have any tasks");
@@ -91,7 +91,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/workingdirectory-invalid.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).contains("impossible: Only runnable tasks are allowed as children of a WorkingDirectory task");
@@ -102,7 +102,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/workingdirectory-no-tasks.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(2);
assertThat(validate.get().getMessage()).contains("impossible: The 'tasks' property cannot be empty");
@@ -129,7 +129,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/worker-group.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
assertThat(validate.get().getMessage()).isEqualTo("tasks[0].workerGroup: Worker Group is an Enterprise Edition functionality\n");
@@ -148,7 +148,7 @@ class FlowTest {
Flow flow = this.parse("flows/invalids/inputs-validation.yaml");
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
assertThat(validate.isPresent()).isTrue();
assertThat(validate.isPresent()).isEqualTo(true);
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(2);
assertThat(validate.get().getMessage()).contains("file: no `defaults` can be set for inputs of type 'FILE'");
@@ -164,13 +164,13 @@ class FlowTest {
triggerInputsReverseOrder.put("c", "d");
triggerInputsReverseOrder.put("a", "b");
Flow flowABis = baseFlow().toBuilder().revision(2).triggers(List.of(io.kestra.plugin.core.trigger.Flow.builder().inputs(triggerInputsReverseOrder).build())).build();
assertThat(flowA.equalsWithoutRevision(flowABis)).isTrue();
assertThat(flowA.equalsWithoutRevision(flowABis)).isEqualTo(true);
Flow flowB = baseFlow().toBuilder().id("b").build();
assertThat(flowA.equalsWithoutRevision(flowB)).isFalse();
assertThat(flowA.equalsWithoutRevision(flowB)).isEqualTo(false);
Flow flowAnotherTenant = baseFlow().toBuilder().tenantId("b").build();
assertThat(flowA.equalsWithoutRevision(flowAnotherTenant)).isFalse();
assertThat(flowA.equalsWithoutRevision(flowAnotherTenant)).isEqualTo(false);
}
private static Flow baseFlow() {

View File

@@ -134,7 +134,7 @@ class FlowWithSourceTest {
String expectedSource = flow.sourceOrGenerateIfNull() + " # additional comment";
FlowWithSource of = FlowWithSource.of(flow, expectedSource);
assertThat(of.equalsWithoutRevision(flow)).isTrue();
assertThat(of.equalsWithoutRevision(flow)).isEqualTo(true);
assertThat(of.getSource()).isEqualTo(expectedSource);
}
}

View File

@@ -53,7 +53,7 @@ class FlowGraphTest {
assertThat(flowGraph.getNodes().size()).isEqualTo(5);
assertThat(flowGraph.getEdges().size()).isEqualTo(4);
assertThat(flowGraph.getClusters().size()).isZero();
assertThat(flowGraph.getClusters().size()).isEqualTo(0);
assertThat(((AbstractGraphTask) flowGraph.getNodes().get(2)).getTask().getId()).isEqualTo("date");
assertThat(((AbstractGraphTask) flowGraph.getNodes().get(2)).getRelationType()).isEqualTo(RelationType.SEQUENTIAL);
@@ -228,7 +228,7 @@ class FlowGraphTest {
assertThat(flowGraph.getEdges().size()).isEqualTo(5);
assertThat(flowGraph.getClusters().size()).isEqualTo(1);
AbstractGraph triggerGraph = flowGraph.getNodes().stream().filter(e -> e instanceof GraphTrigger).findFirst().orElseThrow();
assertThat(((GraphTrigger) triggerGraph).getTrigger().getDisabled()).isTrue();
assertThat(((GraphTrigger) triggerGraph).getTrigger().getDisabled()).isEqualTo(true);
}
@Test

View File

@@ -50,17 +50,17 @@ class ScriptServiceTest {
command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, false);
Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
assertThat(matcher.matches()).isTrue();
assertThat(matcher.matches()).isEqualTo(true);
Path absoluteLocalFilePath = Path.of(matcher.group(1));
localFile = absoluteLocalFilePath.toFile();
assertThat(localFile.exists()).isTrue();
assertThat(localFile.exists()).isEqualTo(true);
command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, true);
matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
assertThat(matcher.matches()).isTrue();
assertThat(matcher.matches()).isEqualTo(true);
String relativePath = matcher.group(1);
assertThat(relativePath).doesNotStartWith("/");
assertThat(runContext.workingDir().resolve(Path.of(relativePath)).toFile().exists()).isTrue();
assertThat(runContext.workingDir().resolve(Path.of(relativePath)).toFile().exists()).isEqualTo(true);
} finally {
localFile.delete();
path.toFile().delete();
@@ -94,18 +94,18 @@ class ScriptServiceTest {
assertThat(commands.getFirst(), not(is("my command with an internal storage file: " + internalStorageUri)));
Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(commands.getFirst());
assertThat(matcher.matches()).isTrue();
assertThat(matcher.matches()).isEqualTo(true);
File file = Path.of(matcher.group(1)).toFile();
assertThat(file.exists()).isTrue();
assertThat(file.exists()).isEqualTo(true);
filesToDelete.add(file);
assertThat(commands.get(1)).isEqualTo("my command with some additional var usage: " + wdir);
commands = ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), List.of("my command with an internal storage file: " + internalStorageUri), true);
matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(commands.getFirst());
assertThat(matcher.matches()).isTrue();
assertThat(matcher.matches()).isEqualTo(true);
file = runContext.workingDir().resolve(Path.of(matcher.group(1))).toFile();
assertThat(file.exists()).isTrue();
assertThat(file.exists()).isEqualTo(true);
filesToDelete.add(file);
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException(e);

View File

@@ -123,14 +123,14 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
assertThat(window.getResults().get("a")).isTrue();
assertThat(window.getResults().get("a")).isEqualTo(true);
Thread.sleep(2005);
MultipleConditionWindow next = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
assertThat(next.getStart().format(DateTimeFormatter.ISO_DATE_TIME)).isNotEqualTo(window.getStart().format(DateTimeFormatter.ISO_DATE_TIME));
assertThat(next.getResults().containsKey("a")).isFalse();
assertThat(next.getResults().containsKey("a")).isEqualTo(false);
}
@Test
@@ -144,10 +144,10 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
assertThat(window.getResults().get("a")).isTrue();
assertThat(window.getResults().get("a")).isEqualTo(true);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
assertThat(expired.size()).isZero();
assertThat(expired.size()).isEqualTo(0);
Thread.sleep(2005);
@@ -166,10 +166,10 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
assertThat(window.getResults().get("a")).isTrue();
assertThat(window.getResults().get("a")).isEqualTo(true);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
assertThat(expired.size()).isZero();
assertThat(expired.size()).isEqualTo(0);
Thread.sleep(2005);
@@ -206,10 +206,10 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
assertThat(window.getResults().get("a")).isTrue();
assertThat(window.getResults().get("a")).isEqualTo(true);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
assertThat(expired.size()).isZero();
assertThat(expired.size()).isEqualTo(0);
}
@Test
@@ -223,10 +223,10 @@ public abstract class AbstractMultipleConditionStorageTest {
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
assertThat(window.getResults().get("a")).isTrue();
assertThat(window.getResults().get("a")).isEqualTo(true);
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
assertThat(expired.size()).isZero();
assertThat(expired.size()).isEqualTo(0);
}
private static Pair<Flow, MultipleCondition> mockFlow(TimeWindow sla) {

View File

@@ -297,7 +297,7 @@ public abstract class AbstractExecutionRepositoryTest {
executionRepository.save(ExecutionFixture.EXECUTION_1);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
full.ifPresent(current -> {
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_1.getId());
@@ -309,12 +309,12 @@ public abstract class AbstractExecutionRepositoryTest {
executionRepository.save(ExecutionFixture.EXECUTION_1);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
executionRepository.purge(ExecutionFixture.EXECUTION_1);
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent()).isFalse();
assertThat(full.isPresent()).isEqualTo(false);
}
@Test
@@ -322,12 +322,12 @@ public abstract class AbstractExecutionRepositoryTest {
executionRepository.save(ExecutionFixture.EXECUTION_1);
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
executionRepository.delete(ExecutionFixture.EXECUTION_1);
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent()).isFalse();
assertThat(full.isPresent()).isEqualTo(false);
}
@Test
@@ -720,7 +720,7 @@ public abstract class AbstractExecutionRepositoryTest {
executionRepository.update(updated);
Optional<Execution> validation = executionRepository.findById(null, updated.getId());
assertThat(validation.isPresent()).isTrue();
assertThat(validation.isPresent()).isEqualTo(true);
assertThat(validation.get().getLabels().size()).isEqualTo(1);
assertThat(validation.get().getLabels().getFirst()).isEqualTo(label);
}
@@ -734,7 +734,7 @@ public abstract class AbstractExecutionRepositoryTest {
executionRepository.save(latest);
Optional<Execution> result = executionRepository.findLatestForStates(null, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
assertThat(result.isPresent()).isTrue();
assertThat(result.isPresent()).isEqualTo(true);
assertThat(result.get().getId()).isEqualTo(latest.getId());
}

View File

@@ -134,6 +134,6 @@ public abstract class AbstractExecutionServiceTest {
null
);
assertThat(purge.getExecutionsCount()).isZero();
assertThat(purge.getExecutionsCount()).isEqualTo(0);
}
}

View File

@@ -82,11 +82,11 @@ public abstract class AbstractFlowRepositoryTest {
flow = flowRepository.create(GenericFlow.of(flow));
try {
Optional<Flow> full = flowRepository.findById(null, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findById(null, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
} finally {
deleteFlow(flow);
}
@@ -100,11 +100,11 @@ public abstract class AbstractFlowRepositoryTest {
flow = flowRepository.create(GenericFlow.of(flow));
try {
Optional<Flow> full = flowRepository.findByIdWithoutAcl(null, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
assertThat(full.get().getRevision()).isEqualTo(1);
full = flowRepository.findByIdWithoutAcl(null, flow.getNamespace(), flow.getId(), Optional.empty());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
} finally {
deleteFlow(flow);
}
@@ -120,7 +120,7 @@ public abstract class AbstractFlowRepositoryTest {
try {
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(null, flow.getNamespace(), flow.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
full.ifPresent(current -> {
assertThat(full.get().getRevision()).isEqualTo(1);
@@ -273,7 +273,7 @@ public abstract class AbstractFlowRepositoryTest {
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(null, save.getNamespace(), save.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(null, save.getNamespace(), save.getId()).isPresent()).isEqualTo(true);
} catch (Throwable e) {
deleteFlow(save);
throw e;
@@ -281,8 +281,8 @@ public abstract class AbstractFlowRepositoryTest {
Flow delete = flowRepository.delete(save);
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(false);
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isEqualTo(true);
List<FlowWithSource> revisions = flowRepository.findRevisions(null, flow.getNamespace(), flow.getId());
assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision());
@@ -302,7 +302,7 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(true);
Flow update = Flow.builder()
.id(IdUtils.create())
@@ -339,7 +339,7 @@ public abstract class AbstractFlowRepositoryTest {
flow = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(true);
Flow update = Flow.builder()
.id(flowId)
@@ -377,7 +377,7 @@ public abstract class AbstractFlowRepositoryTest {
Flow save = flowRepository.create(GenericFlow.of(flow));
try {
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(true);
} finally {
deleteFlow(save);
}
@@ -420,8 +420,8 @@ public abstract class AbstractFlowRepositoryTest {
try {
Optional<Flow> found = flowRepository.findById(null, flow.getNamespace(), flow.getId());
assertThat(found.isPresent()).isTrue();
assertThat(found.get() instanceof FlowWithException).isTrue();
assertThat(found.isPresent()).isEqualTo(true);
assertThat(found.get() instanceof FlowWithException).isEqualTo(true);
assertThat(((FlowWithException) found.get()).getException()).contains("Templates are disabled");
} finally {
deleteFlow(flow);

View File

@@ -44,7 +44,7 @@ public abstract class AbstractLogRepositoryTest {
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, null, null);
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
LogEntry save = logRepository.save(builder.build());
@@ -63,7 +63,7 @@ public abstract class AbstractLogRepositoryTest {
.value(Instant.now().minus(1, ChronoUnit.HOURS))
.build());
find = logRepository.find(Pageable.UNPAGED, "doe", filters);
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
find = logRepository.find(Pageable.UNPAGED, null, null);
assertThat(find.size()).isEqualTo(1);
@@ -101,7 +101,7 @@ public abstract class AbstractLogRepositoryTest {
assertThat(countDeleted).isEqualTo(1);
list = logRepository.findByExecutionIdAndTaskId(null, save.getExecutionId(), save.getTaskId(), null);
assertThat(list.size()).isZero();
assertThat(list.size()).isEqualTo(0);
}
@Test
@@ -147,7 +147,7 @@ public abstract class AbstractLogRepositoryTest {
find = logRepository.findByExecutionIdAndTaskRunId(null, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
}
@Test
@@ -158,14 +158,14 @@ public abstract class AbstractLogRepositoryTest {
logRepository.deleteByQuery(null, log1.getExecutionId(), null, (String) null, null, null);
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
logRepository.save(log1);
logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
}
@Test
@@ -176,14 +176,14 @@ public abstract class AbstractLogRepositoryTest {
logRepository.deleteByQuery(null, log1.getExecutionId(), null, (String) null, null, null);
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
logRepository.save(log1);
logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", null);
find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
assertThat(find.size()).isZero();
assertThat(find.size()).isEqualTo(0);
}
@Test

View File

@@ -24,13 +24,13 @@ public abstract class AbstractSettingRepositoryTest {
.build();
Optional<Setting> find = settingRepository.findByKey(setting.getKey());
assertThat(find.isPresent()).isFalse();
assertThat(find.isPresent()).isEqualTo(false);
Setting save = settingRepository.save(setting);
find = settingRepository.findByKey(save.getKey());
assertThat(find.isPresent()).isTrue();
assertThat(find.isPresent()).isEqualTo(true);
assertThat(find.get().getValue()).isEqualTo(save.getValue());
List<Setting> all = settingRepository.findAll();
@@ -41,9 +41,9 @@ public abstract class AbstractSettingRepositoryTest {
assertThat(delete.getValue()).isEqualTo(setting.getValue());
all = settingRepository.findAll();
assertThat(all.size()).isZero();
assertThat(all.size()).isEqualTo(0);
find = settingRepository.findByKey(setting.getKey());
assertThat(find.isPresent()).isFalse();
assertThat(find.isPresent()).isEqualTo(false);
}
}

View File

@@ -50,11 +50,11 @@ public abstract class AbstractTemplateRepositoryTest {
templateRepository.create(template);
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
assertThat(full.get().getId()).isEqualTo(template.getId());
full = templateRepository.findById(null, template.getNamespace(), template.getId());
assertThat(full.isPresent()).isTrue();
assertThat(full.isPresent()).isEqualTo(true);
assertThat(full.get().getId()).isEqualTo(template.getId());
}
@@ -133,7 +133,7 @@ public abstract class AbstractTemplateRepositoryTest {
Template save = templateRepository.create(template);
templateRepository.delete(save);
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse();
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isEqualTo(false);
assertThat(TemplateListener.getEmits().size()).isEqualTo(2);
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);

View File

@@ -35,20 +35,20 @@ public abstract class AbstractTriggerRepositoryTest {
Trigger.TriggerBuilder<?, ?> builder = trigger();
Optional<Trigger> findLast = triggerRepository.findLast(builder.build());
assertThat(findLast.isPresent()).isFalse();
assertThat(findLast.isPresent()).isEqualTo(false);
Trigger save = triggerRepository.save(builder.build());
findLast = triggerRepository.findLast(save);
assertThat(findLast.isPresent()).isTrue();
assertThat(findLast.isPresent()).isEqualTo(true);
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
save = triggerRepository.save(builder.executionId(IdUtils.create()).build());
findLast = triggerRepository.findLast(save);
assertThat(findLast.isPresent()).isTrue();
assertThat(findLast.isPresent()).isEqualTo(true);
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());

View File

@@ -305,9 +305,9 @@ public abstract class AbstractRunnerTest {
}
@Test
@LoadFlows({"flows/valids/pause-duration-from-input.yaml"})
public void pauseRunDurationFromInput() throws Exception {
pauseTest.runDurationFromInput(runnerUtils);
@LoadFlows({"flows/valids/pause-delay-from-input.yaml"})
public void pauseRunDelayFromInput() throws Exception {
pauseTest.runDelayFromInput(runnerUtils);
}
@Test

View File

@@ -54,7 +54,7 @@ public class ChangeStateTestCase {
Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
executionQueue.emit(markedAs);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(latch.await(10, TimeUnit.SECONDS)).isEqualTo(true);
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(lastExecution.get().getTaskRunList()).hasSize(2);
@@ -80,7 +80,7 @@ public class ChangeStateTestCase {
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
// assert on the subflow
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(latch.await(10, TimeUnit.SECONDS)).isEqualTo(true);
receivedExecutions.blockLast();
assertThat(lastExecution.get().getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(lastExecution.get().getTaskRunList()).hasSize(1);
@@ -103,7 +103,7 @@ public class ChangeStateTestCase {
executionQueue.emit(markedAs);
// assert for the parent flow
assertThat(parentLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(parentLatch.await(10, TimeUnit.SECONDS)).isEqualTo(true);
receivedExecutions.blockLast();
assertThat(lastParentExecution.get().getState().getCurrent()).isEqualTo(State.Type.FAILED); // FIXME should be success but it's FAILED on unit tests
assertThat(lastParentExecution.get().getTaskRunList()).hasSize(1);

View File

@@ -40,7 +40,7 @@ public class FlowConcurrencyCaseTest {
Execution execution1 = runnerUtils.runOneUntilRunning(null, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(null, "io.kestra.tests", "flow-concurrency-cancel");
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution1.getState().isRunning()).isEqualTo(true);
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
CountDownLatch latch1 = new CountDownLatch(1);
@@ -63,7 +63,7 @@ public class FlowConcurrencyCaseTest {
Execution execution1 = runnerUtils.runOneUntilRunning(null, "io.kestra.tests", "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(null, "io.kestra.tests", "flow-concurrency-fail");
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution1.getState().isRunning()).isEqualTo(true);
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.FAILED);
CountDownLatch latch1 = new CountDownLatch(1);
@@ -90,7 +90,7 @@ public class FlowConcurrencyCaseTest {
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution1.getState().isRunning()).isEqualTo(true);
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
@@ -170,7 +170,7 @@ public class FlowConcurrencyCaseTest {
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isPaused()).isTrue();
assertThat(execution1.getState().isPaused()).isEqualTo(true);
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
assertTrue(firstExecutionLatch.await(10, TimeUnit.SECONDS));
@@ -222,7 +222,7 @@ public class FlowConcurrencyCaseTest {
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isPaused()).isTrue();
assertThat(execution1.getState().isPaused()).isEqualTo(true);
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
assertTrue(firstExecLatch.await(10, TimeUnit.SECONDS));

View File

@@ -51,15 +51,15 @@ abstract public class FlowListenersTest {
// initial state
wait(ref, () -> {
assertThat(count.get()).isZero();
assertThat(flowListenersService.flows().size()).isZero();
assertThat(count.get()).isEqualTo(0);
assertThat(flowListenersService.flows().size()).isEqualTo(0);
});
// resend on startup done for kafka
if (flowListenersService.getClass().getName().equals("io.kestra.ee.runner.kafka.KafkaFlowListeners")) {
wait(ref, () -> {
assertThat(count.get()).isZero();
assertThat(flowListenersService.flows().size()).isZero();
assertThat(count.get()).isEqualTo(0);
assertThat(flowListenersService.flows().size()).isEqualTo(0);
});
}

View File

@@ -119,7 +119,7 @@ public class InputsTest {
HashMap<String, Object> inputsWithMissingOptionalInput = new HashMap<>(inputs);
inputsWithMissingOptionalInput.remove("bool");
assertThat(typedInputs(inputsWithMissingOptionalInput).containsKey("bool")).isTrue();
assertThat(typedInputs(inputsWithMissingOptionalInput).containsKey("bool")).isEqualTo(true);
assertThat(typedInputs(inputsWithMissingOptionalInput).get("bool")).isNull();
}
@@ -132,7 +132,7 @@ public class InputsTest {
assertThat(typeds.get("string")).isEqualTo("myString");
assertThat(typeds.get("int")).isEqualTo(42);
assertThat(typeds.get("float")).isEqualTo(42.42F);
assertThat((Boolean) typeds.get("bool")).isFalse();
assertThat(typeds.get("bool")).isEqualTo(false);
assertThat(typeds.get("instant")).isEqualTo(Instant.parse("2019-10-06T18:27:49Z"));
assertThat(typeds.get("instantDefaults")).isEqualTo(Instant.parse("2013-08-09T14:19:00Z"));
assertThat(typeds.get("date")).isEqualTo(LocalDate.parse("2019-10-06"));
@@ -143,7 +143,7 @@ public class InputsTest {
assertThat(typeds.get("json")).isEqualTo(Map.of("a", "b"));
assertThat(typeds.get("uri")).isEqualTo("https://www.google.com");
assertThat(((Map<String, Object>) typeds.get("nested")).get("string")).isEqualTo("a string");
assertThat((Boolean) ((Map<String, Object>) typeds.get("nested")).get("bool")).isTrue();
assertThat(((Map<String, Object>) typeds.get("nested")).get("bool")).isEqualTo(true);
assertThat(((Map<String, Object>) ((Map<String, Object>) typeds.get("nested")).get("more")).get("int")).isEqualTo(123);
assertThat(typeds.get("validatedString")).isEqualTo("A123");
assertThat(typeds.get("validatedInt")).isEqualTo(12);
@@ -173,7 +173,7 @@ public class InputsTest {
assertThat(typeds.get("enum")).isEqualTo("ENUM_VALUE");
assertThat(typeds.get("int")).isEqualTo(42);
assertThat(typeds.get("float")).isEqualTo(42.42F);
assertThat((Boolean) typeds.get("bool")).isFalse();
assertThat(typeds.get("bool")).isEqualTo(false);
}
@Test
@@ -345,7 +345,7 @@ public class InputsTest {
Map<String, Object> typeds = typedInputs(map);
assertThat(typeds.get("json")).isInstanceOf(Map.class);
assertThat(((Map<?, ?>) typeds.get("json")).size()).isZero();
assertThat(((Map<?, ?>) typeds.get("json")).size()).isEqualTo(0);
}
@Test
@@ -366,7 +366,7 @@ public class InputsTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getInputs().get("json")).isInstanceOf(Map.class);
assertThat(((Map<?, ?>) execution.getInputs().get("json")).size()).isZero();
assertThat(((Map<?, ?>) execution.getInputs().get("json")).size()).isEqualTo(0);
assertThat((String) execution.findTaskRunsByTaskId("jsonOutput").getFirst().getOutputs().get("value")).isEqualTo("{}");
}

View File

@@ -52,7 +52,7 @@ class LocalWorkingDirTest {
String workingDirId = IdUtils.create();
TestWorkingDir workingDirectory = new TestWorkingDir(workingDirId, new LocalWorkingDir(Path.of("/tmp/sub/dir/tmp/"), workingDirId));
Path tempFile = workingDirectory.createTempFile();
assertThat(tempFile.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isTrue();
assertThat(tempFile.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isEqualTo(true);
assertThat(workingDirectory.getAllCreatedTempFiles().size()).isEqualTo(1);
}
@@ -62,8 +62,8 @@ class LocalWorkingDirTest {
TestWorkingDir workingDirectory = new TestWorkingDir(workingDirId, new LocalWorkingDir(Path.of("/tmp/sub/dir/tmp/"), workingDirId));
Path path = workingDirectory.createFile("folder/file.txt");
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isTrue();
assertThat(path.toFile().getAbsolutePath().endsWith("/folder/file.txt")).isTrue();
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isEqualTo(true);
assertThat(path.toFile().getAbsolutePath().endsWith("/folder/file.txt")).isEqualTo(true);
assertThat(workingDirectory.getAllCreatedFiles().size()).isEqualTo(1);
}
@@ -111,13 +111,13 @@ class LocalWorkingDirTest {
workingDir.cleanup();
// Then
assertThat(file.toFile().exists()).isFalse();
assertThat(firtPath.toFile().exists()).isFalse();
assertThat(file.toFile().exists()).isEqualTo(false);
assertThat(firtPath.toFile().exists()).isEqualTo(false);
// When
Path secondPath = workingDir.path(true);
// Then
assertThat(secondPath.toFile().exists()).isTrue();
assertThat(secondPath.toFile().exists()).isEqualTo(true);
assertThat(firtPath).isEqualTo(secondPath);
}

View File

@@ -59,7 +59,7 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
@Test
@LoadFlows({"flows/valids/inputs.yaml"})
void secretInput() {
assertThat(flowRepository.findById(null, "io.kestra.tests", "inputs").isPresent()).isTrue();
assertThat(flowRepository.findById(null, "io.kestra.tests", "inputs").isPresent()).isEqualTo(true);
Flow flow = flowRepository.findById(null, "io.kestra.tests", "inputs").get();
Execution execution = Execution.builder()

View File

@@ -39,6 +39,6 @@ public class NullOutputTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getTaskRunList().getFirst().getOutputs()).hasSize(1);
assertThat(execution.getTaskRunList().getFirst().getOutputs().containsKey("value")).isTrue();
assertThat(execution.getTaskRunList().getFirst().getOutputs().containsKey("value")).isEqualTo(true);
}
}

View File

@@ -23,6 +23,6 @@ class WorkingDirFactoryTest {
// When
Path path = workingDirectory.path();
// Then
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isTrue();
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isEqualTo(true);
}
}

View File

@@ -21,6 +21,6 @@ class EndsWithFilterTest {
variableRenderer.render("{{ \"Hello World\" | endsWith(\"World\") }}", Map.of())
);
assertThat(render).isTrue();
assertThat(render).isEqualTo(true);
}
}

View File

@@ -19,6 +19,6 @@ public class Md5FilterTest {
void out() throws IllegalVariableEvaluationException {
String render = variableRenderer.render("{{ \"hello\" | md5 }}", Map.of());
assertThat(render.equals("hello")).isFalse();
assertThat(render.equals("hello")).isEqualTo(false);
}
}

View File

@@ -19,6 +19,6 @@ public class Sha1FilterTest {
void out() throws IllegalVariableEvaluationException {
String render = variableRenderer.render("{{ \"hello\" | sha1 }}", Map.of());
assertThat(render.equals("hello")).isFalse();
assertThat(render.equals("hello")).isEqualTo(false);
}
}

View File

@@ -19,6 +19,6 @@ public class Sha512FilterTest {
void out() throws IllegalVariableEvaluationException {
String render = variableRenderer.render("{{ \"hello\" | sha512 }}", Map.of());
assertThat(render.equals("hello")).isFalse();
assertThat(render.equals("hello")).isEqualTo(false);
}
}

View File

@@ -21,6 +21,6 @@ class StartsWithFilterTest {
variableRenderer.render("{{ \"Hello World\" | startsWith(\"Hello\") }}", Map.of())
);
assertThat(render).isTrue();
assertThat(render).isEqualTo(true);
}
}

Some files were not shown because too many files have changed in this diff Show More