mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
40 Commits
feat/embed
...
fix/logs-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0baedf7f1 | ||
|
|
a06421dd84 | ||
|
|
de6fcab785 | ||
|
|
b2f68a7b97 | ||
|
|
01cb30f933 | ||
|
|
d2cda63cfa | ||
|
|
f89187db6a | ||
|
|
8e4fe892e9 | ||
|
|
eb13dce0ff | ||
|
|
a14518b810 | ||
|
|
c64f15a035 | ||
|
|
f79541616e | ||
|
|
cb6a6bfd91 | ||
|
|
b2ae2ff6f7 | ||
|
|
a7836ca673 | ||
|
|
7bc60a1056 | ||
|
|
09943a1e7b | ||
|
|
8b6af7a808 | ||
|
|
e74e7ff8e5 | ||
|
|
9adf3a5444 | ||
|
|
c069b2fbb3 | ||
|
|
534b7f4ec7 | ||
|
|
7b1ee4a9e0 | ||
|
|
38a9ebcbef | ||
|
|
eea47c6e40 | ||
|
|
e1c4ae22f2 | ||
|
|
b7861a139e | ||
|
|
954d64ecaa | ||
|
|
f61ba36023 | ||
|
|
fbd989ccab | ||
|
|
12affd4b4b | ||
|
|
b75730a0ca | ||
|
|
4170615765 | ||
|
|
5cfb6aa1f5 | ||
|
|
41d660e18e | ||
|
|
8af4f1928a | ||
|
|
1488caccc7 | ||
|
|
85fc48963f | ||
|
|
b706dec9d2 | ||
|
|
ceac4d38f9 |
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -62,7 +62,7 @@ jobs:
|
||||
|
||||
- name: Build with Gradle
|
||||
if: ${{ matrix.language == 'java' }}
|
||||
run: ./gradlew testClasses -x :ui:installFrontend -x :ui:assembleFrontend
|
||||
run: ./gradlew testClasses -x :ui:assembleFrontend
|
||||
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
|
||||
32
.github/workflows/vulnerabilities-check.yml
vendored
32
.github/workflows/vulnerabilities-check.yml
vendored
@@ -8,6 +8,9 @@ on:
|
||||
env:
|
||||
JAVA_VERSION: '21'
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
dependency-check:
|
||||
name: Dependency Check
|
||||
@@ -57,6 +60,10 @@ jobs:
|
||||
develop-image-check:
|
||||
name: Image Check (develop)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
@@ -83,13 +90,25 @@ jobs:
|
||||
uses: aquasecurity/trivy-action@0.30.0
|
||||
with:
|
||||
image-ref: kestra/kestra:develop
|
||||
format: table
|
||||
format: 'template'
|
||||
template: '@/contrib/sarif.tpl'
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
skip-dirs: /app/plugins
|
||||
scanners: vuln
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v3
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
category: docker-
|
||||
|
||||
latest-image-check:
|
||||
name: Image Check (latest)
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
security-events: write
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
@@ -118,4 +137,11 @@ jobs:
|
||||
image-ref: kestra/kestra:latest
|
||||
format: table
|
||||
skip-dirs: /app/plugins
|
||||
scanners: vuln
|
||||
scanners: vuln
|
||||
severity: 'CRITICAL,HIGH'
|
||||
output: 'trivy-results.sarif'
|
||||
|
||||
- name: Upload Trivy scan results to GitHub Security tab
|
||||
uses: github/codeql-action/upload-sarif@v3
|
||||
with:
|
||||
sarif_file: 'trivy-results.sarif'
|
||||
2
.github/workflows/workflow-backend-test.yml
vendored
2
.github/workflows/workflow-backend-test.yml
vendored
@@ -31,6 +31,8 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
name: Checkout - Current ref
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
|
||||
@@ -129,7 +129,7 @@ class FlowCreateOrUpdateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ class FlowDotCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("\"root.date\"[shape=box];");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ class FlowExpandCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).isEqualTo("id: include\n" +
|
||||
"namespace: io.kestra.cli\n" +
|
||||
"\n" +
|
||||
|
||||
@@ -55,7 +55,7 @@ class FlowExportCommandTest {
|
||||
};
|
||||
PicocliRunner.call(FlowExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/flows.zip");
|
||||
assertThat(file.exists()).isEqualTo(true);
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
|
||||
// When launching the test in a suite, there is 4 flows but when lauching individualy there is only 3
|
||||
|
||||
@@ -169,7 +169,7 @@ class FlowUpdatesCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ class FlowValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
|
||||
}
|
||||
}
|
||||
@@ -39,7 +39,7 @@ class FlowValidateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("✓ - system / warning");
|
||||
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
|
||||
assertThat(out.toString()).contains("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
|
||||
|
||||
@@ -19,7 +19,7 @@ class FlowNamespaceCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(FlowNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra flow namespace");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,7 +162,7 @@ class FlowNamespaceUpdateCommandTest {
|
||||
};
|
||||
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("1 flow(s)");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ class NamespaceCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(NamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ class NamespaceFilesCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(NamespaceFilesCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace files");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ class KvCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra namespace kv");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ class PluginDocCommandTest {
|
||||
assertThat(files.size()).isEqualTo(1);
|
||||
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
|
||||
var directory = files.getFirst().toFile();
|
||||
assertThat(directory.isDirectory()).isEqualTo(true);
|
||||
assertThat(directory.isDirectory()).isTrue();
|
||||
assertThat(directory.listFiles().length).isEqualTo(3);
|
||||
|
||||
var readme = directory.toPath().resolve("index.md");
|
||||
|
||||
@@ -42,7 +42,7 @@ class ReindexCommandTest {
|
||||
"flow",
|
||||
};
|
||||
Integer call = PicocliRunner.call(ReindexCommand.class, ctx, reindexArgs);
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
// in local it reindex 3 flows and in CI 4 for an unknown reason
|
||||
assertThat(out.toString()).contains("Successfully reindex");
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ class DatabaseCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(DatabaseCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra sys database");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ class StateStoreCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra sys state-store");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ class StateStoreMigrateCommandTest {
|
||||
oldStateStoreUri,
|
||||
new ByteArrayInputStream("my-value".getBytes())
|
||||
);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isEqualTo(true);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
|
||||
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
|
||||
"tenantId", tenantId,
|
||||
@@ -67,9 +67,9 @@ class StateStoreMigrateCommandTest {
|
||||
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
|
||||
|
||||
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isEqualTo(false);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ class TemplateExportCommandTest {
|
||||
};
|
||||
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
|
||||
File file = new File("/tmp/templates.zip");
|
||||
assertThat(file.exists()).isEqualTo(true);
|
||||
assertThat(file.exists()).isTrue();
|
||||
ZipFile zipFile = new ZipFile(file);
|
||||
assertThat(zipFile.stream().count()).isEqualTo(3L);
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class TemplateValidateCommandTest {
|
||||
class TemplateValidateCommandTest {
|
||||
@Test
|
||||
void runLocal() {
|
||||
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
|
||||
|
||||
@@ -19,7 +19,7 @@ class TemplateNamespaceCommandTest {
|
||||
String[] args = {};
|
||||
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
|
||||
|
||||
assertThat(call).isEqualTo(0);
|
||||
assertThat(call).isZero();
|
||||
assertThat(out.toString()).contains("Usage: kestra template namespace");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ class DeleteConfigurationApplicationListenersTest {
|
||||
);
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(mapPropertySource, Environment.CLI, Environment.TEST)) {
|
||||
assertThat(tempFile.exists()).isEqualTo(false);
|
||||
assertThat(tempFile.exists()).isFalse();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -91,6 +91,8 @@ public class MetricRegistry {
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION = "Trigger lock duration waiting for an execution to be terminated";
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
|
||||
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
|
||||
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
|
||||
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
|
||||
|
||||
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
|
||||
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";
|
||||
|
||||
@@ -39,30 +39,6 @@ import java.util.Optional;
|
||||
@NoArgsConstructor
|
||||
@JsonDeserialize
|
||||
public class GenericFlow extends AbstractFlow implements HasUID {
|
||||
|
||||
private String id;
|
||||
|
||||
private String namespace;
|
||||
|
||||
private Integer revision;
|
||||
|
||||
private List<Input<?>> inputs;
|
||||
|
||||
private Map<String, Object> variables;
|
||||
|
||||
@Builder.Default
|
||||
private boolean disabled = false;
|
||||
|
||||
@Builder.Default
|
||||
private boolean deleted = false;
|
||||
|
||||
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
|
||||
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
|
||||
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
|
||||
private List<Label> labels;
|
||||
|
||||
private String tenantId;
|
||||
|
||||
private String source;
|
||||
|
||||
private List<SLA> sla;
|
||||
@@ -84,7 +60,6 @@ public class GenericFlow extends AbstractFlow implements HasUID {
|
||||
* @return a new {@link GenericFlow}
|
||||
* @throws DeserializationException if source cannot be deserialized.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static GenericFlow of(final FlowInterface flow) throws DeserializationException {
|
||||
return fromYaml(flow.getTenantId(), flow.sourceOrGenerateIfNull());
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ public class State {
|
||||
|
||||
public State withState(Type state) {
|
||||
if (this.current == state) {
|
||||
log.warn("Can't change state, already " + current);
|
||||
log.warn("Can't change state, already {}", current);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.models.namespaces;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.*;
|
||||
@@ -11,6 +12,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@NoArgsConstructor
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Schema(name = "NamespaceLight")
|
||||
public class Namespace implements NamespaceInterface {
|
||||
@NotNull
|
||||
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
|
||||
|
||||
@@ -2,38 +2,41 @@ package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
abstract public class PluginUtilsService {
|
||||
private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
|
||||
private static final Pattern PATTERN = Pattern.compile("^::(\\{.*})::$");
|
||||
|
||||
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
|
||||
|
||||
public static Map<String, String> createOutputFiles(
|
||||
@@ -52,12 +55,12 @@ abstract public class PluginUtilsService {
|
||||
) throws IOException {
|
||||
List<String> outputs = new ArrayList<>();
|
||||
|
||||
if (outputFiles != null && outputFiles.size() > 0) {
|
||||
if (outputFiles != null && !outputFiles.isEmpty()) {
|
||||
outputs.addAll(outputFiles);
|
||||
}
|
||||
|
||||
Map<String, String> result = new HashMap<>();
|
||||
if (outputs.size() > 0) {
|
||||
if (!outputs.isEmpty()) {
|
||||
outputs
|
||||
.forEach(throwConsumer(s -> {
|
||||
PluginUtilsService.validFilename(s);
|
||||
@@ -168,64 +171,27 @@ abstract public class PluginUtilsService {
|
||||
}
|
||||
|
||||
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
|
||||
Matcher m = PATTERN.matcher(line);
|
||||
|
||||
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
|
||||
|
||||
Map<String, Object> outputs = new HashMap<>();
|
||||
|
||||
if (m.find()) {
|
||||
try {
|
||||
BashCommand<?> bashCommand = MAPPER.readValue(m.group(1), BashCommand.class);
|
||||
|
||||
if (bashCommand.getOutputs() != null) {
|
||||
outputs.putAll(bashCommand.getOutputs());
|
||||
}
|
||||
|
||||
if (bashCommand.getMetrics() != null) {
|
||||
bashCommand.getMetrics().forEach(runContext::metric);
|
||||
}
|
||||
|
||||
if (bashCommand.getLogs() != null) {
|
||||
bashCommand.getLogs().forEach(logLine -> {
|
||||
try {
|
||||
LoggingEventBuilder builder = runContext
|
||||
.logger()
|
||||
.atLevel(logLine.getLevel())
|
||||
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, customInstant);
|
||||
builder.log(logLine.getMessage());
|
||||
} catch (Exception e) {
|
||||
logger.warn("Invalid log '{}'", m.group(1), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
|
||||
}
|
||||
} else {
|
||||
if (isStdErr) {
|
||||
try {
|
||||
Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
|
||||
if (matches.isPresent()) {
|
||||
TaskLogMatch taskLogMatch = matches.get();
|
||||
outputs.putAll(taskLogMatch.outputs());
|
||||
} else if (isStdErr) {
|
||||
runContext.logger().error(line);
|
||||
} else {
|
||||
runContext.logger().info(line);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
|
||||
}
|
||||
|
||||
return outputs;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public static class BashCommand <T> {
|
||||
private Map<String, Object> outputs;
|
||||
private List<AbstractMetricEntry<T>> metrics;
|
||||
private List<LogLine> logs;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
public static class LogLine {
|
||||
private Level level;
|
||||
private String message;
|
||||
}
|
||||
|
||||
/**
|
||||
* This helper method will allow gathering the execution information from a task parameters:
|
||||
* - If executionId is null, it is fetched from the runContext variables (a.k.a. current execution).
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
|
||||
/**
|
||||
* Service for matching and capturing structured data from task execution logs.
|
||||
* <p>
|
||||
* Example log format that may be matched:
|
||||
* <pre>{@code
|
||||
* ::{"outputs":{"key":"value"}}::
|
||||
* }</pre>
|
||||
*/
|
||||
@Singleton
|
||||
public class TaskLogLineMatcher {
|
||||
|
||||
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
|
||||
|
||||
protected static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
|
||||
|
||||
/**
|
||||
* Attempts to match and extract structured data from a given log line.
|
||||
* <p>
|
||||
* If the line contains recognized patterns (e.g., JSON-encoded output markers),
|
||||
* a {@link TaskLogMatch} is returned encapsulating the extracted data.
|
||||
* </p>
|
||||
*
|
||||
* @param logLine the raw log line.
|
||||
* @param logger the logger
|
||||
* @param runContext the {@link RunContext}
|
||||
* @return an {@link Optional} containing the {@link TaskLogMatch} if a match was found,
|
||||
* otherwise {@link Optional#empty()}
|
||||
*/
|
||||
public Optional<TaskLogMatch> matches(String logLine, Logger logger, RunContext runContext, Instant instant) throws IOException {
|
||||
Optional<String> matches = matches(logLine);
|
||||
if (matches.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
TaskLogMatch match = MAPPER.readValue(matches.get(), TaskLogLineMatcher.TaskLogMatch.class);
|
||||
|
||||
return Optional.of(handle(logger, runContext, instant, match, matches.get()));
|
||||
}
|
||||
|
||||
protected TaskLogMatch handle(Logger logger, RunContext runContext, Instant instant, TaskLogMatch match, String data) {
|
||||
|
||||
if (match.metrics() != null) {
|
||||
match.metrics().forEach(runContext::metric);
|
||||
}
|
||||
|
||||
if (match.logs() != null) {
|
||||
match.logs().forEach(it -> {
|
||||
try {
|
||||
LoggingEventBuilder builder = runContext
|
||||
.logger()
|
||||
.atLevel(it.level())
|
||||
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, instant);
|
||||
builder.log(it.message());
|
||||
} catch (Exception e) {
|
||||
logger.warn("Invalid log '{}'",data, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
return match;
|
||||
}
|
||||
|
||||
protected Optional<String> matches(String logLine) {
|
||||
Matcher m = LOG_DATA_SYNTAX.matcher(logLine);
|
||||
return m.find() ? Optional.ofNullable(m.group(1)) : Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the result of log line match.
|
||||
*
|
||||
* @param outputs a map of extracted output key-value pairs
|
||||
* @param metrics a list of captured metric entries, typically used for reporting or monitoring
|
||||
* @param logs additional log lines derived from the matched line, if any
|
||||
*/
|
||||
public record TaskLogMatch(
|
||||
Map<String, Object> outputs,
|
||||
List<AbstractMetricEntry<?>> metrics,
|
||||
List<LogLine> logs
|
||||
) {
|
||||
@Override
|
||||
public Map<String, Object> outputs() {
|
||||
return Optional.ofNullable(outputs).orElse(Map.of());
|
||||
}
|
||||
}
|
||||
|
||||
public record LogLine(
|
||||
Level level,
|
||||
String message
|
||||
) {
|
||||
}
|
||||
}
|
||||
@@ -140,11 +140,28 @@ public final class ExecutableUtils {
|
||||
}
|
||||
}
|
||||
|
||||
String tenantId = currentExecution.getTenantId();
|
||||
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
|
||||
String subflowId = runContext.render(currentTask.subflowId().flowId());
|
||||
Optional<Integer> subflowRevision = currentTask.subflowId().revision();
|
||||
FlowInterface flow = getSubflow(tenantId, subflowNamespace, subflowId, subflowRevision, flowExecutorInterface, currentFlow);
|
||||
|
||||
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
|
||||
currentExecution.getTenantId(),
|
||||
subflowNamespace,
|
||||
subflowId,
|
||||
subflowRevision,
|
||||
currentExecution.getTenantId(),
|
||||
currentFlow.getNamespace(),
|
||||
currentFlow.getId()
|
||||
)
|
||||
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
|
||||
|
||||
if (flow.isDisabled()) {
|
||||
throw new IllegalStateException("Cannot execute a flow which is disabled");
|
||||
}
|
||||
|
||||
if (flow instanceof FlowWithException fwe) {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
|
||||
if (labels != null) {
|
||||
@@ -206,35 +223,6 @@ public final class ExecutableUtils {
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static FlowInterface getSubflow(String tenantId,
|
||||
String subflowNamespace,
|
||||
String subflowId,
|
||||
Optional<Integer> subflowRevision,
|
||||
FlowExecutorInterface flowExecutorInterface,
|
||||
FlowInterface currentFlow) {
|
||||
|
||||
FlowInterface flow = flowExecutorInterface.findByIdFromTask(
|
||||
tenantId,
|
||||
subflowNamespace,
|
||||
subflowId,
|
||||
subflowRevision,
|
||||
tenantId,
|
||||
currentFlow.getNamespace(),
|
||||
currentFlow.getId()
|
||||
)
|
||||
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'"));
|
||||
|
||||
if (flow.isDisabled()) {
|
||||
throw new IllegalStateException("Cannot execute a flow which is disabled");
|
||||
}
|
||||
|
||||
if (flow instanceof FlowWithException fwe) {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
return flow;
|
||||
}
|
||||
|
||||
private static List<Label> systemLabels(Execution execution) {
|
||||
return Streams.of(execution.getLabels())
|
||||
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
|
||||
|
||||
@@ -615,16 +615,19 @@ public class ExecutorService {
|
||||
Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());
|
||||
|
||||
if (task instanceof Pause pauseTask) {
|
||||
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
|
||||
if (pauseTask.getPauseDuration() != null || pauseTask.getTimeout() != null) {
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
Duration delay = runContext.render(pauseTask.getDelay()).as(Duration.class).orElse(null);
|
||||
Duration duration = runContext.render(pauseTask.getPauseDuration()).as(Duration.class).orElse(null);
|
||||
Duration timeout = runContext.render(pauseTask.getTimeout()).as(Duration.class).orElse(null);
|
||||
if (delay != null || timeout != null) { // rendering can lead to null, so we must re-check here
|
||||
Pause.Behavior behavior = runContext.render(pauseTask.getBehavior()).as(Pause.Behavior.class).orElse(Pause.Behavior.RESUME);
|
||||
if (duration != null || timeout != null) { // rendering can lead to null, so we must re-check here
|
||||
// if duration is set, we use it, and we use the Pause behavior as a state
|
||||
// if no duration, we use the standard timeout property and use FAILED as the target state
|
||||
return ExecutionDelay.builder()
|
||||
.taskRunId(workerTaskResult.getTaskRun().getId())
|
||||
.executionId(executor.getExecution().getId())
|
||||
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(delay != null ? delay : timeout))
|
||||
.state(delay != null ? State.Type.RUNNING : State.Type.FAILED)
|
||||
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
|
||||
.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
|
||||
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -660,6 +660,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
});
|
||||
});
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION, MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION)
|
||||
.record(Duration.between(now, ZonedDateTime.now()));
|
||||
}
|
||||
|
||||
private List<FlowWithSource> getFlowsWithDefaults() {
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
|
||||
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
@@ -50,7 +51,7 @@ public final class YamlParser {
|
||||
}
|
||||
|
||||
private static <T> String type(Class<T> cls) {
|
||||
return cls.getSimpleName().toLowerCase();
|
||||
return FlowInterface.class.isAssignableFrom(cls) ? "flow" : cls.getSimpleName().toLowerCase();
|
||||
}
|
||||
|
||||
public static <T> T parse(File file, Class<T> cls) throws ConstraintViolationException {
|
||||
|
||||
@@ -344,7 +344,7 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
// if it's a Pause task with no subtask, we terminate the task
|
||||
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null) {
|
||||
if (task instanceof Pause pauseTask && ListUtils.isEmpty(pauseTask.getTasks())) {
|
||||
if (newState == State.Type.RUNNING) {
|
||||
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
|
||||
} else if (newState == State.Type.KILLING) {
|
||||
@@ -365,11 +365,12 @@ public class ExecutionService {
|
||||
}
|
||||
|
||||
if (newExecution.getTaskRunList().stream().anyMatch(t -> t.getState().getCurrent() == State.Type.PAUSED)) {
|
||||
// there is still some tasks paused, this can occur with parallel pause
|
||||
// there are still some tasks paused, this can occur with parallel pause
|
||||
return newExecution;
|
||||
}
|
||||
return newExecution
|
||||
.withState(State.Type.RESTARTED);
|
||||
|
||||
// we need to cancel immediately or the executor will process the next task if it's restarted.
|
||||
return newState == State.Type.CANCELLED ? newExecution.withState(State.Type.CANCELLED) : newExecution.withState(State.Type.RESTARTED);
|
||||
}
|
||||
|
||||
public Execution markWithTaskRunAs(final Execution execution, String taskRunId, State.Type newState, Boolean markParents) throws Exception {
|
||||
@@ -655,7 +656,7 @@ public class ExecutionService {
|
||||
*
|
||||
* @return the execution in a KILLING state if not already terminated
|
||||
*/
|
||||
public Execution kill(Execution execution, Flow flow) {
|
||||
public Execution kill(Execution execution, FlowInterface flow) {
|
||||
if (execution.getState().getCurrent() == State.Type.KILLING || execution.getState().isTerminated()) {
|
||||
return execution;
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ import io.kestra.core.runners.RunContextLogger;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.kestra.plugin.core.flow.EmbeddedSubflow;
|
||||
import io.kestra.plugin.core.flow.Template;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
@@ -198,14 +197,8 @@ public class PluginDefaultService {
|
||||
try {
|
||||
return this.injectAllDefaults(flow, false);
|
||||
} catch (Exception e) {
|
||||
logger.warn(
|
||||
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
e.getMessage(),
|
||||
e
|
||||
);
|
||||
String cause = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
|
||||
logService.get().logExecution(flow, logger, Level.WARN, "Unable to inject plugin defaults. Cause: '{}'", cause);
|
||||
return readWithoutDefaultsOrThrow(flow);
|
||||
}
|
||||
}
|
||||
@@ -310,7 +303,7 @@ public class PluginDefaultService {
|
||||
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
|
||||
} catch (Exception e) {
|
||||
if (safe) {
|
||||
logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
|
||||
logService.get().logExecution(flow, log, Level.WARN, "Unable to inject plugin default versions. Cause: {}", e.getMessage());
|
||||
result = FlowWithException.from(flow, e);
|
||||
|
||||
// deleted is not part of the original 'source'
|
||||
@@ -382,16 +375,11 @@ public class PluginDefaultService {
|
||||
.build();
|
||||
|
||||
if (tenant != null) {
|
||||
// This is a hack to set the tenant in Template and EmbeddedSubflow tasks.
|
||||
// When using the Template or EmbeddedSubflow task, we need the tenant to fetch the them from the database.
|
||||
// This is a hack to set the tenant in template tasks.
|
||||
// When using the Template task, we need the tenant to fetch the Template from the database.
|
||||
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
|
||||
// So we save it at flow creation/updating time.
|
||||
full.allTasksWithChilds().stream()
|
||||
.filter(task -> task instanceof Template)
|
||||
.forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
full.allTasksWithChilds().stream()
|
||||
.filter(task -> task instanceof EmbeddedSubflow)
|
||||
.forEach(task -> ((EmbeddedSubflow) task).setTenantId(tenant));
|
||||
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
|
||||
}
|
||||
|
||||
return full;
|
||||
|
||||
@@ -19,7 +19,6 @@ import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.plugin.core.condition.*;
|
||||
import io.kestra.plugin.core.flow.ChildFlowInterface;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -162,9 +161,11 @@ public class FlowTopologyService {
|
||||
return parent
|
||||
.allTasksWithChilds()
|
||||
.stream()
|
||||
.filter(t -> t instanceof ChildFlowInterface)
|
||||
.map(t -> (ChildFlowInterface) t)
|
||||
.anyMatch(t -> Objects.equals(t.getFlowId(), child.getId()) && Objects.equals(t.getNamespace(), child.getNamespace()));
|
||||
.filter(t -> t instanceof ExecutableTask)
|
||||
.map(t -> (ExecutableTask<?>) t)
|
||||
.anyMatch(t ->
|
||||
t.subflowId() != null && t.subflowId().namespace().equals(child.getNamespace()) && t.subflowId().flowId().equals(child.getId())
|
||||
);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to detect flow task on namespace:'{}', flowId:'{}'", parent.getNamespace(), parent.getId(), e);
|
||||
return false;
|
||||
|
||||
@@ -8,7 +8,6 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.validations.FlowValidation;
|
||||
import io.kestra.plugin.core.flow.ChildFlowInterface;
|
||||
import io.micronaut.core.annotation.AnnotationValue;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.core.annotation.NonNull;
|
||||
@@ -70,9 +69,9 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
}
|
||||
|
||||
value.allTasksWithChilds()
|
||||
.stream().filter(task -> task instanceof ChildFlowInterface childFlow
|
||||
&& value.getId().equals(childFlow.getFlowId())
|
||||
&& value.getNamespace().equals(childFlow.getNamespace()))
|
||||
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
|
||||
&& value.getId().equals(executableTask.subflowId().flowId())
|
||||
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
|
||||
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
|
||||
|
||||
// input unique name
|
||||
|
||||
@@ -22,7 +22,7 @@ import org.slf4j.event.Level;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Log a message in the task logs.",
|
||||
title = "Log a message in the task logs (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.log.Log` task instead.",
|
||||
deprecated = true
|
||||
)
|
||||
|
||||
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Assert some conditions.",
|
||||
title = "Assert some conditions to control task output data.",
|
||||
description = "Used to control outputs data emitted from previous task on this execution."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "List execution counts for a list of flow.",
|
||||
title = "List execution counts for a list of flows.",
|
||||
description = "This can be used to send an alert if a condition is met about execution counts."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -30,7 +30,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Exit the execution: terminate it in the state defined by the property `state`.",
|
||||
title = "Terminate an execution in the state defined by the property state.",
|
||||
description = "Note that if this execution has running tasks, for example in a parallel branch, the tasks will not be terminated except if `state` is set to `KILLED`."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -23,7 +23,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Fail the execution.",
|
||||
title = "Intentionally fail the execution.",
|
||||
description = "Used to fail the execution, for example, on a switch branch or on some conditions based on the execution context."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -37,7 +37,8 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Resume a paused execution. By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
|
||||
title = "Resume a paused execution.",
|
||||
description = "By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -36,7 +36,7 @@ import java.util.stream.Stream;
|
||||
@NoArgsConstructor
|
||||
@DagTaskValidation
|
||||
@Schema(
|
||||
title = "Create a directed acyclic graph (DAG) of tasks without explicitly specifying the order in which the tasks need to run.",
|
||||
title = "Create a DAG of tasks without explicitly specifying the order in which the tasks must run.",
|
||||
description = "List your tasks and their dependencies, and Kestra will figure out the execution sequence.\n" +
|
||||
"Each task can only depend on other tasks from the DAG task.\n" +
|
||||
"For technical reasons, low-code interaction via UI forms is disabled for now when using this task."
|
||||
|
||||
@@ -32,7 +32,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "For each value in the list, execute one or more tasks in parallel.",
|
||||
title = "For each value in the list, execute one or more tasks in parallel (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
|
||||
"The list of `tasks` will be executed for each item in parallel. " +
|
||||
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`.\n" +
|
||||
|
||||
@@ -34,7 +34,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "For each value in the list, execute one or more tasks sequentially.",
|
||||
title = "For each value in the list, execute one or more tasks sequentially (Deprecated).",
|
||||
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
|
||||
"The list of `tasks` will be executed for each item sequentially. " +
|
||||
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`. \n\n" +
|
||||
|
||||
@@ -1,304 +0,0 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.NextTaskRun;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.hierarchies.AbstractGraph;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.hierarchies.RelationType;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.StartupEvent;
|
||||
import io.micronaut.runtime.event.annotation.EventListener;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Embeds subflow tasks into this flow."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Embeds subflow tasks.",
|
||||
full = true,
|
||||
code = """
|
||||
id: parent_flow
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: embed_subflow
|
||||
type: io.kestra.plugin.core.flow.EmbeddedSubflow
|
||||
namespace: company.team
|
||||
flowId: subflow
|
||||
"""
|
||||
)
|
||||
}
|
||||
)
|
||||
public class EmbeddedSubflow extends Task implements FlowableTask<EmbeddedSubflow.Output>, ChildFlowInterface {
|
||||
static final String PLUGIN_FLOW_OUTPUTS_ENABLED = "outputs.enabled";
|
||||
|
||||
@Hidden
|
||||
@Setter // we have no other option here as we need to update the task inside the flow when creating it
|
||||
private String tenantId;;
|
||||
|
||||
@NotEmpty
|
||||
@Schema(
|
||||
title = "The namespace of the subflow to be embedded."
|
||||
)
|
||||
@PluginProperty
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The identifier of the subflow to be embedded."
|
||||
)
|
||||
@PluginProperty
|
||||
private String flowId;
|
||||
|
||||
@Schema(
|
||||
title = "The revision of the subflow to be embedded.",
|
||||
description = "By default, the last, i.e. the most recent, revision of the subflow is embedded."
|
||||
)
|
||||
@PluginProperty
|
||||
@Min(value = 1)
|
||||
private Integer revision;
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the subflow to be embedded."
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public List<Task> getErrors() {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
|
||||
return subflow.getErrors();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public List<Task> getFinally() {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
|
||||
return subflow.getFinally();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractGraph tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.SEQUENTIAL);
|
||||
|
||||
GraphUtils.sequential(
|
||||
subGraph,
|
||||
subflow.getTasks(),
|
||||
subflow.getErrors(),
|
||||
subflow.getFinally(),
|
||||
taskRun,
|
||||
execution
|
||||
);
|
||||
|
||||
return subGraph;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Task> allChildTasks() {
|
||||
Optional<Flow> maybeSubflow = fetchSubflow();
|
||||
if (maybeSubflow.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Flow subflow = maybeSubflow.get();
|
||||
return Stream
|
||||
.concat(
|
||||
subflow.getTasks() != null ? subflow.getTasks().stream() : Stream.empty(),
|
||||
subflow.getErrors() != null ? subflow.getErrors().stream() : Stream.empty()
|
||||
)
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
Flow subflow = fetchSubflow(runContext);
|
||||
|
||||
return FlowableUtils.resolveTasks(subflow.getTasks(), parentTaskRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
return FlowableUtils.resolveSequentialNexts(
|
||||
execution,
|
||||
this.childTasks(runContext, parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
|
||||
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
|
||||
parentTaskRun
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Output outputs(RunContext runContext) throws Exception {
|
||||
final Output.OutputBuilder builder = Output.builder();
|
||||
Flow subflow = fetchSubflow(runContext);
|
||||
|
||||
final Optional<Map<String, Object>> subflowOutputs = Optional
|
||||
.ofNullable(subflow.getOutputs())
|
||||
.map(outputs -> outputs
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
)
|
||||
);
|
||||
|
||||
if (subflowOutputs.isPresent() && runContext.getVariables().get("outputs") != null) {
|
||||
Map<String, Object> outputs = runContext.render(subflowOutputs.get());
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (subflow.getOutputs() != null && flowInputOutput != null) {
|
||||
// to be able to use FILE Input, we need the execution info, so we create a fake execution with what's needed here
|
||||
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
||||
String executionId = (String) ((Map<String, Object>) runContext.getVariables().get("execution")).get("id");
|
||||
Execution fake = Execution.builder()
|
||||
.id(executionId)
|
||||
.tenantId(flowInfo.tenantId())
|
||||
.namespace(flowInfo.namespace())
|
||||
.flowId(flowInfo.id())
|
||||
.build();
|
||||
outputs = flowInputOutput.typedOutputs(subflow, fake, outputs);
|
||||
}
|
||||
builder.outputs(outputs);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
// This method should only be used when getSubflow(RunContext) cannot be used.
|
||||
private Optional<Flow> fetchSubflow() {
|
||||
// at validation time, namespace and flowId may not yet be set, in this case let's return an optional to not fail
|
||||
if (namespace == null || flowId == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
ApplicationContext applicationContext = ContextHelper.context();
|
||||
FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
|
||||
FlowInterface subflow = flowExecutor.findById(tenantId, namespace, flowId, Optional.ofNullable(revision)).orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + namespace + "." + flowId));
|
||||
|
||||
if (subflow.isDisabled()) {
|
||||
throw new IllegalStateException("Cannot execute a flow which is disabled");
|
||||
}
|
||||
|
||||
if (subflow instanceof FlowWithException fwe) {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
|
||||
PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
|
||||
try {
|
||||
return Optional.of(pluginDefaultService.injectAllDefaults(subflow, true));
|
||||
} catch (FlowProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// This method is preferred as getSubflow() as it checks current flow and subflow and allowed namespaces
|
||||
private Flow fetchSubflow(RunContext runContext) {
|
||||
// we check that the task tenant is the current tenant to avoid accessing flows from another tenant
|
||||
if (!Objects.equals(tenantId, runContext.flowInfo().tenantId())) {
|
||||
throw new IllegalArgumentException("Cannot embeds a flow from a different tenant");
|
||||
}
|
||||
|
||||
ApplicationContext applicationContext = ContextHelper.context();
|
||||
FlowExecutorInterface flowExecutor = applicationContext.getBean(FlowExecutorInterface.class);
|
||||
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
||||
|
||||
FlowInterface flow = flowExecutor.findById(flowInfo.tenantId(), flowInfo.namespace(), flowInfo.id(), Optional.of(flowInfo.revision()))
|
||||
.orElseThrow(() -> new IllegalArgumentException("Unable to find flow " + flowInfo.namespace() + "." + flowInfo.id()));
|
||||
FlowInterface subflow = ExecutableUtils.getSubflow(tenantId, namespace, flowId, Optional.ofNullable(revision), flowExecutor, flow);
|
||||
|
||||
// check inputs
|
||||
if (!ListUtils.isEmpty(subflow.getInputs())) {
|
||||
Optional<Input<?>> missing = subflow.getInputs().stream()
|
||||
.filter(input -> input.getRequired() && !inputs.containsKey(input.getId()))
|
||||
.findFirst();
|
||||
if (missing.isPresent()) {
|
||||
throw new IllegalArgumentException("Missing required input " + missing.get().getId());
|
||||
}
|
||||
}
|
||||
|
||||
PluginDefaultService pluginDefaultService = applicationContext.getBean(PluginDefaultService.class);
|
||||
return pluginDefaultService.injectAllDefaults(subflow, runContext.logger());
|
||||
}
|
||||
|
||||
/**
|
||||
* Ugly hack to provide the ApplicationContext on {{@link #allChildTasks }} & {{@link #tasksTree }}
|
||||
* We need to inject a way to fetch embedded subflows ...
|
||||
*/
|
||||
@Singleton
|
||||
static class ContextHelper {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private static ApplicationContext context;
|
||||
|
||||
static ApplicationContext context() {
|
||||
return ContextHelper.context;
|
||||
}
|
||||
|
||||
@EventListener
|
||||
void onStartup(final StartupEvent event) {
|
||||
ContextHelper.context = this.applicationContext;
|
||||
}
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
@Schema(
|
||||
title = "The extracted outputs from the embedded subflow."
|
||||
)
|
||||
private final Map<String, Object> outputs;
|
||||
}
|
||||
}
|
||||
@@ -22,8 +22,10 @@ import io.kestra.core.runners.FlowableUtils;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
@@ -40,9 +42,9 @@ import java.util.stream.Stream;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Pause the current execution and wait for a manual approval (either by humans or other automated processes).",
|
||||
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
|
||||
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after a timeout."
|
||||
title = "Pause the current execution and wait for approval (either by humans or other automated processes).",
|
||||
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
|
||||
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after the `pauseDuration`."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@@ -129,6 +131,24 @@ import java.util.stream.Stream;
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Status is {{ outputs.wait_for_approval.onResume.reason }}. Process finished with {{ outputs.approve.body }}
|
||||
"""
|
||||
),
|
||||
@Example(
|
||||
title = "Pause the execution and set the execution in WARNING if it has not been resumed after 5 minutes",
|
||||
full = true,
|
||||
code = """
|
||||
id: pause_warn
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: pause
|
||||
type: io.kestra.plugin.core.flow.Pause
|
||||
pauseDuration: PT5M
|
||||
behavior: WARN
|
||||
|
||||
- id: post_resume
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ task.id }} started on {{ taskrun.startDate }} after the Pause"
|
||||
"""
|
||||
)
|
||||
},
|
||||
aliases = "io.kestra.core.tasks.flows.Pause"
|
||||
@@ -136,17 +156,38 @@ import java.util.stream.Stream;
|
||||
public class Pause extends Task implements FlowableTask<Pause.Output> {
|
||||
@Schema(
|
||||
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
|
||||
description = "The delay is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
|
||||
description = "**Deprecated**: use `pauseDuration` instead.",
|
||||
implementation = Duration.class
|
||||
)
|
||||
@Deprecated
|
||||
private Property<Duration> delay;
|
||||
|
||||
@Deprecated
|
||||
public void setDelay(Property<Duration> delay) {
|
||||
this.delay = delay;
|
||||
this.pauseDuration = delay;
|
||||
}
|
||||
|
||||
@Schema(
|
||||
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
|
||||
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
|
||||
title = "Duration of the pause. If not set the task will wait forever to be manually resumed except if a timeout is set, in this case, the timeout will be honored.",
|
||||
description = "The duration is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no pauseDuration and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
|
||||
implementation = Duration.class
|
||||
)
|
||||
private Property<Duration> timeout;
|
||||
private Property<Duration> pauseDuration;
|
||||
|
||||
@Schema(
|
||||
title = "Pause behavior, by default RESUME. What happens when a pause task reach its duration.",
|
||||
description = """
|
||||
Tasks that are resumed before the duration (for example, from the UI) will not use the behavior property but will always success.
|
||||
Possible values are:
|
||||
- RESUME: continue with the execution
|
||||
- WARN: ends the Pause task in WARNING and continue with the execution
|
||||
- FAIL: fail the Pause task
|
||||
- CANCEL: cancel the execution"""
|
||||
)
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Property<Behavior> behavior = Property.of(Behavior.RESUME);
|
||||
|
||||
@Valid
|
||||
@Schema(
|
||||
@@ -230,17 +271,34 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
|
||||
parentTaskRun.getState().getHistories().stream().noneMatch(history -> history.getState() == State.Type.PAUSED);
|
||||
}
|
||||
|
||||
// This method is only called when there are subtasks
|
||||
@Override
|
||||
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
if (this.needPause(parentTaskRun)) {
|
||||
return Optional.of(State.Type.PAUSED);
|
||||
}
|
||||
|
||||
if (this.tasks == null || this.tasks.isEmpty()) {
|
||||
return Optional.of(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
return FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
|
||||
Behavior behavior = runContext.render(this.behavior).as(Behavior.class).orElse(Behavior.RESUME);
|
||||
return switch (behavior) {
|
||||
case Behavior.RESUME -> {
|
||||
// yield SUCCESS or the final flowable task state
|
||||
if (ListUtils.isEmpty(this.tasks)) {
|
||||
yield Optional.of(State.Type.SUCCESS);
|
||||
} else {
|
||||
yield FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
|
||||
}
|
||||
}
|
||||
case Behavior.WARN -> {
|
||||
// yield WARNING or the final flowable task state, if the flowable ends in SUCCESS, yield WARNING
|
||||
if (ListUtils.isEmpty(this.tasks)) {
|
||||
yield Optional.of(State.Type.WARNING);
|
||||
} else {
|
||||
Optional<State.Type> finalState = FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
|
||||
yield finalState.map(state -> state == State.Type.SUCCESS ? State.Type.WARNING : state);
|
||||
}
|
||||
}
|
||||
case Behavior.CANCEL ,Behavior.FAIL -> throw new IllegalArgumentException("The " + behavior + " cannot be handled at this stage, this is certainly a bug!");
|
||||
};
|
||||
}
|
||||
|
||||
public Map<String, Object> generateOutputs(Map<String, Object> inputs) {
|
||||
@@ -256,4 +314,21 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
private Map<String, Object> onResume;
|
||||
}
|
||||
|
||||
public enum Behavior {
|
||||
RESUME(State.Type.RUNNING),
|
||||
WARN(State.Type.WARNING),
|
||||
CANCEL(State.Type.CANCELLED),
|
||||
FAIL(State.Type.FAILED);
|
||||
|
||||
private final State.Type executionState;
|
||||
|
||||
Behavior(State.Type executionState) {
|
||||
this.executionState = executionState;
|
||||
}
|
||||
|
||||
public State.Type mapToState() {
|
||||
return this.executionState;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ import java.util.stream.Stream;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Run tasks sequentially, one after the other, in the order they are defined.",
|
||||
title = "Run tasks sequentially in the order they are defined.",
|
||||
description = "Used to visually group tasks."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "A task that sleep for a specified duration before proceeding."
|
||||
title = "Sleep task, wait for a specified duration before proceeding."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -49,8 +49,8 @@ import java.util.stream.Collectors;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Create a subflow execution. Subflows offer a modular way to reuse workflow logic by calling other flows just like calling a function in a programming language.",
|
||||
description = "Restarting a parent flow will restart any subflows that has previously been executed."
|
||||
title = "Create a subflow execution.",
|
||||
description = "Subflows offer a modular way to reuse workflow logic by calling other flows just like calling a function in a programming language. Restarting a parent flow will restart any subflows that has previously been executed."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -43,7 +43,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Run tasks conditionally, i.e. decide which branch of tasks should be executed based on a given value.",
|
||||
title = "Run tasks conditionally based on a given value.",
|
||||
description = "This task runs a set of tasks based on a given value.\n" +
|
||||
"The value is evaluated at runtime and compared to the list of cases.\n" +
|
||||
"If the value matches a case, the corresponding tasks are executed.\n" +
|
||||
|
||||
@@ -53,7 +53,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@NoArgsConstructor
|
||||
@Slf4j
|
||||
@Schema(
|
||||
title = "Include a reusable template inside a flow."
|
||||
title = "Include a reusable template inside a flow (Deprecated)."
|
||||
)
|
||||
@Deprecated
|
||||
@Plugin(
|
||||
|
||||
@@ -36,8 +36,8 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Download a file from a HTTP server.",
|
||||
description = "This task connects to a HTTP server and copy a file to Kestra's internal storage."
|
||||
title = "Download a file from an HTTP server.",
|
||||
description = "This task connects to a HTTP server and copies a file to Kestra's internal storage."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -32,11 +32,11 @@ import java.util.OptionalInt;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Make an HTTP API request to a specified URL and store the response as output.",
|
||||
title = "Make an HTTP API request to a specified URL and store the response as an output.",
|
||||
description = """
|
||||
This task makes an API call to a specified URL of an HTTP server and stores the response as output.
|
||||
This task makes an API call to a specified URL of an HTTP server and stores the response as an output.
|
||||
By default, the maximum length of the response is limited to 10MB, but it can be increased to at most 2GB by using the `options.maxContentLength` property.
|
||||
Note that the response is added as output to the task. If you need to process large API payloads, we recommend using the `Download` task instead."""
|
||||
Note that the response is added as an output of the task. If you need to process large API payloads, we recommend using the `Download` task instead."""
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Deletes a KV pair."
|
||||
title = "Delete a KV pair."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -29,7 +29,7 @@ import java.util.Optional;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Gets value linked to a key."
|
||||
title = "Retrieve a value of a KV pair by a key."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -26,7 +26,7 @@ import java.util.function.Predicate;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Gets keys matching a given prefix."
|
||||
title = "Fetch all keys matching a given KV pair prefix."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -27,7 +27,7 @@ import java.util.List;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Purge flow execution and trigger logs.",
|
||||
title = "Purge flow execution logs and trigger-related logs.",
|
||||
description = "This task can be used to purge flow execution and trigger logs for all flows, for a specific namespace, or for a specific flow."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -24,7 +24,7 @@ import java.util.List;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Publish metrics.",
|
||||
title = "Publish Kestra metrics within an execution.",
|
||||
description = "This task is useful to easily publish metrics for a flow."
|
||||
)
|
||||
@Plugin(
|
||||
@@ -32,20 +32,25 @@ import java.util.List;
|
||||
@Example(
|
||||
full = true,
|
||||
code = """
|
||||
id: publish_metric
|
||||
id: publish_metrics
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: random
|
||||
type: io.kestra.plugin.core.output.OutputValues
|
||||
values:
|
||||
metric: "{{randomInt(0, 10)}}"
|
||||
- id: metric
|
||||
type: io.kestra.plugin.core.metric.Publish
|
||||
metrics:
|
||||
- name: myMetric
|
||||
type: counter
|
||||
value: "{{outputs.random.values.metric}}"
|
||||
- type: timer
|
||||
name: duration
|
||||
value: PT10M
|
||||
tags:
|
||||
flow: "{{flow.id}}"
|
||||
project: kestra
|
||||
- type: counter
|
||||
name: number
|
||||
value: 42
|
||||
tags:
|
||||
flow: "{{flow.id}}"
|
||||
project: kestra
|
||||
"""
|
||||
)
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import java.io.FileNotFoundException;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Delete a state from the state store."
|
||||
title = "Delete a state from the state store (Deprecated)."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -18,7 +18,7 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Get a state from the state store."
|
||||
title = "Get a state from the state store (Deprecated)."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -20,7 +20,7 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Set a state in the state store.",
|
||||
title = "Set a state in the state store (Deprecated).",
|
||||
description = "Values will be merged: \n" +
|
||||
"* If you provide a new key, the new key will be added.\n" +
|
||||
"* If you provide an existing key, the previous key will be overwrite.\n" +
|
||||
|
||||
@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Concat files from the internal storage."
|
||||
title = "Concat files from Kestra’s internal storage."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -23,7 +23,7 @@ import java.util.NoSuchElementException;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Delete a file from the Kestra's internal storage."
|
||||
title = "Delete a file from Kestra's internal storage."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -23,8 +23,8 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "This task is deprecated and replaced by `inputFiles` property available in all script tasks and in the [WorkingDirectory](https://kestra.io/plugins/core/tasks/io.kestra.plugin.core.flow.workingdirectory) task. Check the [migration guide](https://kestra.io/docs/migration-guide/0.17.0/local-files) for more details. ",
|
||||
description = "This task was intended to be used along with the `WorkingDirectory` task to create temporary files. This task suffers from multiple limitations e.g. it cannot be skipped, so setting `disabled: true` will have no effect. Overall, the WorkingDirectory task is more flexible and should be used instead of this task. This task will be removed in a future version of Kestra."
|
||||
title = "Create temporary files (Deprecated).",
|
||||
description = "This task is deprecated and replaced by `inputFiles` property available in all script tasks and in the [WorkingDirectory](https://kestra.io/plugins/core/tasks/io.kestra.plugin.core.flow.workingdirectory) task. Check the [migration guide](https://kestra.io/docs/migration-guide/0.17.0/local-files) for more details. This task suffers from multiple limitations e.g. it cannot be skipped, so setting `disabled: true` will have no effect. Overall, the WorkingDirectory task is more flexible and should be used instead of this task. This task will be removed in a future version of Kestra."
|
||||
)
|
||||
@Deprecated
|
||||
@Plugin(examples = {
|
||||
|
||||
@@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Reverse a file from the Kestra's internal storage, last line first."
|
||||
title = "Reverse a file from Kestra's internal storage, last line first."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -22,7 +22,7 @@ import java.net.URI;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Get the size of a file from the Kestra's internal storage."
|
||||
title = "Get the size of a file from Kestra's internal storage."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -23,7 +23,7 @@ import java.util.List;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Split a file from the Kestra's internal storage into multiple files."
|
||||
title = "Split a file from Kestra's internal storage into multiple files."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -24,7 +24,7 @@ import static io.kestra.core.utils.Rethrow.throwSupplier;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Write data to a file in the internal storage.",
|
||||
title = "Write data to a file in Kestra’s internal storage.",
|
||||
description = "Use the Write task to store outputs as files internally and then reference the stored file for processing further down your flow."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -23,7 +23,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Templatize a task.",
|
||||
title = "Templatize task properties using Kestra’s Pebble templating.",
|
||||
description = "This task's `spec` property allows you to fully templatize all task properties using Kestra's Pebble templating. This way, all task properties and their values can be dynamically rendered based on your custom inputs, variables, and outputs from other tasks."
|
||||
)
|
||||
@Plugin(
|
||||
|
||||
@@ -31,13 +31,13 @@ import jakarta.validation.constraints.Size;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Trigger a flow from a webhook.",
|
||||
title = "Execute a flow from an API call triggered by a webhook.",
|
||||
description = """
|
||||
Webhook trigger allows you to create a unique URL that you can use to trigger a Kestra flow execution based on events in another application such as GitHub or Amazon EventBridge. In order to use that URL, you have to add a secret key that will secure your webhook URL.
|
||||
|
||||
The URL will then follow the following format: `https://{your_hostname}/api/v1/executions/webhook/{namespace}/{flowId}/{key}`. Replace the templated values according to your workflow setup.
|
||||
|
||||
The webhook URL accepts `GET`, `POST` and `PUT` requests.
|
||||
The webhook URL accepts `GET`, `POST`, and `PUT` requests.
|
||||
|
||||
You can access the request body and headers sent by another application using the following template variables:
|
||||
- `{{ trigger.body }}`
|
||||
@@ -48,7 +48,7 @@ import jakarta.validation.constraints.Size;
|
||||
- 200 if the webhook triggers an execution.
|
||||
- 204 if the webhook cannot trigger an execution due to a lack of matching event conditions sent by other application.
|
||||
|
||||
A webhook trigger can have conditions but it doesn't support conditions of type `MultipleCondition`."""
|
||||
A webhook trigger can have conditions, but it doesn't support conditions of type `MultipleCondition`."""
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -71,7 +71,7 @@ class ClassPluginDocumentationTest {
|
||||
// map
|
||||
Map<String, Object> childInputMap = (Map<String, Object>) childInput.get("map");
|
||||
assertThat((String) (childInputMap).get("type")).isEqualTo("object");
|
||||
assertThat((Boolean) (childInputMap).get("$dynamic")).isEqualTo(true);
|
||||
assertThat((Boolean) (childInputMap).get("$dynamic")).isTrue();
|
||||
assertThat(((Map<String, String>) (childInputMap).get("additionalProperties")).get("type")).isEqualTo("number");
|
||||
|
||||
// output
|
||||
@@ -151,18 +151,18 @@ class ClassPluginDocumentationTest {
|
||||
List<Map<String, Object>> anyOf = (List<Map<String, Object>>) number.get("anyOf");
|
||||
assertThat(anyOf).hasSize(2);
|
||||
assertThat(anyOf.getFirst().get("type")).isEqualTo("integer");
|
||||
assertThat(anyOf.getFirst().get("$dynamic")).isEqualTo(true);
|
||||
assertThat((Boolean) anyOf.getFirst().get("$dynamic")).isTrue();
|
||||
assertThat(anyOf.get(1).get("type")).isEqualTo("string");
|
||||
// assertThat(anyOf.get(1).get("pattern"), is(".*{{.*}}.*"));
|
||||
|
||||
Map<String, Object> withDefault = (Map<String, Object>) properties.get("withDefault");
|
||||
assertThat(withDefault.get("type")).isEqualTo("string");
|
||||
assertThat(withDefault.get("default")).isEqualTo("Default Value");
|
||||
assertThat(withDefault.get("$dynamic")).isEqualTo(true);
|
||||
assertThat((Boolean) withDefault.get("$dynamic")).isTrue();
|
||||
|
||||
Map<String, Object> internalStorageURI = (Map<String, Object>) properties.get("uri");
|
||||
assertThat(internalStorageURI.get("type")).isEqualTo("string");
|
||||
assertThat(internalStorageURI.get("$internalStorageURI")).isEqualTo(true);
|
||||
assertThat((Boolean) internalStorageURI.get("$internalStorageURI")).isTrue();
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ class ExecutionTest {
|
||||
.withState(State.Type.RUNNING)
|
||||
))
|
||||
.build()
|
||||
)).isEqualTo(true);
|
||||
)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -44,7 +44,7 @@ class ExecutionTest {
|
||||
assertThat(execution.hasTaskRunJoinable(TASK_RUN
|
||||
.state(new State())
|
||||
.build()
|
||||
)).isEqualTo(false);
|
||||
)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -61,7 +61,7 @@ class ExecutionTest {
|
||||
assertThat(execution.hasTaskRunJoinable(TASK_RUN
|
||||
.state(new State(State.Type.RUNNING, new State()))
|
||||
.build()
|
||||
)).isEqualTo(false);
|
||||
)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -81,7 +81,7 @@ class ExecutionTest {
|
||||
.withState(State.Type.RUNNING)
|
||||
))
|
||||
.build()
|
||||
)).isEqualTo(false);
|
||||
)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -102,7 +102,7 @@ class ExecutionTest {
|
||||
.withState(State.Type.SUCCESS)
|
||||
))
|
||||
.build()
|
||||
)).isEqualTo(true);
|
||||
)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -125,7 +125,7 @@ class ExecutionTest {
|
||||
.withState(State.Type.RUNNING)
|
||||
))
|
||||
.build()
|
||||
)).isEqualTo(true);
|
||||
)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -33,7 +33,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/duplicate.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("Duplicate task id with name [date, listen]");
|
||||
@@ -45,7 +45,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/duplicate-inputs.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("Duplicate input with name [first_input]");
|
||||
@@ -56,7 +56,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/duplicate-parallel.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("Duplicate task id with name [t3]");
|
||||
@@ -68,7 +68,7 @@ class FlowTest {
|
||||
Flow updated = this.parse("flows/invalids/duplicate.yaml");
|
||||
Optional<ConstraintViolationException> validate = flow.validateUpdate(updated);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("Illegal flow id update");
|
||||
@@ -80,7 +80,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/switch-invalid.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("impossible: No task defined, neither cases or default have any tasks");
|
||||
@@ -91,7 +91,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/workingdirectory-invalid.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("impossible: Only runnable tasks are allowed as children of a WorkingDirectory task");
|
||||
@@ -102,7 +102,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/workingdirectory-no-tasks.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(2);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("impossible: The 'tasks' property cannot be empty");
|
||||
@@ -129,7 +129,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/worker-group.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(1);
|
||||
|
||||
assertThat(validate.get().getMessage()).isEqualTo("tasks[0].workerGroup: Worker Group is an Enterprise Edition functionality\n");
|
||||
@@ -148,7 +148,7 @@ class FlowTest {
|
||||
Flow flow = this.parse("flows/invalids/inputs-validation.yaml");
|
||||
Optional<ConstraintViolationException> validate = modelValidator.isValid(flow);
|
||||
|
||||
assertThat(validate.isPresent()).isEqualTo(true);
|
||||
assertThat(validate.isPresent()).isTrue();
|
||||
assertThat(validate.get().getConstraintViolations().size()).isEqualTo(2);
|
||||
|
||||
assertThat(validate.get().getMessage()).contains("file: no `defaults` can be set for inputs of type 'FILE'");
|
||||
@@ -164,13 +164,13 @@ class FlowTest {
|
||||
triggerInputsReverseOrder.put("c", "d");
|
||||
triggerInputsReverseOrder.put("a", "b");
|
||||
Flow flowABis = baseFlow().toBuilder().revision(2).triggers(List.of(io.kestra.plugin.core.trigger.Flow.builder().inputs(triggerInputsReverseOrder).build())).build();
|
||||
assertThat(flowA.equalsWithoutRevision(flowABis)).isEqualTo(true);
|
||||
assertThat(flowA.equalsWithoutRevision(flowABis)).isTrue();
|
||||
|
||||
Flow flowB = baseFlow().toBuilder().id("b").build();
|
||||
assertThat(flowA.equalsWithoutRevision(flowB)).isEqualTo(false);
|
||||
assertThat(flowA.equalsWithoutRevision(flowB)).isFalse();
|
||||
|
||||
Flow flowAnotherTenant = baseFlow().toBuilder().tenantId("b").build();
|
||||
assertThat(flowA.equalsWithoutRevision(flowAnotherTenant)).isEqualTo(false);
|
||||
assertThat(flowA.equalsWithoutRevision(flowAnotherTenant)).isFalse();
|
||||
}
|
||||
|
||||
private static Flow baseFlow() {
|
||||
|
||||
@@ -134,7 +134,7 @@ class FlowWithSourceTest {
|
||||
String expectedSource = flow.sourceOrGenerateIfNull() + " # additional comment";
|
||||
FlowWithSource of = FlowWithSource.of(flow, expectedSource);
|
||||
|
||||
assertThat(of.equalsWithoutRevision(flow)).isEqualTo(true);
|
||||
assertThat(of.equalsWithoutRevision(flow)).isTrue();
|
||||
assertThat(of.getSource()).isEqualTo(expectedSource);
|
||||
}
|
||||
}
|
||||
@@ -53,7 +53,7 @@ class FlowGraphTest {
|
||||
|
||||
assertThat(flowGraph.getNodes().size()).isEqualTo(5);
|
||||
assertThat(flowGraph.getEdges().size()).isEqualTo(4);
|
||||
assertThat(flowGraph.getClusters().size()).isEqualTo(0);
|
||||
assertThat(flowGraph.getClusters().size()).isZero();
|
||||
|
||||
assertThat(((AbstractGraphTask) flowGraph.getNodes().get(2)).getTask().getId()).isEqualTo("date");
|
||||
assertThat(((AbstractGraphTask) flowGraph.getNodes().get(2)).getRelationType()).isEqualTo(RelationType.SEQUENTIAL);
|
||||
@@ -228,7 +228,7 @@ class FlowGraphTest {
|
||||
assertThat(flowGraph.getEdges().size()).isEqualTo(5);
|
||||
assertThat(flowGraph.getClusters().size()).isEqualTo(1);
|
||||
AbstractGraph triggerGraph = flowGraph.getNodes().stream().filter(e -> e instanceof GraphTrigger).findFirst().orElseThrow();
|
||||
assertThat(((GraphTrigger) triggerGraph).getTrigger().getDisabled()).isEqualTo(true);
|
||||
assertThat(((GraphTrigger) triggerGraph).getTrigger().getDisabled()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -50,17 +50,17 @@ class ScriptServiceTest {
|
||||
command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, false);
|
||||
|
||||
Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
|
||||
assertThat(matcher.matches()).isEqualTo(true);
|
||||
assertThat(matcher.matches()).isTrue();
|
||||
Path absoluteLocalFilePath = Path.of(matcher.group(1));
|
||||
localFile = absoluteLocalFilePath.toFile();
|
||||
assertThat(localFile.exists()).isEqualTo(true);
|
||||
assertThat(localFile.exists()).isTrue();
|
||||
|
||||
command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, true);
|
||||
matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
|
||||
assertThat(matcher.matches()).isEqualTo(true);
|
||||
assertThat(matcher.matches()).isTrue();
|
||||
String relativePath = matcher.group(1);
|
||||
assertThat(relativePath).doesNotStartWith("/");
|
||||
assertThat(runContext.workingDir().resolve(Path.of(relativePath)).toFile().exists()).isEqualTo(true);
|
||||
assertThat(runContext.workingDir().resolve(Path.of(relativePath)).toFile().exists()).isTrue();
|
||||
} finally {
|
||||
localFile.delete();
|
||||
path.toFile().delete();
|
||||
@@ -94,18 +94,18 @@ class ScriptServiceTest {
|
||||
|
||||
assertThat(commands.getFirst(), not(is("my command with an internal storage file: " + internalStorageUri)));
|
||||
Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(commands.getFirst());
|
||||
assertThat(matcher.matches()).isEqualTo(true);
|
||||
assertThat(matcher.matches()).isTrue();
|
||||
File file = Path.of(matcher.group(1)).toFile();
|
||||
assertThat(file.exists()).isEqualTo(true);
|
||||
assertThat(file.exists()).isTrue();
|
||||
filesToDelete.add(file);
|
||||
|
||||
assertThat(commands.get(1)).isEqualTo("my command with some additional var usage: " + wdir);
|
||||
|
||||
commands = ScriptService.replaceInternalStorage(runContext, Collections.emptyMap(), List.of("my command with an internal storage file: " + internalStorageUri), true);
|
||||
matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(commands.getFirst());
|
||||
assertThat(matcher.matches()).isEqualTo(true);
|
||||
assertThat(matcher.matches()).isTrue();
|
||||
file = runContext.workingDir().resolve(Path.of(matcher.group(1))).toFile();
|
||||
assertThat(file.exists()).isEqualTo(true);
|
||||
assertThat(file.exists()).isTrue();
|
||||
filesToDelete.add(file);
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@@ -123,14 +123,14 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
|
||||
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
assertThat(window.getResults().get("a")).isEqualTo(true);
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
MultipleConditionWindow next = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
assertThat(next.getStart().format(DateTimeFormatter.ISO_DATE_TIME)).isNotEqualTo(window.getStart().format(DateTimeFormatter.ISO_DATE_TIME));
|
||||
assertThat(next.getResults().containsKey("a")).isEqualTo(false);
|
||||
assertThat(next.getResults().containsKey("a")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -144,10 +144,10 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
|
||||
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
assertThat(window.getResults().get("a")).isEqualTo(true);
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
assertThat(expired.size()).isEqualTo(0);
|
||||
assertThat(expired.size()).isZero();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
@@ -166,10 +166,10 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
|
||||
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
assertThat(window.getResults().get("a")).isEqualTo(true);
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
assertThat(expired.size()).isEqualTo(0);
|
||||
assertThat(expired.size()).isZero();
|
||||
|
||||
Thread.sleep(2005);
|
||||
|
||||
@@ -206,10 +206,10 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
|
||||
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
assertThat(window.getResults().get("a")).isEqualTo(true);
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
assertThat(expired.size()).isEqualTo(0);
|
||||
assertThat(expired.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -223,10 +223,10 @@ public abstract class AbstractMultipleConditionStorageTest {
|
||||
assertThat(window.getFlowId()).isEqualTo(pair.getLeft().getId());
|
||||
window = multipleConditionStorage.getOrCreate(pair.getKey(), pair.getRight(), Collections.emptyMap());
|
||||
|
||||
assertThat(window.getResults().get("a")).isEqualTo(true);
|
||||
assertThat(window.getResults().get("a")).isTrue();
|
||||
|
||||
List<MultipleConditionWindow> expired = multipleConditionStorage.expired(null);
|
||||
assertThat(expired.size()).isEqualTo(0);
|
||||
assertThat(expired.size()).isZero();
|
||||
}
|
||||
|
||||
private static Pair<Flow, MultipleCondition> mockFlow(TimeWindow sla) {
|
||||
|
||||
@@ -297,7 +297,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
full.ifPresent(current -> {
|
||||
assertThat(full.get().getId()).isEqualTo(ExecutionFixture.EXECUTION_1.getId());
|
||||
@@ -309,12 +309,12 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
executionRepository.purge(ExecutionFixture.EXECUTION_1);
|
||||
|
||||
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(false);
|
||||
assertThat(full.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -322,12 +322,12 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
executionRepository.save(ExecutionFixture.EXECUTION_1);
|
||||
|
||||
Optional<Execution> full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
executionRepository.delete(ExecutionFixture.EXECUTION_1);
|
||||
|
||||
full = executionRepository.findById(null, ExecutionFixture.EXECUTION_1.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(false);
|
||||
assertThat(full.isPresent()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -720,7 +720,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
executionRepository.update(updated);
|
||||
|
||||
Optional<Execution> validation = executionRepository.findById(null, updated.getId());
|
||||
assertThat(validation.isPresent()).isEqualTo(true);
|
||||
assertThat(validation.isPresent()).isTrue();
|
||||
assertThat(validation.get().getLabels().size()).isEqualTo(1);
|
||||
assertThat(validation.get().getLabels().getFirst()).isEqualTo(label);
|
||||
}
|
||||
@@ -734,7 +734,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
executionRepository.save(latest);
|
||||
|
||||
Optional<Execution> result = executionRepository.findLatestForStates(null, "io.kestra.unittest", "full", List.of(State.Type.CREATED));
|
||||
assertThat(result.isPresent()).isEqualTo(true);
|
||||
assertThat(result.isPresent()).isTrue();
|
||||
assertThat(result.get().getId()).isEqualTo(latest.getId());
|
||||
}
|
||||
|
||||
|
||||
@@ -134,6 +134,6 @@ public abstract class AbstractExecutionServiceTest {
|
||||
null
|
||||
);
|
||||
|
||||
assertThat(purge.getExecutionsCount()).isEqualTo(0);
|
||||
assertThat(purge.getExecutionsCount()).isZero();
|
||||
}
|
||||
}
|
||||
@@ -82,11 +82,11 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
Optional<Flow> full = flowRepository.findById(null, flow.getNamespace(), flow.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getRevision()).isEqualTo(1);
|
||||
|
||||
full = flowRepository.findById(null, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
} finally {
|
||||
deleteFlow(flow);
|
||||
}
|
||||
@@ -100,11 +100,11 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
Optional<Flow> full = flowRepository.findByIdWithoutAcl(null, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getRevision()).isEqualTo(1);
|
||||
|
||||
full = flowRepository.findByIdWithoutAcl(null, flow.getNamespace(), flow.getId(), Optional.empty());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
} finally {
|
||||
deleteFlow(flow);
|
||||
}
|
||||
@@ -120,7 +120,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
try {
|
||||
Optional<FlowWithSource> full = flowRepository.findByIdWithSource(null, flow.getNamespace(), flow.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
|
||||
full.ifPresent(current -> {
|
||||
assertThat(full.get().getRevision()).isEqualTo(1);
|
||||
@@ -273,7 +273,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
FlowWithSource save = flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
try {
|
||||
assertThat(flowRepository.findById(null, save.getNamespace(), save.getId()).isPresent()).isEqualTo(true);
|
||||
assertThat(flowRepository.findById(null, save.getNamespace(), save.getId()).isPresent()).isTrue();
|
||||
} catch (Throwable e) {
|
||||
deleteFlow(save);
|
||||
throw e;
|
||||
@@ -281,8 +281,8 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
Flow delete = flowRepository.delete(save);
|
||||
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(false);
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isEqualTo(true);
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isFalse();
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId(), Optional.of(save.getRevision())).isPresent()).isTrue();
|
||||
|
||||
List<FlowWithSource> revisions = flowRepository.findRevisions(null, flow.getNamespace(), flow.getId());
|
||||
assertThat(revisions.getLast().getRevision()).isEqualTo(delete.getRevision());
|
||||
@@ -302,7 +302,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
Flow save = flowRepository.create(GenericFlow.of(flow));
|
||||
|
||||
try {
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(true);
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
|
||||
Flow update = Flow.builder()
|
||||
.id(IdUtils.create())
|
||||
@@ -339,7 +339,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
flow = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(true);
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
|
||||
Flow update = Flow.builder()
|
||||
.id(flowId)
|
||||
@@ -377,7 +377,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
|
||||
Flow save = flowRepository.create(GenericFlow.of(flow));
|
||||
try {
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isEqualTo(true);
|
||||
assertThat(flowRepository.findById(null, flow.getNamespace(), flow.getId()).isPresent()).isTrue();
|
||||
} finally {
|
||||
deleteFlow(save);
|
||||
}
|
||||
@@ -420,8 +420,8 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
try {
|
||||
Optional<Flow> found = flowRepository.findById(null, flow.getNamespace(), flow.getId());
|
||||
|
||||
assertThat(found.isPresent()).isEqualTo(true);
|
||||
assertThat(found.get() instanceof FlowWithException).isEqualTo(true);
|
||||
assertThat(found.isPresent()).isTrue();
|
||||
assertThat(found.get() instanceof FlowWithException).isTrue();
|
||||
assertThat(((FlowWithException) found.get()).getException()).contains("Templates are disabled");
|
||||
} finally {
|
||||
deleteFlow(flow);
|
||||
|
||||
@@ -44,7 +44,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
LogEntry.LogEntryBuilder builder = logEntry(Level.INFO);
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.find(Pageable.UNPAGED, null, null);
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
|
||||
LogEntry save = logRepository.save(builder.build());
|
||||
@@ -63,7 +63,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
.value(Instant.now().minus(1, ChronoUnit.HOURS))
|
||||
.build());
|
||||
find = logRepository.find(Pageable.UNPAGED, "doe", filters);
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
find = logRepository.find(Pageable.UNPAGED, null, null);
|
||||
assertThat(find.size()).isEqualTo(1);
|
||||
@@ -101,7 +101,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
assertThat(countDeleted).isEqualTo(1);
|
||||
|
||||
list = logRepository.findByExecutionIdAndTaskId(null, save.getExecutionId(), save.getTaskId(), null);
|
||||
assertThat(list.size()).isEqualTo(0);
|
||||
assertThat(list.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -147,7 +147,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
|
||||
find = logRepository.findByExecutionIdAndTaskRunId(null, executionId, logEntry2.getTaskRunId(), null, Pageable.from(10, 10));
|
||||
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -158,14 +158,14 @@ public abstract class AbstractLogRepositoryTest {
|
||||
logRepository.deleteByQuery(null, log1.getExecutionId(), null, (String) null, null, null);
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
logRepository.save(log1);
|
||||
|
||||
logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));
|
||||
|
||||
find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -176,14 +176,14 @@ public abstract class AbstractLogRepositoryTest {
|
||||
logRepository.deleteByQuery(null, log1.getExecutionId(), null, (String) null, null, null);
|
||||
|
||||
ArrayListTotal<LogEntry> find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
|
||||
logRepository.save(log1);
|
||||
|
||||
logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", null);
|
||||
|
||||
find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
|
||||
assertThat(find.size()).isEqualTo(0);
|
||||
assertThat(find.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -24,13 +24,13 @@ public abstract class AbstractSettingRepositoryTest {
|
||||
.build();
|
||||
|
||||
Optional<Setting> find = settingRepository.findByKey(setting.getKey());
|
||||
assertThat(find.isPresent()).isEqualTo(false);
|
||||
assertThat(find.isPresent()).isFalse();
|
||||
|
||||
Setting save = settingRepository.save(setting);
|
||||
|
||||
find = settingRepository.findByKey(save.getKey());
|
||||
|
||||
assertThat(find.isPresent()).isEqualTo(true);
|
||||
assertThat(find.isPresent()).isTrue();
|
||||
assertThat(find.get().getValue()).isEqualTo(save.getValue());
|
||||
|
||||
List<Setting> all = settingRepository.findAll();
|
||||
@@ -41,9 +41,9 @@ public abstract class AbstractSettingRepositoryTest {
|
||||
assertThat(delete.getValue()).isEqualTo(setting.getValue());
|
||||
|
||||
all = settingRepository.findAll();
|
||||
assertThat(all.size()).isEqualTo(0);
|
||||
assertThat(all.size()).isZero();
|
||||
|
||||
find = settingRepository.findByKey(setting.getKey());
|
||||
assertThat(find.isPresent()).isEqualTo(false);
|
||||
assertThat(find.isPresent()).isFalse();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,11 +50,11 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
templateRepository.create(template);
|
||||
|
||||
Optional<Template> full = templateRepository.findById(null, template.getNamespace(), template.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getId()).isEqualTo(template.getId());
|
||||
|
||||
full = templateRepository.findById(null, template.getNamespace(), template.getId());
|
||||
assertThat(full.isPresent()).isEqualTo(true);
|
||||
assertThat(full.isPresent()).isTrue();
|
||||
assertThat(full.get().getId()).isEqualTo(template.getId());
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ public abstract class AbstractTemplateRepositoryTest {
|
||||
Template save = templateRepository.create(template);
|
||||
templateRepository.delete(save);
|
||||
|
||||
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isEqualTo(false);
|
||||
assertThat(templateRepository.findById(null, template.getNamespace(), template.getId()).isPresent()).isFalse();
|
||||
|
||||
assertThat(TemplateListener.getEmits().size()).isEqualTo(2);
|
||||
assertThat(TemplateListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
|
||||
|
||||
@@ -35,20 +35,20 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
Trigger.TriggerBuilder<?, ?> builder = trigger();
|
||||
|
||||
Optional<Trigger> findLast = triggerRepository.findLast(builder.build());
|
||||
assertThat(findLast.isPresent()).isEqualTo(false);
|
||||
assertThat(findLast.isPresent()).isFalse();
|
||||
|
||||
Trigger save = triggerRepository.save(builder.build());
|
||||
|
||||
findLast = triggerRepository.findLast(save);
|
||||
|
||||
assertThat(findLast.isPresent()).isEqualTo(true);
|
||||
assertThat(findLast.isPresent()).isTrue();
|
||||
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
save = triggerRepository.save(builder.executionId(IdUtils.create()).build());
|
||||
|
||||
findLast = triggerRepository.findLast(save);
|
||||
|
||||
assertThat(findLast.isPresent()).isEqualTo(true);
|
||||
assertThat(findLast.isPresent()).isTrue();
|
||||
assertThat(findLast.get().getExecutionId()).isEqualTo(save.getExecutionId());
|
||||
|
||||
|
||||
|
||||
@@ -305,9 +305,9 @@ public abstract class AbstractRunnerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/pause-delay-from-input.yaml"})
|
||||
public void pauseRunDelayFromInput() throws Exception {
|
||||
pauseTest.runDelayFromInput(runnerUtils);
|
||||
@LoadFlows({"flows/valids/pause-duration-from-input.yaml"})
|
||||
public void pauseRunDurationFromInput() throws Exception {
|
||||
pauseTest.runDurationFromInput(runnerUtils);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -54,7 +54,7 @@ public class ChangeStateTestCase {
|
||||
Execution markedAs = executionService.markAs(execution, flow, execution.getTaskRunList().getFirst().getId(), State.Type.SUCCESS);
|
||||
executionQueue.emit(markedAs);
|
||||
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isEqualTo(true);
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
receivedExecutions.blockLast();
|
||||
assertThat(lastExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(lastExecution.get().getTaskRunList()).hasSize(2);
|
||||
@@ -80,7 +80,7 @@ public class ChangeStateTestCase {
|
||||
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
// assert on the subflow
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isEqualTo(true);
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
receivedExecutions.blockLast();
|
||||
assertThat(lastExecution.get().getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
assertThat(lastExecution.get().getTaskRunList()).hasSize(1);
|
||||
@@ -103,7 +103,7 @@ public class ChangeStateTestCase {
|
||||
executionQueue.emit(markedAs);
|
||||
|
||||
// assert for the parent flow
|
||||
assertThat(parentLatch.await(10, TimeUnit.SECONDS)).isEqualTo(true);
|
||||
assertThat(parentLatch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
receivedExecutions.blockLast();
|
||||
assertThat(lastParentExecution.get().getState().getCurrent()).isEqualTo(State.Type.FAILED); // FIXME should be success but it's FAILED on unit tests
|
||||
assertThat(lastParentExecution.get().getTaskRunList()).hasSize(1);
|
||||
|
||||
@@ -40,7 +40,7 @@ public class FlowConcurrencyCaseTest {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(null, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.runOne(null, "io.kestra.tests", "flow-concurrency-cancel");
|
||||
|
||||
assertThat(execution1.getState().isRunning()).isEqualTo(true);
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CANCELLED);
|
||||
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
@@ -63,7 +63,7 @@ public class FlowConcurrencyCaseTest {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(null, "io.kestra.tests", "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.runOne(null, "io.kestra.tests", "flow-concurrency-fail");
|
||||
|
||||
assertThat(execution1.getState().isRunning()).isEqualTo(true);
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.FAILED);
|
||||
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
@@ -90,7 +90,7 @@ public class FlowConcurrencyCaseTest {
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
executionQueue.emit(execution2);
|
||||
|
||||
assertThat(execution1.getState().isRunning()).isEqualTo(true);
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
|
||||
var executionResult1 = new AtomicReference<Execution>();
|
||||
@@ -170,7 +170,7 @@ public class FlowConcurrencyCaseTest {
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
executionQueue.emit(execution2);
|
||||
|
||||
assertThat(execution1.getState().isPaused()).isEqualTo(true);
|
||||
assertThat(execution1.getState().isPaused()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
|
||||
assertTrue(firstExecutionLatch.await(10, TimeUnit.SECONDS));
|
||||
@@ -222,7 +222,7 @@ public class FlowConcurrencyCaseTest {
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
executionQueue.emit(execution2);
|
||||
|
||||
assertThat(execution1.getState().isPaused()).isEqualTo(true);
|
||||
assertThat(execution1.getState().isPaused()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
|
||||
assertTrue(firstExecLatch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
@@ -51,15 +51,15 @@ abstract public class FlowListenersTest {
|
||||
|
||||
// initial state
|
||||
wait(ref, () -> {
|
||||
assertThat(count.get()).isEqualTo(0);
|
||||
assertThat(flowListenersService.flows().size()).isEqualTo(0);
|
||||
assertThat(count.get()).isZero();
|
||||
assertThat(flowListenersService.flows().size()).isZero();
|
||||
});
|
||||
|
||||
// resend on startup done for kafka
|
||||
if (flowListenersService.getClass().getName().equals("io.kestra.ee.runner.kafka.KafkaFlowListeners")) {
|
||||
wait(ref, () -> {
|
||||
assertThat(count.get()).isEqualTo(0);
|
||||
assertThat(flowListenersService.flows().size()).isEqualTo(0);
|
||||
assertThat(count.get()).isZero();
|
||||
assertThat(flowListenersService.flows().size()).isZero();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ public class InputsTest {
|
||||
HashMap<String, Object> inputsWithMissingOptionalInput = new HashMap<>(inputs);
|
||||
inputsWithMissingOptionalInput.remove("bool");
|
||||
|
||||
assertThat(typedInputs(inputsWithMissingOptionalInput).containsKey("bool")).isEqualTo(true);
|
||||
assertThat(typedInputs(inputsWithMissingOptionalInput).containsKey("bool")).isTrue();
|
||||
assertThat(typedInputs(inputsWithMissingOptionalInput).get("bool")).isNull();
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ public class InputsTest {
|
||||
assertThat(typeds.get("string")).isEqualTo("myString");
|
||||
assertThat(typeds.get("int")).isEqualTo(42);
|
||||
assertThat(typeds.get("float")).isEqualTo(42.42F);
|
||||
assertThat(typeds.get("bool")).isEqualTo(false);
|
||||
assertThat((Boolean) typeds.get("bool")).isFalse();
|
||||
assertThat(typeds.get("instant")).isEqualTo(Instant.parse("2019-10-06T18:27:49Z"));
|
||||
assertThat(typeds.get("instantDefaults")).isEqualTo(Instant.parse("2013-08-09T14:19:00Z"));
|
||||
assertThat(typeds.get("date")).isEqualTo(LocalDate.parse("2019-10-06"));
|
||||
@@ -143,7 +143,7 @@ public class InputsTest {
|
||||
assertThat(typeds.get("json")).isEqualTo(Map.of("a", "b"));
|
||||
assertThat(typeds.get("uri")).isEqualTo("https://www.google.com");
|
||||
assertThat(((Map<String, Object>) typeds.get("nested")).get("string")).isEqualTo("a string");
|
||||
assertThat(((Map<String, Object>) typeds.get("nested")).get("bool")).isEqualTo(true);
|
||||
assertThat((Boolean) ((Map<String, Object>) typeds.get("nested")).get("bool")).isTrue();
|
||||
assertThat(((Map<String, Object>) ((Map<String, Object>) typeds.get("nested")).get("more")).get("int")).isEqualTo(123);
|
||||
assertThat(typeds.get("validatedString")).isEqualTo("A123");
|
||||
assertThat(typeds.get("validatedInt")).isEqualTo(12);
|
||||
@@ -173,7 +173,7 @@ public class InputsTest {
|
||||
assertThat(typeds.get("enum")).isEqualTo("ENUM_VALUE");
|
||||
assertThat(typeds.get("int")).isEqualTo(42);
|
||||
assertThat(typeds.get("float")).isEqualTo(42.42F);
|
||||
assertThat(typeds.get("bool")).isEqualTo(false);
|
||||
assertThat((Boolean) typeds.get("bool")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -345,7 +345,7 @@ public class InputsTest {
|
||||
Map<String, Object> typeds = typedInputs(map);
|
||||
|
||||
assertThat(typeds.get("json")).isInstanceOf(Map.class);
|
||||
assertThat(((Map<?, ?>) typeds.get("json")).size()).isEqualTo(0);
|
||||
assertThat(((Map<?, ?>) typeds.get("json")).size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -366,7 +366,7 @@ public class InputsTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
assertThat(execution.getInputs().get("json")).isInstanceOf(Map.class);
|
||||
assertThat(((Map<?, ?>) execution.getInputs().get("json")).size()).isEqualTo(0);
|
||||
assertThat(((Map<?, ?>) execution.getInputs().get("json")).size()).isZero();
|
||||
assertThat((String) execution.findTaskRunsByTaskId("jsonOutput").getFirst().getOutputs().get("value")).isEqualTo("{}");
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ class LocalWorkingDirTest {
|
||||
String workingDirId = IdUtils.create();
|
||||
TestWorkingDir workingDirectory = new TestWorkingDir(workingDirId, new LocalWorkingDir(Path.of("/tmp/sub/dir/tmp/"), workingDirId));
|
||||
Path tempFile = workingDirectory.createTempFile();
|
||||
assertThat(tempFile.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isEqualTo(true);
|
||||
assertThat(tempFile.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isTrue();
|
||||
assertThat(workingDirectory.getAllCreatedTempFiles().size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -62,8 +62,8 @@ class LocalWorkingDirTest {
|
||||
TestWorkingDir workingDirectory = new TestWorkingDir(workingDirId, new LocalWorkingDir(Path.of("/tmp/sub/dir/tmp/"), workingDirId));
|
||||
Path path = workingDirectory.createFile("folder/file.txt");
|
||||
|
||||
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isEqualTo(true);
|
||||
assertThat(path.toFile().getAbsolutePath().endsWith("/folder/file.txt")).isEqualTo(true);
|
||||
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isTrue();
|
||||
assertThat(path.toFile().getAbsolutePath().endsWith("/folder/file.txt")).isTrue();
|
||||
assertThat(workingDirectory.getAllCreatedFiles().size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@@ -111,13 +111,13 @@ class LocalWorkingDirTest {
|
||||
workingDir.cleanup();
|
||||
|
||||
// Then
|
||||
assertThat(file.toFile().exists()).isEqualTo(false);
|
||||
assertThat(firtPath.toFile().exists()).isEqualTo(false);
|
||||
assertThat(file.toFile().exists()).isFalse();
|
||||
assertThat(firtPath.toFile().exists()).isFalse();
|
||||
|
||||
// When
|
||||
Path secondPath = workingDir.path(true);
|
||||
// Then
|
||||
assertThat(secondPath.toFile().exists()).isEqualTo(true);
|
||||
assertThat(secondPath.toFile().exists()).isTrue();
|
||||
assertThat(firtPath).isEqualTo(secondPath);
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/inputs.yaml"})
|
||||
void secretInput() {
|
||||
assertThat(flowRepository.findById(null, "io.kestra.tests", "inputs").isPresent()).isEqualTo(true);
|
||||
assertThat(flowRepository.findById(null, "io.kestra.tests", "inputs").isPresent()).isTrue();
|
||||
|
||||
Flow flow = flowRepository.findById(null, "io.kestra.tests", "inputs").get();
|
||||
Execution execution = Execution.builder()
|
||||
|
||||
@@ -39,6 +39,6 @@ public class NullOutputTest {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(execution.getTaskRunList()).hasSize(1);
|
||||
assertThat(execution.getTaskRunList().getFirst().getOutputs()).hasSize(1);
|
||||
assertThat(execution.getTaskRunList().getFirst().getOutputs().containsKey("value")).isEqualTo(true);
|
||||
assertThat(execution.getTaskRunList().getFirst().getOutputs().containsKey("value")).isTrue();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,6 @@ class WorkingDirFactoryTest {
|
||||
// When
|
||||
Path path = workingDirectory.path();
|
||||
// Then
|
||||
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isEqualTo(true);
|
||||
assertThat(path.toFile().getAbsolutePath().startsWith("/tmp/sub/dir/tmp/")).isTrue();
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,6 @@ class EndsWithFilterTest {
|
||||
variableRenderer.render("{{ \"Hello World\" | endsWith(\"World\") }}", Map.of())
|
||||
);
|
||||
|
||||
assertThat(render).isEqualTo(true);
|
||||
assertThat(render).isTrue();
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,6 @@ public class Md5FilterTest {
|
||||
void out() throws IllegalVariableEvaluationException {
|
||||
String render = variableRenderer.render("{{ \"hello\" | md5 }}", Map.of());
|
||||
|
||||
assertThat(render.equals("hello")).isEqualTo(false);
|
||||
assertThat(render.equals("hello")).isFalse();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,6 @@ public class Sha1FilterTest {
|
||||
void out() throws IllegalVariableEvaluationException {
|
||||
String render = variableRenderer.render("{{ \"hello\" | sha1 }}", Map.of());
|
||||
|
||||
assertThat(render.equals("hello")).isEqualTo(false);
|
||||
assertThat(render.equals("hello")).isFalse();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,6 @@ public class Sha512FilterTest {
|
||||
void out() throws IllegalVariableEvaluationException {
|
||||
String render = variableRenderer.render("{{ \"hello\" | sha512 }}", Map.of());
|
||||
|
||||
assertThat(render.equals("hello")).isEqualTo(false);
|
||||
assertThat(render.equals("hello")).isFalse();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,6 @@ class StartsWithFilterTest {
|
||||
variableRenderer.render("{{ \"Hello World\" | startsWith(\"Hello\") }}", Map.of())
|
||||
);
|
||||
|
||||
assertThat(render).isEqualTo(true);
|
||||
assertThat(render).isTrue();
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user