Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 11:12:12 -05:00)

Compare commits: fix/sdk-ch ... v0.18.22 (125 commits)
cbc530e98f
fc98bc7c4e
4704d69c27
980efcd7c0
7ca4b8835b
166efd6762
33cd2b5f63
551cab8f94
77673a8b6b
3f552713a2
91a0fafd61
ff975faa05
8113a0b8b2
578f264214
d89ee01f12
e83b9fe1c3
427d682439
b36cfb6f6b
58fc4a72ff
20f4a51aca
bdef59632f
ac678b88e4
6683cb21b7
67ad4800e3
2f56df4031
f838f58068
963b0ca0a5
977fff5e7f
38d221e630
07a10f3797
1f2c87a11b
2cf9104a31
75887213c0
e90f8aa4df
dd393f8e36
d9d9bdd3a9
6f638df047
a185f44f01
fac26d1dec
351269a4c5
330b26d920
ad3ac7b4ab
73735144aa
5669cf5c77
45b1c18b48
7166addcec
ea72c20388
bfe17d0a66
a3430c1185
cc6ccc1fff
b4999c7cea
31823ca6d8
1ba67db2dd
561a71e862
7f39623930
bc5d2032ea
7772aef729
44938e907a
86c4388ac0
384a1b8b9b
7f66d57eed
e38a04fc64
1d08bda2c9
b78d04b359
83d0705fe4
d1c606b8e1
3bca40a82d
45070bb9fb
b0254f3997
788dc1376a
1b0e3cbbd6
5bbc235677
e0f8fcbcde
87a667dc27
81d54ef423
abe2086dbd
7c0b0b14ec
7061eef48c
3c35c8a48f
0d4e8f2c26
e1b7f1ce16
095e29ee18
4ede173331
442306f82e
19fda0d1a4
549e3d7bec
880bc67756
249bae8a01
089b9ee0ca
615d42db62
a70ca97912
f2718797d0
04ce98e0ef
d9e18d5e13
b9aafb37d4
6c9833936a
d48f5407e8
6e3e4810f9
8afaec885c
afc76509f2
63a8b69221
92f5b3face
1a3343ff6c
73a82d1cd5
f1ce76ff9f
81817291ee
9646a42ea7
1253d96c8a
3e40a56c1c
de2467012c
a38bf61c3b
92a323a36c
d102033a2f
654d62118c
f9186b29b4
2ee9b9f069
d57d9dd3e8
ae15cef7b7
0e46055884
f91b070dca
d964d49861
dd45545202
8d2589485f
78069b45f8
7a13ed79d8
.github/dependabot.yml (vendored): 17 changed lines

@@ -22,12 +22,21 @@ updates:
       - "dependency-upgrade"
     open-pull-requests-limit: 50

-  # Maintain dependencies for Npm modules
+  # Maintain dependencies for NPM modules
   - package-ecosystem: "npm"
     directory: "/ui"
     schedule:
-      # Check for updates to Npm modules every week
       interval: "weekly"
-    labels:
-      - "dependency-upgrade"
+      day: "monday"
+      time: "08:00"
+      timezone: "Europe/Paris"
     open-pull-requests-limit: 50
+    labels: ["dependency-upgrade"]
+    reviewers: ["MilosPaunovic"]
+    ignore:
+      # Ignore updates of version 1.x, as we're using beta of 2.x
+      - dependency-name: "vue-virtual-scroller"
+        versions: ["1.x"]
+      # Ignore major versions greater than 8, as it's still known to be flaky
+      - dependency-name: "eslint"
+        versions: [">8"]
.github/workflows/docker.yml (vendored): 5 changed lines

@@ -109,6 +109,11 @@ jobs:
         run: |
           regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}

+      - name: Retag latest to latest-full
+        if: ${{ github.event.inputs.retag-latest == 'true' && matrix.image.name == '' }}
+        run: |
+          regctl image copy kestra/kestra:latest kestra/kestra:latest-full
+
   end:
     runs-on: ubuntu-latest
     needs:
.github/workflows/main.yml (vendored): 7 changed lines

@@ -4,6 +4,7 @@ on:
   push:
     branches:
       - develop
+      - releases/*
    tags:
      - v*
  pull_request:
@@ -117,10 +118,12 @@ jobs:
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
+        if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
        uses: docker/setup-buildx-action@v3

      # Docker Build
      - name: Build & Export Docker Image
+        if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
        uses: docker/build-push-action@v6
        with:
          context: .
@@ -275,7 +278,7 @@ jobs:
  release:
    name: Github Release
    runs-on: ubuntu-latest
-    needs: [ check, check-e2e ]
+    needs: build-artifacts
    if: startsWith(github.ref, 'refs/tags/v')
    steps:
      # Download Exec
@@ -367,7 +370,7 @@ jobs:
  maven:
    name: Publish to Maven
    runs-on: ubuntu-latest
-    needs: [check, check-e2e]
+    needs: [check]
    if: github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/tags/v')
    steps:
      - uses: actions/checkout@v4
@@ -179,6 +179,8 @@ subprojects {
     testImplementation 'org.hamcrest:hamcrest'
     testImplementation 'org.hamcrest:hamcrest-library'
     testImplementation 'org.exparity:hamcrest-date'
+
+    testImplementation 'org.assertj:assertj-core'
 }

 test {
@@ -22,6 +22,7 @@ micronaut:
     idle-timeout: 60m
     netty:
       max-chunk-size: 10MB
+      max-header-size: 32768 # increased from the default of 8k
   responses:
     file:
       cache-seconds: 86400
@@ -2,6 +2,7 @@ package io.kestra.core.docs;

 import com.google.common.base.CaseFormat;
+import io.kestra.core.models.tasks.retrys.AbstractRetry;
 import io.kestra.core.models.tasks.runners.TaskRunner;
 import lombok.AllArgsConstructor;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -144,6 +145,10 @@ public abstract class AbstractClassDocumentation<T> {
+            if (AbstractRetry.class.isAssignableFrom(Class.forName(key))) {
+                return true;
+            }
+
             if (TaskRunner.class.isAssignableFrom(Class.forName(key))) {
                 return true;
             }
         } catch (ClassNotFoundException ignored) {
             log.debug(ignored.getMessage(), ignored);
         }
@@ -37,13 +37,16 @@ public class Plugin {
     public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subgroup) {
         Plugin plugin = new Plugin();
         plugin.name = registeredPlugin.name();
+        PluginSubGroup subGroupInfos = null;
         if (subgroup == null) {
             plugin.title = registeredPlugin.title();
         } else {
-            plugin.title = subgroup.substring(subgroup.lastIndexOf('.') + 1);
+            subGroupInfos = registeredPlugin.allClass().stream().filter(c -> c.getName().contains(subgroup)).map(clazz -> clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class)).toList().getFirst();
+            plugin.title = !subGroupInfos.title().isEmpty() ? subGroupInfos.title() : subgroup.substring(subgroup.lastIndexOf('.') + 1);
         }
         plugin.group = registeredPlugin.group();
-        plugin.description = registeredPlugin.description();
+        plugin.description = subGroupInfos != null && !subGroupInfos.description().isEmpty() ? subGroupInfos.description() : registeredPlugin.description();
         plugin.license = registeredPlugin.license();
         plugin.longDescription = registeredPlugin.longDescription();
         plugin.version = registeredPlugin.version();
@@ -59,7 +62,9 @@ public class Plugin {
                 e.getValue().toString()
             ))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        plugin.categories = registeredPlugin
+        plugin.categories = subGroupInfos != null ?
+            Arrays.stream(subGroupInfos.categories()).toList() :
+            registeredPlugin
             .allClass()
             .stream()
             .map(clazz -> clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class))
@@ -61,14 +61,13 @@ public class LogEntry implements DeletedInterface, TenantInterface {
     @Builder.Default
     boolean deleted = false;

-    public static List<String> findLevelsByMin(Level minLevel) {
+    public static List<Level> findLevelsByMin(Level minLevel) {
         if (minLevel == null) {
-            return Arrays.stream(Level.values()).map(Enum::name).toList();
+            return Arrays.asList(Level.values());
         }

         return Arrays.stream(Level.values())
             .filter(level -> level.toInt() >= minLevel.toInt())
-            .map(Enum::name)
             .toList();
     }
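The hunk above changes the method's return type rather than its logic: callers now receive the SLF4J event `Level` values directly instead of their names. A minimal sketch of a call site under the new signature (the call site itself is hypothetical, assuming the SLF4J `Level` used by `LogEntry`):

```java
import org.slf4j.event.Level;
import java.util.List;

// With the new signature, callers filter on the enum directly
// instead of round-tripping through Enum::name.
List<Level> levels = LogEntry.findLevelsByMin(Level.INFO);
boolean includesWarn = levels.contains(Level.WARN); // true: WARN >= INFO
```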
@@ -209,6 +209,7 @@ public class Flow extends AbstractFlow {
     public List<String> allTriggerIds() {
         return this.triggers != null ? this.triggers.stream()
             .map(AbstractTrigger::getId)
+            .filter(id -> id != null) // this can happen when validating a flow under creation
             .collect(Collectors.toList()) : Collections.emptyList();
     }
@@ -1,6 +1,5 @@
 package io.kestra.core.models.flows;

-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonSetter;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -39,7 +38,6 @@ import lombok.experimental.SuperBuilder;
     @JsonSubTypes.Type(value = URIInput.class, name = "URI"),
     @JsonSubTypes.Type(value = MultiselectInput.class, name = "MULTISELECT")
 })
-@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public abstract class Input<T> implements Data {
     @NotNull
     @NotBlank
@@ -6,7 +6,6 @@ import io.kestra.core.models.validations.ManualConstraintViolation;
 import io.kestra.core.validations.Regex;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.ConstraintViolationException;
-import jakarta.validation.constraints.NotNull;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -18,11 +17,19 @@ import java.util.List;
 @Getter
 @NoArgsConstructor
 public class MultiselectInput extends Input<List<String>> implements ItemTypeInterface {
     @Schema(
-        title = "List of values available."
+        title = "Deprecated, please use `values` instead."
     )
-    @NotNull
+    // @NotNull
+    @Deprecated
     List<@Regex String> options;

     @Schema(
+        title = "List of values available."
+    )
+    // FIXME: REMOVE `options` in 0.20 and bring back the NotNull
+    // @NotNull
+    List<@Regex String> values;
+
+    @Schema(
         title = "Type of the different values available.",
@@ -33,10 +40,21 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInterface {

     @Override
     public void validate(List<String> inputs) throws ConstraintViolationException {
+        if (values != null && options != null) {
+            throw ManualConstraintViolation.toConstraintViolationException(
+                "you can't define both `values` and `options`",
+                this,
+                MultiselectInput.class,
+                getId(),
+                ""
+            );
+        }
+
         for(String input : inputs){
-            if (!options.contains(input)) {
+            List<@Regex String> finalValues = this.values != null ? this.values : this.options;
+            if (!finalValues.contains(input)) {
                 throw ManualConstraintViolation.toConstraintViolationException(
-                    "it must match the values `" + options + "`",
+                    "it must match the values `" + finalValues + "`",
                     this,
                     MultiselectInput.class,
                     getId(),
@@ -40,13 +40,17 @@ public interface TaskCommands {
     TargetOS getTargetOS();

     default List<Path> relativeWorkingDirectoryFilesPaths() throws IOException {
+        return this.relativeWorkingDirectoryFilesPaths(false);
+    }
+
+    default List<Path> relativeWorkingDirectoryFilesPaths(boolean includeDirectories) throws IOException {
         Path workingDirectory = this.getWorkingDirectory();
         if (workingDirectory == null) {
             return Collections.emptyList();
         }

         try (Stream<Path> walk = Files.walk(workingDirectory)) {
-            Stream<Path> filtered = walk.filter(path -> !Files.isDirectory(path));
+            Stream<Path> filtered = includeDirectories ? walk : walk.filter(path -> !Files.isDirectory(path));
             Path outputDirectory = this.getOutputDirectory();
             if (outputDirectory != null) {
                 filtered = filtered.filter(Predicate.not(path -> path.startsWith(outputDirectory)));
@@ -23,9 +23,6 @@ public class Trigger extends TriggerContext {
     @Nullable
     private String executionId;

-    @Nullable
-    private State.Type executionCurrentState;
-
     @Nullable
     private Instant updatedDate;
@@ -39,7 +36,6 @@ public class Trigger extends TriggerContext {
     protected Trigger(TriggerBuilder<?, ?> b) {
         super(b);
         this.executionId = b.executionId;
-        this.executionCurrentState = b.executionCurrentState;
         this.updatedDate = b.updatedDate;
         this.evaluateRunningDate = b.evaluateRunningDate;
     }
@@ -141,7 +137,6 @@ public class Trigger extends TriggerContext {
             .date(trigger.getDate())
             .nextExecutionDate(trigger.getNextExecutionDate())
             .executionId(execution.getId())
-            .executionCurrentState(execution.getState().getCurrent())
             .updatedDate(Instant.now())
             .backfill(trigger.getBackfill())
             .stopAfter(trigger.getStopAfter())
@@ -18,7 +18,7 @@ import java.util.function.Predicate;
  * @see io.kestra.core.plugins.serdes.PluginDeserializer
  * @see PluginScanner
  */
-public final class DefaultPluginRegistry implements PluginRegistry {
+public class DefaultPluginRegistry implements PluginRegistry {

     private static class LazyHolder {
         static final DefaultPluginRegistry INSTANCE = new DefaultPluginRegistry();
@@ -43,7 +43,7 @@ public final class DefaultPluginRegistry implements PluginRegistry {
         return instance;
     }

-    private DefaultPluginRegistry() {
+    protected DefaultPluginRegistry() {
     }

     private boolean isInitialized() {
@@ -53,7 +53,7 @@ public final class DefaultPluginRegistry implements PluginRegistry {
     /**
      * Initializes the registry by loading all core plugins.
      */
-    private void init() {
+    protected void init() {
         if (initialized.compareAndSet(false, true)) {
             register(scanner.scan());
         }
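Dropping `final` and widening the constructor and `init()` to `protected` makes the registry extensible; a hypothetical subclass sketch of what this now permits (the subclass is illustrative, not part of the diff):

```java
// Hypothetical subclass, possible only after this change removed `final`
// and made the constructor and init() protected.
public class CustomPluginRegistry extends DefaultPluginRegistry {

    protected CustomPluginRegistry() {
        super();
    }

    @Override
    protected void init() {
        super.init(); // still scans and registers core plugins exactly once
        // additional registration logic could go here
    }
}
```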
@@ -32,7 +32,7 @@ public interface FlowRepositoryInterface {
             execution.getTenantId(),
             execution.getNamespace(),
             execution.getFlowId(),
-            Optional.of(execution.getFlowRevision())
+            Optional.ofNullable(execution.getFlowRevision())
         );

         if (find.isEmpty()) {
@@ -50,7 +50,7 @@ public interface FlowRepositoryInterface {
             execution.getTenantId(),
             execution.getNamespace(),
             execution.getFlowId(),
-            Optional.of(execution.getFlowRevision())
+            Optional.ofNullable(execution.getFlowRevision())
         );

         if (find.isEmpty()) {
@@ -144,18 +144,16 @@ public class DefaultRunContext extends RunContext {
     @Override
     public DefaultRunContext clone() {
         DefaultRunContext runContext = new DefaultRunContext();
         runContext.variableRenderer = this.variableRenderer;
         runContext.applicationContext = this.applicationContext;
         runContext.storageInterface = this.storageInterface;
-        runContext.storage = this.storage;
         runContext.variables = new HashMap<>(this.variables);
-        runContext.metrics = new ArrayList<>();
         runContext.meterRegistry = this.meterRegistry;
         runContext.workingDir = this.workingDir;
         runContext.logger = this.logger;
+        runContext.metrics = new ArrayList<>();
+        runContext.storage = this.storage;
         runContext.pluginConfiguration = this.pluginConfiguration;
         runContext.version = version;
+        runContext.isInitialized.set(this.isInitialized.get());
         if (this.isInitialized.get()) {
             //Inject all services
             runContext.init(applicationContext);
         }
         return runContext;
     }
@@ -358,11 +358,11 @@ public class ExecutorService {
             .withState(executor.getExecution().guessFinalState(flow));

         if (flow.getOutputs() != null) {
+            RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
             try {
                 Map<String, Object> outputs = flow.getOutputs()
                     .stream()
                     .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
-                RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
                 outputs = runContext.render(outputs);
                 outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
                 newExecution = newExecution.withOutputs(outputs);
@@ -374,8 +374,8 @@ public class ExecutorService {
                     "Failed to render output values",
                     e
                 );
-                newExecution = newExecution
-                    .withState(State.Type.FAILED);
+                runContext.logger().error("Failed to render output values: {}", e.getMessage(), e);
+                newExecution = newExecution.withState(State.Type.FAILED);
             }
         }
@@ -58,15 +58,19 @@ public abstract class FilesService {
         return inputFiles;
     }

     public static Map<String, URI> outputFiles(RunContext runContext, List<String> outputs) throws Exception {
-        List<Path> allFilesMatching = runContext.workingDir().findAllFilesMatching(outputs);
+        List<String> renderedOutputs = outputs != null ? runContext.render(outputs) : null;
+        List<Path> allFilesMatching = runContext.workingDir().findAllFilesMatching(renderedOutputs);
         var outputFiles = allFilesMatching.stream()
             .map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
                 runContext.workingDir().path().relativize(path).toString(),
                 runContext.storage().putFile(path.toFile(), resolveUniqueNameForFile(path))
             )))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        runContext.logger().info("Captured {} output(s).", allFilesMatching.size());
+
+        if (runContext.logger().isTraceEnabled()) {
+            runContext.logger().trace("Captured {} output(s).", allFilesMatching.size());
+        }

         return outputFiles;
     }
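The new version renders the output patterns before matching, so Pebble expressions in `outputFiles` now work; a short sketch of the effect, mirroring the `renderOutputFiles` test added further down (the helper method and variable value are illustrative):

```java
import io.kestra.core.runners.RunContext;
import java.net.URI;
import java.util.List;
import java.util.Map;

// Sketch: assuming `runContext` carries a variable `extension` = "txt",
// the pattern "*.{{extension}}" is rendered to "*.txt" before matching
// files in the working directory.
static Map<String, URI> captureTxtOutputs(RunContext runContext) throws Exception {
    return FilesService.outputFiles(runContext, List.of("*.{{extension}}"));
}
```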
@@ -42,6 +42,10 @@ public interface FlowExecutorInterface {
      * WARNING: this method will NOT check if the namespace is allowed, so it should not be used inside a task.
      */
     default Optional<Flow> findByExecution(Execution execution) {
+        if (execution.getFlowRevision() == null) {
+            return Optional.empty();
+        }
+
         return this.findById(
             execution.getTenantId(),
             execution.getNamespace(),
@@ -290,10 +290,16 @@ public class FlowInputOutput {
                 }
             }
             case ARRAY, MULTISELECT -> {
+                List<?> asList;
+                if (current instanceof List<?> list) {
+                    asList = list;
+                } else {
+                    asList = JacksonMapper.toList(((String) current));
+                }
+
                 if (elementType != null) {
                     // recursively parse the elements only once
-                    yield JacksonMapper.toList(((String) current))
-                        .stream()
+                    yield asList.stream()
                         .map(throwFunction(element -> {
                             try {
                                 return parseType(execution, elementType, id, null, element);
@@ -303,7 +309,7 @@ public class FlowInputOutput {
                         }))
                         .toList();
                 } else {
-                    yield JacksonMapper.toList(((String) current));
+                    yield asList;
                 }
             }
         };
@@ -274,9 +274,10 @@ public final class RunVariables {
         // adds any additional variables
         if (variables != null) {
             builder.putAll(variables);
-            if (logger != null && !variables.containsKey(RunVariables.SECRET_CONSUMER_VARIABLE_NAME)) {
-                builder.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) logger::usedSecret);
-            }
+        }
+
+        if (logger != null && (variables == null || !variables.containsKey(RunVariables.SECRET_CONSUMER_VARIABLE_NAME))) {
+            builder.put(RunVariables.SECRET_CONSUMER_VARIABLE_NAME, (Consumer<String>) logger::usedSecret);
         }

         return builder.build();
@@ -61,6 +61,7 @@ public class Extension extends AbstractExtension {
         return operators;
     }

+    @SuppressWarnings("deprecation")
     @Override
     public Map<String, Filter> getFilters() {
         Map<String, Filter> filters = new HashMap<>();
@@ -104,6 +105,7 @@ public class Extension extends AbstractExtension {
         return tests;
     }

+    @SuppressWarnings("deprecation")
     @Override
     public Map<String, Function> getFunctions() {
         Map<String, Function> functions = new HashMap<>();
@@ -10,11 +10,8 @@ import java.util.Map;
 @Slf4j
 @Deprecated
 public class JsonFilter extends ToJsonFilter {
-
     @Override
     public Object apply(Object input, Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) throws PebbleException {
-        log.warn("The 'json' filter is deprecated, please use 'toJson' instead");
-
         return super.apply(input, args, self, context, lineNumber);
     }
 }
@@ -9,11 +9,8 @@ import java.util.Map;
 @Slf4j
 @Deprecated
 public class JsonFunction extends FromJsonFunction {
-
     @Override
     public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
-        log.warn("The 'json' function is deprecated, please use 'fromJson' instead");
-
         return super.execute(args, self, context, lineNumber);
     }
 }
@@ -73,12 +73,14 @@ public abstract class AbstractScheduler implements Scheduler, Service {
     private final PluginDefaultService pluginDefaultService;
     private final WorkerGroupService workerGroupService;
     private final LogService logService;
+    protected SchedulerExecutionStateInterface executionState;

     // must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
     private volatile Boolean isReady = false;

     private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();

     @Getter
     protected SchedulerTriggerStateInterface triggerState;

     // schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@@ -344,7 +346,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
                 logError(conditionContext, flow, abstractTrigger, e);
                 return null;
             }
-            this.triggerState.save(triggerContext, scheduleContext);
+            this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
         } else {
             triggerContext = lastTrigger;
         }
@@ -433,11 +435,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
                 )
                 .build()
             )
-            .peek(f -> {
-                if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
-                    this.triggerState.unlock(f.getTriggerContext());
-                }
-            })
             .filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
             .filter(this::isExecutionNotRunning)
             .map(FlowWithWorkerTriggerNextDate::of)
@@ -473,7 +470,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
             Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
             var flowWithTrigger = f.toBuilder().triggerContext(triggerRunning).build();
             try {
-                this.triggerState.save(triggerRunning, scheduleContext);
+                this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
                 this.sendWorkerTriggerToWorker(flowWithTrigger);
             } catch (InternalException e) {
                 logService.logTrigger(
@@ -498,7 +495,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
                     schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
                 );
                 trigger = trigger.checkBackfill();
-                this.triggerState.save(trigger, scheduleContext);
+                this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
             }
         } else {
             logService.logTrigger(
@@ -516,9 +513,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
                     logError(f, e);
                 }
                 var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
-                this.triggerState.save(trigger, scheduleContext);
+                this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
             }
-        } catch (InternalException ie) {
+        } catch (Exception ie) {
             // validate schedule condition can fail to render variables
             // in this case, we send a failed execution so the trigger is not evaluated each second.
             logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
@@ -527,12 +524,13 @@ public abstract class AbstractScheduler implements Scheduler, Service {
                 .tenantId(f.getTriggerContext().getTenantId())
                 .namespace(f.getTriggerContext().getNamespace())
                 .flowId(f.getTriggerContext().getFlowId())
                 .flowRevision(f.getFlow().getRevision())
+                .labels(f.getFlow().getLabels())
                 .state(new State().withState(State.Type.FAILED))
                 .build();
             ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
             var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
-            this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
+            this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
         }
     });
 });
@@ -572,7 +570,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {

         // Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
         // So we must save them by passing the scheduleContext.
-        this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
+        this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
     }

     protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
@@ -593,8 +591,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
             return true;
         }

-        // The execution is not yet started, we skip
-        if (lastTrigger.getExecutionCurrentState() == null) {
+        Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
+
+        // executionState hasn't received the execution, we skip
+        if (execution.isEmpty()) {
             if (lastTrigger.getUpdatedDate() != null) {
                 metricRegistry
                     .timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
@@ -628,7 +628,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
                 Level.DEBUG,
                 "Execution '{}' is still '{}', updated at '{}'",
                 lastTrigger.getExecutionId(),
-                lastTrigger.getExecutionCurrentState(),
+                execution.get().getState().getCurrent(),
                 lastTrigger.getUpdatedDate()
             );
         }
@@ -1,4 +1,14 @@
 package io.kestra.core.schedulers;

+import java.util.function.Consumer;
+
+/**
+ * This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
+ * See AbstractScheduler.handle().
+ */
 public interface ScheduleContextInterface {
+    /**
+     * Do trigger retrieval and updating in a single transaction.
+     */
+    void doInTransaction(Consumer<ScheduleContextInterface> consumer);
 }
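A minimal usage sketch of this contract, paired with the `save(trigger, scheduleContext, headerContent)` overload added to `SchedulerTriggerStateInterface` below (the trigger value and header string are illustrative):

```java
// Sketch: retrieve and persist a trigger atomically; on JDBC backends
// the whole lambda runs inside a single database transaction.
scheduleContext.doInTransaction(ctx -> {
    Trigger updated = trigger.checkBackfill();
    triggerState.save(updated, ctx, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
});
```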
@@ -0,0 +1,19 @@
+package io.kestra.core.schedulers;
+
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.repositories.ExecutionRepositoryInterface;
+
+import java.util.Optional;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+
+@Singleton
+public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
+    @Inject
+    private ExecutionRepositoryInterface executionRepository;
+
+    @Override
+    public Optional<Execution> findById(String tenantId, String id) {
+        return executionRepository.findById(tenantId, id);
+    }
+}
@@ -0,0 +1,9 @@
+package io.kestra.core.schedulers;
+
+import io.kestra.core.models.executions.Execution;
+
+import java.util.Optional;
+
+public interface SchedulerExecutionStateInterface {
+    Optional<Execution> findById(String tenantId, String id);
+}
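Since the interface has a single lookup method, tests can also swap in a map-backed implementation rather than spying on a repository; a hypothetical sketch, not part of this diff:

```java
import io.kestra.core.models.executions.Execution;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory implementation for unit tests.
public class InMemoryExecutionState implements SchedulerExecutionStateInterface {
    private final Map<String, Execution> byId = new ConcurrentHashMap<>();

    public void put(Execution execution) {
        byId.put(execution.getId(), execution);
    }

    @Override
    public Optional<Execution> findById(String tenantId, String id) {
        // tenant scoping is ignored in this sketch
        return Optional.ofNullable(byId.get(id));
    }
}
```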
@@ -20,19 +20,22 @@ public interface SchedulerTriggerStateInterface {

     Trigger create(Trigger trigger) throws ConstraintViolationException;

+    Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
+
+    Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
+
     Trigger update(Trigger trigger);

     Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;

-
     /**
      * Used by the JDBC implementation: find triggers in all tenants.
      */
     List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);

     /**
-     * Required for Kafka
+     * Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
      */
     List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);

     /**
      * Required for Kafka
      */
     void unlock(Trigger trigger);
 }
@@ -66,6 +66,10 @@ public class PluginDefaultService {
     protected List<PluginDefault> mergeAllDefaults(Flow flow) {
         List<PluginDefault> list = new ArrayList<>();

+        if (flow.getPluginDefaults() != null) {
+            list.addAll(flow.getPluginDefaults());
+        }
+
         if (taskGlobalDefault != null && taskGlobalDefault.getDefaults() != null) {
             if (warnOnce.compareAndSet(false, true)) {
                 log.warn("Global Task Defaults are deprecated, please use Global Plugin Defaults instead via the 'kestra.plugins.defaults' property.");
@@ -77,10 +81,6 @@ public class PluginDefaultService {
             list.addAll(pluginGlobalDefault.getDefaults());
         }

-        if (flow.getPluginDefaults() != null) {
-            list.addAll(flow.getPluginDefaults());
-        }
-
         return list;
     }
@@ -84,7 +84,7 @@ public class InternalNamespace implements Namespace {
      **/
     @Override
     public List<NamespaceFile> all(final String prefix, final boolean includeDirectories) throws IOException {
-        URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath() + "/");
+        URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath().toString().replace("\\", "/") + "/");
         return storage.allByPrefix(tenant, namespacePrefix, includeDirectories)
             .stream()
             .map(uri -> new NamespaceFile(relativize(uri), uri, namespace))
@@ -561,6 +561,11 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
         // get the list of splits from the outputs of the split task
         String taskId = this.id.substring(0, this.id.lastIndexOf('_')) + ForEachItemExecutable.SUFFIX;
         var taskOutput = extractOutput(runContext, taskId);
+        if (taskOutput == null) {
+            // there were no subflow executions
+            return null;
+        }
+
         Integer iterations = (Integer) taskOutput.get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES);
         String subflowOutputsBaseUri = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
@@ -18,6 +18,7 @@ import io.kestra.core.models.tasks.VoidOutput;
 import io.kestra.core.runners.FlowableUtils;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.utils.GraphUtils;
+import io.kestra.core.utils.ListUtils;
 import io.kestra.core.utils.TruthUtils;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.EqualsAndHashCode;
@@ -158,7 +159,7 @@ public class If extends Task implements FlowableTask<VoidOutput> {
     @Override
     public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
         List<ResolvedTask> childTask = this.childTasks(runContext, parentTaskRun);
-        if (childTask == null) {
+        if (ListUtils.isEmpty(childTask)) {
             // no next task to run, we guess the state from the parent task
             return Optional.of(execution.guessFinalState(null, parentTaskRun, this.isAllowFailure()));
         }
@@ -11,31 +11,15 @@ import io.kestra.core.models.executions.TaskRunAttempt;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.models.tasks.ExecutableTask;
 import io.kestra.core.models.tasks.Task;
-import io.kestra.core.runners.ExecutableUtils;
-import io.kestra.core.runners.FlowExecutorInterface;
-import io.kestra.core.runners.FlowInputOutput;
-import io.kestra.core.runners.DefaultRunContext;
-import io.kestra.core.runners.RunContext;
-import io.kestra.core.runners.SubflowExecution;
-import io.kestra.core.runners.SubflowExecutionResult;
+import io.kestra.core.runners.*;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.constraints.Min;
-import lombok.experimental.SuperBuilder;
-
-import lombok.Builder;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
+import lombok.*;
+import lombok.experimental.SuperBuilder;

 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.NotNull;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;

 @SuperBuilder
@@ -154,6 +138,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi

         if (this.labels != null) {
             for (Map.Entry<String, String> entry : this.labels.entrySet()) {
+                labels.removeIf(label -> label.key().equals(entry.getKey()));
                 labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
             }
         }
@@ -159,7 +159,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
             return false;
         }

-        Integer iterationCount = (Integer) parentTaskRun.getOutputs().get("iterationCount");
+        Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
+            .map(outputs -> (Integer) outputs.get("iterationCount"))
+            .orElse(0);
         if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
             if (printLog) {logger.warn("Max iterations reached");}
             return true;
@@ -225,7 +227,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {

     public WaitFor.Output outputs(TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
         String value = parentTaskRun != null ?
-            parentTaskRun.getOutputs().get("iterationCount").toString() : "0";
+            String.valueOf(Optional.ofNullable(parentTaskRun.getOutputs())
+                .map(outputs -> outputs.get("iterationCount"))
+                .orElse("0")) : "0";

         return Output.builder()
             .iterationCount(Integer.parseInt(value) + 1)
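Both WaitFor fixes apply the same null-safety idiom: `TaskRun.getOutputs()` may be null before the first iteration, so it must never be dereferenced directly. The idiom, extracted into a hypothetical helper (not in the diff):

```java
import io.kestra.core.models.executions.TaskRun;
import java.util.Optional;

// Hypothetical helper: read iterationCount without risking a
// NullPointerException when no outputs exist yet.
static Integer iterationCount(TaskRun taskRun) {
    return Optional.ofNullable(taskRun.getOutputs())
        .map(outputs -> (Integer) outputs.get("iterationCount"))
        .orElse(0);
}
```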
@@ -28,11 +28,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.*;
 import lombok.experimental.SuperBuilder;

-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -64,14 +60,14 @@ import jakarta.validation.constraints.NotNull;
         full = true,
         title = "Clone a Git repository into the Working Directory and run a Python script in a Docker container.",
         code = """
-            id: gitPython
+            id: git_python
             namespace: company.team

             tasks:
               - id: wdir
                 type: io.kestra.plugin.core.flow.WorkingDirectory
                 tasks:
-                  - id: cloneRepository
+                  - id: clone_repository
                     type: io.kestra.plugin.git.Clone
                     url: https://github.com/kestra-io/examples
                     branch: main
@@ -82,100 +78,103 @@ import jakarta.validation.constraints.NotNull;
                     type: io.kestra.plugin.scripts.runner.docker.Docker
                     containerImage: ghcr.io/kestra-io/pydata:latest
                     commands:
-                      - python scripts/etl_script.py"""
+                      - python scripts/etl_script.py
+            """
         ),
         @Example(
             full = true,
             title = "Add input and output files within a Working Directory to use them in a Python script.",
             code = """
-                id: apiJSONtoMongoDB
+                id: api_json_to_mongodb
                 namespace: company.team

-                tasks:
-                  - id: wdir
-                    type: io.kestra.plugin.core.flow.WorkingDirectory
-                    outputFiles:
-                      - output.json
-                    inputFiles:
-                      query.sql: |
-                        SELECT sum(total) as total, avg(quantity) as avg_quantity
-                        FROM sales;
-                    tasks:
-                      - id: inlineScript
-                        type: io.kestra.plugin.scripts.python.Script
-                        taskRunner:
-                          type: io.kestra.plugin.scripts.runner.docker.Docker
-                        containerImage: python:3.11-slim
-                        beforeCommands:
-                          - pip install requests kestra > /dev/null
-                        warningOnStdErr: false
-                        script: |
-                          import requests
-                          import json
-                          from kestra import Kestra
+                tasks:
+                  - id: wdir
+                    type: io.kestra.plugin.core.flow.WorkingDirectory
+                    outputFiles:
+                      - output.json
+                    inputFiles:
+                      query.sql: |
+                        SELECT sum(total) as total, avg(quantity) as avg_quantity
+                        FROM sales;
+                    tasks:
+                      - id: inline_script
+                        type: io.kestra.plugin.scripts.python.Script
+                        taskRunner:
+                          type: io.kestra.plugin.scripts.runner.docker.Docker
+                        containerImage: python:3.11-slim
+                        beforeCommands:
+                          - pip install requests kestra > /dev/null
+                        warningOnStdErr: false
+                        script: |
+                          import requests
+                          import json
+                          from kestra import Kestra

-                          with open('query.sql', 'r') as input_file:
-                            sql = input_file.read()
+                          with open('query.sql', 'r') as input_file:
+                            sql = input_file.read()

-                          response = requests.get('https://api.github.com')
-                          data = response.json()
+                          response = requests.get('https://api.github.com')
+                          data = response.json()

-                          with open('output.json', 'w') as output_file:
-                            json.dump(data, output_file)
+                          with open('output.json', 'w') as output_file:
+                            json.dump(data, output_file)

-                          Kestra.outputs({'receivedSQL': sql, 'status': response.status_code})
+                          Kestra.outputs({'receivedSQL': sql, 'status': response.status_code})

-                  - id: loadToMongoDB
-                    type: io.kestra.plugin.mongodb.Load
-                    connection:
-                      uri: mongodb://host.docker.internal:27017/
-                    database: local
-                    collection: github
-                    from: "{{ outputs.wdir.uris['output.json'] }}"
+                  - id: load_to_mongodb
+                    type: io.kestra.plugin.mongodb.Load
+                    connection:
+                      uri: mongodb://host.docker.internal:27017/
+                    database: local
+                    collection: github
+                    from: "{{ outputs.wdir.uris['output.json'] }}"
+            """
         ),
         @Example(
             full = true,
-            code = {
-                "id: working-directory",
-                "namespace: company.team",
-                "",
-                "tasks:",
-                "  - id: working-directory",
-                "    type: io.kestra.plugin.core.flow.WorkingDirectory",
-                "    tasks:",
-                "      - id: first",
-                "        type: io.kestra.plugin.scripts.shell.Commands",
-                "        commands:",
-                "          - 'echo \"{{ taskrun.id }}\" > {{ workingDir }}/stay.txt'",
-                "      - id: second",
-                "        type: io.kestra.plugin.scripts.shell.Commands",
-                "        commands:",
-                "          - |",
-                "            echo '::{\"outputs\": {\"stay\":\"'$(cat {{ workingDir }}/stay.txt)'\"}}::'"
-            }
+            code = """
+                id: working_directory
+                namespace: company.team
+
+                tasks:
+                  - id: working_directory
+                    type: io.kestra.plugin.core.flow.WorkingDirectory
+                    tasks:
+                      - id: first
+                        type: io.kestra.plugin.scripts.shell.Commands
+                        commands:
+                          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'
+                      - id: second
+                        type: io.kestra.plugin.scripts.shell.Commands
+                        commands:
+                          - |
+                            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'
+                """
         ),
         @Example(
             full = true,
             title = "A working directory with a cache of the node_modules directory.",
             code = """
-                id: node-with-cache
+                id: node_with_cache
                 namespace: company.team

                 tasks:
-                  - id: working-dir
+                  - id: working_dir
                     type: io.kestra.plugin.core.flow.WorkingDirectory
                     cache:
                       patterns:
                         - node_modules/**
                       ttl: PT1H
                     tasks:
-                      - id: script
-                        type: io.kestra.plugin.scripts.node.Script
-                        beforeCommands:
-                          - npm install colors
-                        script: |
-                          const colors = require("colors");
-                          console.log(colors.red("Hello"));"""
+                      - id: script
+                        type: io.kestra.plugin.scripts.node.Script
+                        beforeCommands:
+                          - npm install colors
+                        script: |
+                          const colors = require("colors");
+                          console.log(colors.red("Hello"));
+            """
         )
     },
     aliases = {"io.kestra.core.tasks.flows.WorkingDirectory", "io.kestra.core.tasks.flows.Worker"}
@@ -263,7 +262,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterface {
         }
     }

-    if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
+    if (this.namespaceFiles != null && !Boolean.FALSE.equals(this.namespaceFiles.getEnabled())) {
         runContext.storage()
             .namespace()
             .findAllFilesMatching(this.namespaceFiles.getInclude(), this.namespaceFiles.getExclude())
@@ -333,9 +332,9 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterface {
         }

         archive.finish();
-        File archiveFile = File.createTempFile("archive", ".zip");
-        Files.write(archiveFile.toPath(), bos.toByteArray());
-        URI uri = runContext.storage().putCacheFile(archiveFile, getId(), taskRun.getValue());
+        Path archiveFile = runContext.workingDir().createTempFile(".zip");
+        Files.write(archiveFile, bos.toByteArray());
+        URI uri = runContext.storage().putCacheFile(archiveFile.toFile(), getId(), taskRun.getValue());
         runContext.logger().debug("Caching in {}", uri);
     }
 } else {
@@ -46,7 +46,7 @@ import java.util.List;
             "- TRACE",
             "- DEBUG",
             "- INFO",
-            "- WARNING",
+            "- WARN",
         }
     )
 }
@@ -1,7 +1,11 @@
 package io.kestra.plugin.core.trigger;

+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.Label;
 import io.kestra.core.models.annotations.PluginProperty;
 import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.ArrayList;
+import java.util.List;
 import lombok.*;
 import lombok.experimental.SuperBuilder;
 import io.kestra.core.models.annotations.Example;
@@ -91,7 +95,7 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
             .namespace(flow.getNamespace())
             .flowId(flow.getId())
             .flowRevision(flow.getRevision())
-            .labels(flow.getLabels())
+            .labels(generateLabels(runContext, flow))
             .state(new State())
             .trigger(ExecutionTrigger.of(
                 this,
@@ -128,6 +132,34 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
         }
     }

+    private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
+        final List<Label> labels = new ArrayList<>();
+
+        if (flow.getLabels() != null) {
+            labels.addAll(flow.getLabels()); // no need for rendering
+        }
+
+        if (this.getLabels() != null) {
+            for (Label label : this.getLabels()) {
+                final var value = renderLabelValue(runContext, label);
+                if (value != null) {
+                    labels.add(new Label(label.key(), value));
+                }
+            }
+        }
+
+        return labels;
+    }
+
+    private String renderLabelValue(RunContext runContext, Label label) {
+        try {
+            return runContext.render(label.value());
+        } catch (IllegalVariableEvaluationException e) {
+            runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
+            return null;
+        }
+    }
+
     @Builder
     @ToString
     @EqualsAndHashCode
@@ -5,6 +5,7 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
 import com.cronutils.model.time.ExecutionTime;
 import com.cronutils.parser.CronParser;
 import com.google.common.collect.ImmutableMap;
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.exceptions.InternalException;
 import io.kestra.core.models.Label;
 import io.kestra.core.models.annotations.Example;
@@ -361,14 +362,13 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
             // validate schedule condition can fail to render variables
             // in this case, we return a failed execution so the trigger is not evaluated each second
             runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
-            List<Label> labels = generateLabels(conditionContext, backfill);
             Execution execution = Execution.builder()
                 .id(runContext.getTriggerExecutionId())
                 .tenantId(triggerContext.getTenantId())
                 .namespace(triggerContext.getNamespace())
                 .flowId(triggerContext.getFlowId())
                 .flowRevision(conditionContext.getFlow().getRevision())
-                .labels(labels)
+                .labels(generateLabels(runContext, conditionContext, backfill))
                 .state(new State().withState(State.Type.FAILED))
                 .build();
             return Optional.of(execution);
@@ -402,7 +402,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
         } else {
             variables = scheduleDates.toMap();
         }
-        List<Label> labels = generateLabels(conditionContext, backfill);
+        List<Label> labels = generateLabels(runContext, conditionContext, backfill);

         ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
@@ -442,19 +442,29 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
         return parser.parse(this.cron);
     }

-    private List<Label> generateLabels(ConditionContext conditionContext, Backfill backfill) {
+    private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
         List<Label> labels = new ArrayList<>();

         if (conditionContext.getFlow().getLabels() != null) {
-            labels.addAll(conditionContext.getFlow().getLabels());
+            labels.addAll(conditionContext.getFlow().getLabels()); // no need for rendering
         }

         if (backfill != null && backfill.getLabels() != null) {
-            labels.addAll(backfill.getLabels());
+            for (Label label : backfill.getLabels()) {
+                final var value = runContext.render(label.value());
+                if (value != null) {
+                    labels.add(new Label(label.key(), value));
+                }
+            }
         }

         if (this.getLabels() != null) {
-            labels.addAll(this.getLabels());
+            for (Label label : this.getLabels()) {
+                final var value = runContext.render(label.value());
+                if (value != null) {
+                    labels.add(new Label(label.key(), value));
+                }
+            }
         }

         return labels;
@@ -5,10 +5,8 @@ import com.google.common.collect.ImmutableList;
 import io.kestra.core.Helpers;
 import io.kestra.core.events.CrudEvent;
 import io.kestra.core.events.CrudEventType;
-import io.kestra.core.models.flows.Flow;
-import io.kestra.core.models.flows.FlowWithException;
-import io.kestra.core.models.flows.FlowWithSource;
-import io.kestra.core.models.flows.Type;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.*;
 import io.kestra.core.models.flows.input.StringInput;
 import io.kestra.core.models.triggers.Trigger;
 import io.kestra.core.queues.QueueFactoryInterface;
@@ -55,6 +53,9 @@ public abstract class AbstractFlowRepositoryTest {
     @Inject
     protected FlowRepositoryInterface flowRepository;

+    @Inject
+    protected ExecutionRepositoryInterface executionRepository;
+
     @Inject
     private LocalFlowRepositoryLoader repositoryLoader;
@@ -546,6 +547,67 @@ public abstract class AbstractFlowRepositoryTest {
         }
     }

+    @Test
+    void findByExecution() {
+        Flow flow = builder()
+            .revision(1)
+            .build();
+        flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
+        Execution execution = Execution.builder()
+            .id(IdUtils.create())
+            .namespace(flow.getNamespace())
+            .flowId(flow.getId())
+            .flowRevision(flow.getRevision())
+            .state(new State())
+            .build();
+        execution = executionRepository.save(execution);
+
+        try {
+            Flow full = flowRepository.findByExecution(execution);
+            assertThat(full, notNullValue());
+            assertThat(full.getNamespace(), is(flow.getNamespace()));
+            assertThat(full.getId(), is(flow.getId()));
+
+            full = flowRepository.findByExecutionWithoutAcl(execution);
+            assertThat(full, notNullValue());
+            assertThat(full.getNamespace(), is(flow.getNamespace()));
+            assertThat(full.getId(), is(flow.getId()));
+        } finally {
+            deleteFlow(flow);
+            executionRepository.delete(execution);
+        }
+    }
+
+    @Test
+    void findByExecutionNoRevision() {
+        Flow flow = builder()
+            .revision(3)
+            .build();
+        flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
+        Execution execution = Execution.builder()
+            .id(IdUtils.create())
+            .namespace(flow.getNamespace())
+            .flowId(flow.getId())
+            .state(new State())
+            .build();
+        executionRepository.save(execution);
+
+        try {
+            Flow full = flowRepository.findByExecution(execution);
+            assertThat(full, notNullValue());
+            assertThat(full.getNamespace(), is(flow.getNamespace()));
+            assertThat(full.getId(), is(flow.getId()));
+
+            full = flowRepository.findByExecutionWithoutAcl(execution);
+            assertThat(full, notNullValue());
+            assertThat(full.getNamespace(), is(flow.getNamespace()));
+            assertThat(full.getId(), is(flow.getId()));
+        } finally {
+            deleteFlow(flow);
+            executionRepository.delete(execution);
+        }
+    }
+
     private void deleteFlow(Flow flow) {
         Integer revision = flowRepository.lastRevision(flow.getTenantId(), flow.getNamespace(), flow.getId());
         flowRepository.delete(flow.toBuilder().revision(revision).build());
@@ -149,7 +149,7 @@ public abstract class AbstractLogRepositoryTest {

         logRepository.save(log1);

-        logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", null, null, ZonedDateTime.now().plusMinutes(1));
+        logRepository.deleteByQuery(null, "io.kestra.unittest", "flowId", List.of(Level.TRACE, Level.DEBUG, Level.INFO), null, ZonedDateTime.now().plusMinutes(1));

         find = logRepository.findByExecutionId(null, log1.getExecutionId(), null, Pageable.from(1, 50));
         assertThat(find.size(), is(0));
@@ -6,6 +6,7 @@ import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.Test;

 import java.net.URI;
+import java.util.List;
 import java.util.Map;

 import static org.hamcrest.MatcherAssert.assertThat;
@@ -49,4 +50,13 @@ class FilesServiceTest {
         Map<String, URI> outputs = FilesService.outputFiles(runContext, files.keySet().stream().toList());
         assertThat(outputs.size(), is(1));
     }
+
+    @Test
+    void renderOutputFiles() throws Exception {
+        RunContext runContext = runContextFactory.of(Map.of("extension", "txt"));
+        Map<String, String> files = FilesService.inputFiles(runContext, Map.of("file.txt", "content"));
+
+        Map<String, URI> outputs = FilesService.outputFiles(runContext, List.of("*.{{extension}}"));
+        assertThat(outputs.size(), is(1));
+    }
 }
@@ -8,12 +8,20 @@ import org.junit.jupiter.api.Test;

 import java.util.Map;

 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;

 class RunVariablesTest {

     @Test
+    @SuppressWarnings("unchecked")
     void shouldGetEmptyVariables() {
         Map<String, Object> variables = new RunVariables.DefaultBuilder().build(new RunContextLogger());
-        Assertions.assertEquals(Map.of("envs", Map.of(), "globals", Map.of()), variables);
+        assertThat(variables.size(), is(3));
+        assertThat((Map<String, Object>) variables.get("envs"), is(Map.of()));
+        assertThat((Map<String, Object>) variables.get("globals"), is(Map.of()));
+        assertThat(variables.get("addSecretConsumer"), notNullValue());
     }

     @Test
@@ -18,13 +18,13 @@ import java.time.ZonedDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -33,6 +33,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
private static Flow createScheduleFlow() {
|
||||
Schedule schedule = Schedule.builder()
|
||||
.id("hourly")
|
||||
@@ -58,6 +61,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
void schedule() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
|
||||
CountDownLatch queueCount = new CountDownLatch(4);
|
||||
|
||||
Flow flow = createScheduleFlow();
|
||||
@@ -74,6 +78,11 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// mock the backfill execution is ended
|
||||
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||
.when(executionRepositorySpy)
|
||||
.findById(any(), any());
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = new JdbcScheduler(
|
||||
applicationContext,
|
||||
@@ -94,7 +103,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
});
|
||||
|
||||
scheduler.run();
|
||||
queueCount.await(30, TimeUnit.SECONDS);
|
||||
queueCount.await(15, TimeUnit.SECONDS);
|
||||
|
||||
receive.blockLast();
|
||||
|
||||
|
||||
@@ -39,6 +39,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
private SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
private SchedulerExecutionState schedulerExecutionState;
|
||||
|
||||
@Inject
|
||||
private FlowListeners flowListenersService;
|
||||
|
||||
@@ -169,6 +172,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
|
||||
assertThat(queueCount.getCount(), is(0L));
|
||||
assertThat(last.get(), notNullValue());
|
||||
assertThat(last.get().getFlowRevision(), notNullValue());
|
||||
assertThat(last.get().getState().getCurrent(), is(State.Type.FAILED));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
protected QueueInterface<LogEntry> logQueue;
|
||||
@@ -65,7 +68,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.truncatedTo(ChronoUnit.HOURS);
|
||||
}
|
||||
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
@@ -77,6 +80,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
void schedule() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
|
||||
CountDownLatch queueCount = new CountDownLatch(6);
|
||||
CountDownLatch invalidLogCount = new CountDownLatch(1);
|
||||
Set<String> date = new HashSet<>();
|
||||
@@ -111,7 +115,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receiveExecutions = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -171,7 +175,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -205,7 +209,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -250,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -294,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
@@ -325,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.build();
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -390,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
// Wait 3s to see if things happen
|
||||
@@ -428,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(2);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -488,7 +492,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
|
||||
@@ -19,20 +19,20 @@ import reactor.core.publisher.Flux;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected FlowListeners flowListenersService;
|
||||
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Test
|
||||
void thread() throws Exception {
|
||||
@@ -53,12 +53,17 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
|
||||
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
|
||||
|
||||
doReturn(Collections.singletonList(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// mock the backfill execution is ended
|
||||
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||
.when(schedulerExecutionStateSpy)
|
||||
.findById(any(), any());
|
||||
|
||||
// scheduler
|
||||
try (
|
||||
AbstractScheduler scheduler = new JdbcScheduler(
|
||||
|
||||
@@ -123,10 +123,10 @@ class YamlFlowParserTest {
|
||||
void inputs() {
|
||||
Flow flow = this.parse("flows/valids/inputs.yaml");
|
||||
|
||||
assertThat(flow.getInputs().size(), is(27));
|
||||
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(9L));
|
||||
assertThat(flow.getInputs().size(), is(28));
|
||||
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(10L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count(), is(18L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(1L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(2L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput && ((StringInput)r).getValidator() != null).count(), is(1L));
|
||||
}
|
||||
|
||||
|
||||
@@ -25,12 +25,16 @@ import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
@@ -99,6 +103,51 @@ class PluginDefaultServiceTest {
|
||||
), result);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
void flowDefaultsOverrideGlobalDefaults(boolean flowDefaultForced, boolean globalDefaultForced, String fooValue, String barValue, String bazValue) {
|
||||
final DefaultPrecedenceTester task = DefaultPrecedenceTester.builder()
|
||||
.id("test")
|
||||
.type(DefaultPrecedenceTester.class.getName())
|
||||
.propBaz("taskValue")
|
||||
.build();
|
||||
|
||||
final PluginDefault flowDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), flowDefaultForced, ImmutableMap.of(
|
||||
"propBar", "flowValue",
|
||||
"propBaz", "flowValue"
|
||||
));
|
||||
final PluginDefault globalDefault = new PluginDefault(DefaultPrecedenceTester.class.getName(), globalDefaultForced, ImmutableMap.of(
|
||||
"propFoo", "globalValue",
|
||||
"propBar", "globalValue",
|
||||
"propBaz", "globalValue"
|
||||
));
|
||||
|
||||
final Flow flowWithPluginDefault = Flow.builder()
|
||||
.tasks(Collections.singletonList(task))
|
||||
.pluginDefaults(List.of(flowDefault))
|
||||
.build();
|
||||
|
||||
final PluginGlobalDefaultConfiguration pluginGlobalDefaultConfiguration = new PluginGlobalDefaultConfiguration();
|
||||
pluginGlobalDefaultConfiguration.defaults = List.of(globalDefault);
|
||||
|
||||
pluginDefaultService.pluginGlobalDefault = pluginGlobalDefaultConfiguration;
|
||||
|
||||
final Flow injected = pluginDefaultService.injectDefaults(flowWithPluginDefault);
|
||||
|
||||
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropFoo(), is(fooValue));
|
||||
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBar(), is(barValue));
|
||||
assertThat(((DefaultPrecedenceTester) injected.getTasks().getFirst()).getPropBaz(), is(bazValue));
|
||||
}
|
||||
|
||||
private static Stream<Arguments> flowDefaultsOverrideGlobalDefaults() {
|
||||
return Stream.of(
|
||||
Arguments.of(false, false, "globalValue", "flowValue", "taskValue"),
|
||||
Arguments.of(false, true, "globalValue", "globalValue", "globalValue"),
|
||||
Arguments.of(true, false, "globalValue", "flowValue", "flowValue"),
|
||||
Arguments.of(true, true, "globalValue", "flowValue", "flowValue")
|
||||
);
|
||||
}
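
Read together, the argument rows encode the intended precedence: a forced flow-level default beats a forced global default, both beat the task's own value, and non-forced defaults only fill properties the task left unset, flow-level before global. A minimal standalone sketch of that resolution order, under the assumption that the table above is the full contract (resolve is illustrative, not the service's API):

import java.util.Optional;

// Illustrative only: mirrors the precedence encoded by flowDefaultsOverrideGlobalDefaults().
final class DefaultPrecedenceSketch {
    static String resolve(String taskValue,
                          String flowDefault, boolean flowForced,
                          String globalDefault, boolean globalForced) {
        if (flowForced && flowDefault != null) return flowDefault;       // forced flow default wins
        if (globalForced && globalDefault != null) return globalDefault; // then forced global default
        if (taskValue != null) return taskValue;                         // then the task's explicit value
        return Optional.ofNullable(flowDefault).orElse(globalDefault);   // non-forced defaults fill gaps
    }

    public static void main(String[] args) {
        // Row (false, false, ..., "taskValue"): nothing forced, the task keeps propBaz.
        System.out.println(resolve("taskValue", "flowValue", false, "globalValue", false)); // taskValue
        // Row (false, true, ..., "globalValue"): a forced global default overrides propBaz.
        System.out.println(resolve("taskValue", "flowValue", false, "globalValue", true));  // globalValue
        // Row (true, true, ..., "flowValue"): a forced flow default outranks a forced global one.
        System.out.println(resolve("taskValue", "flowValue", true, "globalValue", true));   // flowValue
    }
}
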

    @Test
    public void injectFlowAndGlobals() {
        DefaultTester task = DefaultTester.builder()
@@ -297,4 +346,23 @@ class PluginDefaultServiceTest {
        private Map<String, String> val;
    }
}

    @SuperBuilder
    @ToString
    @EqualsAndHashCode
    @Getter
    @NoArgsConstructor
    @Plugin(aliases = "io.kestra.core.services.DefaultPrecedenceTesterAlias")
    public static class DefaultPrecedenceTester extends Task implements RunnableTask<VoidOutput> {
        private String propFoo;

        private String propBar;

        private String propBaz;

        @Override
        public VoidOutput run(RunContext runContext) throws Exception {
            return null;
        }
    }
}
@@ -5,25 +5,33 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;

class FlowOutputTest extends AbstractMemoryRunnerTest {

    static final String NAMESPACE = "io.kestra.tests";

    @Test
    void shouldGetSuccessExecutionForFlowFlowWithOutputs() throws TimeoutException {
    void shouldGetSuccessExecutionForFlowWithOutputs() throws TimeoutException {
        Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs", null, null);
        assertThat(execution.getOutputs(), aMapWithSize(1));
        assertThat(execution.getOutputs().get("key"), is("{\"value\":\"flow-with-outputs\"}"));
        assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
    }

    @Test
    @SuppressWarnings("unchecked")
    void shouldGetSuccessExecutionForFlowWithArrayOutputs() throws TimeoutException {
        Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-array-outputs", null, null);
        assertThat(execution.getOutputs(), aMapWithSize(1));
        assertThat((List<String>) execution.getOutputs().get("myout"), hasItems("1rstValue", "2ndValue"));
        assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
    }

    @Test
    void shouldGetFailExecutionForFlowWithInvalidOutputs() throws TimeoutException {
        Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs-failed", null, null);

@@ -102,6 +102,21 @@ public class ForEachItemCaseTest {
        assertThat(triggered.get().getTaskRunList(), hasSize(1));
    }

    @SuppressWarnings("unchecked")
    public void forEachItemEmptyItems() throws TimeoutException, URISyntaxException, IOException {
        URI file = emptyItems();
        Map<String, Object> inputs = Map.of("file", file.toString());
        Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "for-each-item", null,
            (flow, execution1) -> flowIO.typedInputs(flow, execution1, inputs),
            Duration.ofSeconds(30));

        // assert on the main flow execution
        assertThat(execution.getTaskRunList(), hasSize(4));
        assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
        Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
        assertThat(outputs, nullValue());
    }

    @SuppressWarnings("unchecked")
    public void forEachItemNoWait() throws TimeoutException, InterruptedException, URISyntaxException, IOException {
        CountDownLatch countDownLatch = new CountDownLatch(26);
@@ -261,6 +276,16 @@ public class ForEachItemCaseTest {
        );
    }

    private URI emptyItems() throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");

        return storageInterface.put(
            null,
            new URI("/file/storage/file.txt"),
            new FileInputStream(tempFile)
        );
    }

    private List<String> content() {
        return IntStream
            .range(0, 102)

@@ -16,6 +16,7 @@ import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

@KestraTest
@@ -109,4 +110,49 @@ class FlowTest {
        assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
    }

    @Test
    void success_withLabels() {
        var flow = io.kestra.core.models.flows.Flow.builder()
            .id("flow-with-flow-trigger")
            .namespace("io.kestra.unittest")
            .revision(1)
            .labels(List.of(
                new Label("flow-label-1", "flow-label-1"),
                new Label("flow-label-2", "flow-label-2")
            ))
            .tasks(Collections.singletonList(Return.builder()
                .id("test")
                .type(Return.class.getName())
                .format("test")
                .build()))
            .build();
        var execution = Execution.builder()
            .id(IdUtils.create())
            .namespace("io.kestra.unittest")
            .flowId("flow-with-flow-trigger")
            .flowRevision(1)
            .state(State.of(State.Type.RUNNING, Collections.emptyList()))
            .build();
        var flowTrigger = Flow.builder()
            .id("flow")
            .type(Flow.class.getName())
            .labels(List.of(
                new Label("trigger-label-1", "trigger-label-1"),
                new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
                new Label("trigger-label-3", "{{ null }}"), // should return an empty string
                new Label("trigger-label-4", "{{ foobar }}") // should fail
            ))
            .build();

        Optional<Execution> evaluate = flowTrigger.evaluate(runContextFactory.of(), flow, execution);

        assertThat(evaluate.isPresent(), is(true));
        assertThat(evaluate.get().getLabels(), hasSize(5));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
    }
}

@@ -135,6 +135,35 @@ class ScheduleTest {
        assertThat(inputs.get("input2"), is("default"));
    }

    @Test
    void success_withLabels() throws Exception {
        var scheduleTrigger = Schedule.builder()
            .id("schedule")
            .cron("0 0 1 * *")
            .labels(List.of(
                new Label("trigger-label-1", "trigger-label-1"),
                new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
                new Label("trigger-label-3", "{{ null }}")
            ))
            .build();
        var conditionContext = conditionContext(scheduleTrigger);
        var date = ZonedDateTime.now()
            .withDayOfMonth(1)
            .withHour(0)
            .withMinute(0)
            .withSecond(0)
            .truncatedTo(ChronoUnit.SECONDS)
            .minusMonths(1);
        var triggerContext = triggerContext(date, scheduleTrigger);

        Optional<Execution> evaluate = scheduleTrigger.evaluate(conditionContext, triggerContext);

        assertThat(evaluate.isPresent(), is(true));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
        assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
    }

    @SuppressWarnings("unchecked")
    @Test
    void everyMinute() throws Exception {

@@ -0,0 +1,16 @@
id: flow-with-array-outputs
namespace: io.kestra.tests

tasks:
  - id: return
    type: io.kestra.plugin.core.output.OutputValues
    values:
      one: "1rstValue"
      two: "2ndValue"

outputs:
  - id: myout
    type: ARRAY
    value:
      - "{{ outputs.return.values.one }}"
      - "{{ outputs.return.values.two }}"
@@ -95,6 +95,11 @@ inputs:
  - name: array
    type: ARRAY
    itemType: INT
  # required: true combined with an empty default value only works if we serialize default values correctly, which is what this input tests.
  - name: empty
    type: STRING
    defaults: ''
    required: true

tasks:
  - id: string
@@ -1,5 +1,5 @@
version=0.18.0-SNAPSHOT
version=0.18.22

org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low

@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class H2SchedulerScheduleTest extends SchedulerScheduleTest {
    @Override
    protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
    protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
        return new JdbcScheduler(
            applicationContext,
            flowListenersServiceSpy

@@ -2,9 +2,13 @@ package io.kestra.repository.mysql;

import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.DMLQuery;
import org.jooq.DSLContext;
import org.jooq.Record;

@Singleton
@MysqlRepositoryEnabled
@@ -13,4 +17,13 @@ public class MysqlFlowTopologyRepository extends AbstractJdbcFlowTopologyReposit
    public MysqlFlowTopologyRepository(@Named("flowtopologies") MysqlRepository<FlowTopology> repository) {
        super(repository);
    }

    @Override
    protected DMLQuery<Record> buildMergeStatement(DSLContext context, FlowTopology flowTopology) {
        return context.insertInto(this.jdbcRepository.getTable())
            .set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
            .set(this.jdbcRepository.persistFields(flowTopology))
            .onDuplicateKeyUpdate()
            .set(this.jdbcRepository.persistFields(flowTopology));
    }
}

@@ -7,9 +7,17 @@ import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class MysqlQueue<T> extends JdbcQueue<T> {

    // TODO - remove once 'queue' table is re-designed
    private static final MysqlQueueConsumers QUEUE_CONSUMERS = new MysqlQueueConsumers();

    public MysqlQueue(Class<T> cls, ApplicationContext applicationContext) {
        super(cls, applicationContext);
    }
@@ -59,7 +67,7 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
            .where(AbstractJdbcRepository.field("type").eq(this.cls.getName()))
            .and(DSL.or(List.of(
                AbstractJdbcRepository.field("consumers").isNull(),
                DSL.condition("NOT(FIND_IN_SET(?, consumers) > 0)", queueType)
                AbstractJdbcRepository.field("consumers").in(QUEUE_CONSUMERS.allForConsumerNotIn(queueType))
            )));

        if (consumerGroup != null) {
@@ -101,4 +109,38 @@ public class MysqlQueue<T> extends JdbcQueue<T> {

        update.execute();
    }

    private static final class MysqlQueueConsumers {

        private static final Set<String> CONSUMERS;

        static {
            CONSUMERS = new HashSet<>();
            String[] elements = {"indexer", "executor", "worker", "scheduler"};
            List<String> results = new ArrayList<>();
            // Generate all combinations and their permutations
            generateCombinations(elements, new boolean[elements.length], new ArrayList<>(), results);
            CONSUMERS.addAll(results);
        }

        public Set<String> allForConsumerNotIn(String consumer) {
            return CONSUMERS.stream().filter(s -> !s.contains(consumer)).collect(Collectors.toSet());
        }

        private static void generateCombinations(String[] elements, boolean[] used, List<String> current, List<String> results) {
            if (!current.isEmpty()) {
                results.add(String.join(",", current));
            }

            for (int i = 0; i < elements.length; i++) {
                if (!used[i]) {
                    used[i] = true;
                    current.add(elements[i]);
                    generateCombinations(elements, used, current, results);
                    current.removeLast();
                    used[i] = false;
                }
            }
        }
    }
}
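
The rewritten condition works because `consumers` can only ever hold a comma-joined permutation of the four known consumer types, so every value that does not contain a given consumer can be enumerated up front and matched with a plain IN predicate instead of the non-index-friendly FIND_IN_SET. A standalone sketch of the same enumeration, with illustrative names:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative only: mirrors the enumeration done by MysqlQueueConsumers above.
final class QueueConsumersSketch {
    private static void permute(String[] elements, boolean[] used, List<String> current, List<String> results) {
        if (!current.isEmpty()) {
            results.add(String.join(",", current)); // record every non-empty ordered subset
        }
        for (int i = 0; i < elements.length; i++) {
            if (!used[i]) {
                used[i] = true;
                current.add(elements[i]);
                permute(elements, used, current, results);
                current.remove(current.size() - 1);
                used[i] = false;
            }
        }
    }

    public static void main(String[] args) {
        String[] elements = {"indexer", "executor", "worker", "scheduler"};
        List<String> all = new ArrayList<>();
        permute(elements, new boolean[elements.length], new ArrayList<>(), all);
        Set<String> notScheduler = all.stream()
            .filter(s -> !s.contains("scheduler"))
            .collect(Collectors.toSet());
        // 64 ordered non-empty subsets in total; 15 of them omit "scheduler".
        System.out.println(all.size() + " / " + notScheduler.size());
    }
}
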

@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
    @Override
    protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
    protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
        return new JdbcScheduler(
            applicationContext,
            flowListenersServiceSpy

@@ -2,9 +2,13 @@ package io.kestra.repository.postgres;

import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jooq.DMLQuery;
import org.jooq.DSLContext;
import org.jooq.Record;

@Singleton
@PostgresRepositoryEnabled
@@ -13,4 +17,14 @@ public class PostgresFlowTopologyRepository extends AbstractJdbcFlowTopologyRepo
    public PostgresFlowTopologyRepository(@Named("flowtopologies") PostgresRepository<FlowTopology> repository) {
        super(repository);
    }

    @Override
    protected DMLQuery<Record> buildMergeStatement(DSLContext context, FlowTopology flowTopology) {
        return context.insertInto(this.jdbcRepository.getTable())
            .set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
            .set(this.jdbcRepository.persistFields(flowTopology))
            .onConflict(AbstractJdbcRepository.field("key"))
            .doUpdate()
            .set(this.jdbcRepository.persistFields(flowTopology));
    }
}

@@ -10,6 +10,7 @@ import org.jooq.impl.DSL;
import org.slf4j.event.Level;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

@@ -27,10 +28,9 @@ public class PostgresLogRepository extends AbstractJdbcLogRepository {
    }

    @Override
    protected Condition minLevel(Level minLevel) {
    protected Condition levelsCondition(List<Level> levels) {
        return DSL.condition("level in (" +
            LogEntry
                .findLevelsByMin(minLevel)
            levels
                .stream()
                .map(s -> "'" + s + "'::log_level")
                .collect(Collectors.joining(", ")) +

@@ -1,5 +1,6 @@
package io.kestra.repository.postgres;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.JdbcMapper;
@@ -21,6 +22,7 @@ import org.jooq.Result;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jakarta.annotation.Nullable;
@@ -52,12 +54,10 @@ public class PostgresRepository<T> extends io.kestra.jdbc.AbstractJdbcRepository
    @SneakyThrows
    @Override
    public Map<Field<Object>, Object> persistFields(T entity) {
        Map<Field<Object>, Object> fields = super.persistFields(entity);

        String json = JdbcMapper.of().writeValueAsString(entity);
        fields.replace(AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)));

        return fields;
        return new HashMap<>(ImmutableMap
            .of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)))
        );
    }

    @SneakyThrows

@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;

import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;

class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
    @Override
    protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
    protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
        return new JdbcScheduler(
            applicationContext,
            flowListenersServiceSpy

@@ -59,6 +59,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
    private static final int FETCH_SIZE = 100;

    protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
    private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
    private final ApplicationContext applicationContext;
@@ -108,10 +110,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
                    .where(this.defaultFilter(tenantId))
                    .and(field("trigger_execution_id").eq(triggerExecutionId));

                select.fetch()
                    .map(this.jdbcRepository::map)
                    .forEach(emitter::next);
                emitter.complete();
                // fetchSize will fetch rows 100 by 100 even for databases where the driver loads everything in memory
                // using a stream fetches lazily; otherwise all fetches would be done before we start emitting the items
                try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
                    stream.map(this.jdbcRepository::map).forEach(emitter::next);
                } finally {
                    emitter.complete();
                }
            }),
            FluxSink.OverflowStrategy.BUFFER
        );
@@ -213,11 +218,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
                    deleted
                );

                select.fetch()
                    .map(this.jdbcRepository::map)
                    .forEach(emitter::next);

                emitter.complete();
                // fetchSize will fetch rows 100 by 100 even for databases where the driver loads everything in memory
                // using a stream fetches lazily; otherwise all fetches would be done before we start emitting the items
                try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
                    stream.map(this.jdbcRepository::map).forEach(emitter::next);
                } finally {
                    emitter.complete();
                }
            }),
            FluxSink.OverflowStrategy.BUFFER
        );

@@ -4,14 +4,13 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
import jakarta.inject.Singleton;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRepository implements FlowTopologyRepositoryInterface, JdbcIndexerInterface<FlowTopology> {
    protected final io.kestra.jdbc.AbstractJdbcRepository<FlowTopology> jdbcRepository;
@@ -105,10 +104,7 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
            context
                .batch(flowTopologies
                    .stream()
                    .map(flowTopology -> context.insertInto(this.jdbcRepository.getTable())
                        .set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
                        .set(this.jdbcRepository.persistFields(flowTopology))
                    )
                    .map(flowTopology -> buildMergeStatement(context, flowTopology))
                    .toList()
                )
                .execute();
@@ -116,6 +112,17 @@ public abstract class AbstractJdbcFlowTopologyRepository extends AbstractJdbcRep
        });
    }

    protected DMLQuery<Record> buildMergeStatement(DSLContext context, FlowTopology flowTopology) {
        return context.mergeInto(this.jdbcRepository.getTable())
            .using(context.selectOne())
            .on(AbstractJdbcRepository.field("key").eq(this.jdbcRepository.key(flowTopology)))
            .whenMatchedThenUpdate()
            .set(this.jdbcRepository.persistFields(flowTopology))
            .whenNotMatchedThenInsert()
            .set(AbstractJdbcRepository.field("key"), this.jdbcRepository.key(flowTopology))
            .set(this.jdbcRepository.persistFields(flowTopology));
    }

    @Override
    public FlowTopology save(FlowTopology flowTopology) {
        this.jdbcRepository.persist(flowTopology);
@@ -350,7 +350,10 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
            DSLContext context = DSL.using(configuration);

            return context.delete(this.jdbcRepository.getTable())
                .where(field("execution_id", String.class).eq(execution.getId()))
                // The deleted field is not used, so it will always be false.
                // We add it here to be sure to use the correct index.
                .where(field("deleted", Boolean.class).eq(false))
                .and(field("execution_id", String.class).eq(execution.getId()))
                .execute();
        });
    }
@@ -441,7 +444,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
    }

    if (logLevels != null) {
        delete = delete.and(field("level").in(logLevels));
        delete = delete.and(levelsCondition(logLevels));
    }

    return delete.execute();
@@ -493,7 +496,11 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
        });
    }

    protected Condition minLevel(Level minLevel) {
        return field("level").in(LogEntry.findLevelsByMin(minLevel));
    private Condition minLevel(Level minLevel) {
        return levelsCondition(LogEntry.findLevelsByMin(minLevel));
    }

    protected Condition levelsCondition(List<Level> levels) {
        return field("level").in(levels.stream().map(level -> level.name()).toList());
    }
}

@@ -150,7 +150,10 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
            DSLContext context = DSL.using(configuration);

            return context.delete(this.jdbcRepository.getTable())
                .where(field("execution_id", String.class).eq(execution.getId()))
                // The deleted field is not used, so it will always be false.
                // We add it here to be sure to use the correct index.
                .where(field("deleted", Boolean.class).eq(false))
                .and(field("execution_id", String.class).eq(execution.getId()))
                .execute();
        });
    }
@@ -168,8 +171,7 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
        .getDslContextWrapper()
        .transactionResult(configuration -> {
            DSLContext context = DSL.using(configuration);
            SelectConditionStep<Record1<Object>> select = DSL
                .using(configuration)
            SelectConditionStep<Record1<Object>> select = context
                .selectDistinct(field(field))
                .from(this.jdbcRepository.getTable())
                .where(this.defaultFilter(tenantId));
@@ -185,8 +187,7 @@ public abstract class AbstractJdbcMetricRepository extends AbstractJdbcRepositor
        .getDslContextWrapper()
        .transactionResult(configuration -> {
            DSLContext context = DSL.using(configuration);
            SelectConditionStep<Record1<Object>> select = DSL
                .using(configuration)
            SelectConditionStep<Record1<Object>> select = context
                .select(field("value"))
                .from(this.jdbcRepository.getTable())
                .where(this.defaultFilter(tenantId));

@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
        Trigger current = optionalTrigger.get();
        current = current.toBuilder()
            .executionId(trigger.getExecutionId())
            .executionCurrentState(trigger.getExecutionCurrentState())
            .updatedDate(trigger.getUpdatedDate())
            .build();
        this.save(context, current);

@@ -31,10 +31,10 @@ import java.util.function.BiConsumer;
public class JdbcScheduler extends AbstractScheduler {
    private final QueueInterface<Execution> executionQueue;
    private final TriggerRepositoryInterface triggerRepository;
    private final ConditionService conditionService;

    private final FlowRepositoryInterface flowRepository;
    private final JooqDSLContextWrapper dslContextWrapper;
    private final ConditionService conditionService;


    @SuppressWarnings("unchecked")
@@ -48,6 +48,7 @@ public class JdbcScheduler extends AbstractScheduler {
        executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
        triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
        triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
        executionState = applicationContext.getBean(SchedulerExecutionState.class);
        conditionService = applicationContext.getBean(ConditionService.class);
        flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
        dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
@@ -75,14 +76,6 @@ public class JdbcScheduler extends AbstractScheduler {
                .ifPresent(trigger -> {
                    this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
                });
        } else {
            // update execution state on each state change so the scheduler knows the execution is running
            triggerRepository
                .findByExecution(execution)
                .filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
                .ifPresent(trigger -> {
                    ((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
                });
        }
    }
}
@@ -105,7 +98,7 @@ public class JdbcScheduler extends AbstractScheduler {
    public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
        JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);

        schedulerContext.startTransaction(scheduleContextInterface -> {
        schedulerContext.doInTransaction(scheduleContextInterface -> {
            List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);

            consumer.accept(triggers, scheduleContextInterface);

@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
        this.dslContextWrapper = dslContextWrapper;
    }

    public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
    @Override
    public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
        this.dslContextWrapper.transaction(configuration -> {
            this.context = DSL.using(configuration);

            consumer.accept(this);

            this.commit();
            this.context.commit();
        });
    }

    public void commit() {
        this.context.commit();
    }
}
@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
        return trigger;
    }

    @Override
    public Trigger create(Trigger trigger, String headerContent) {
        return this.triggerRepository.create(trigger);
    }

    @Override
    public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
        this.triggerRepository.save(trigger, scheduleContextInterface);

        return trigger;
    }

    @Override
    public Trigger create(Trigger trigger) {

@@ -84,7 +96,4 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
    public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
        throw new NotImplementedException();
    }

    @Override
    public void unlock(Trigger trigger) {}
}

@@ -276,6 +276,11 @@ public abstract class JdbcRunnerTest {
        forEachItemCaseTest.forEachItem();
    }

    @Test
    void forEachItemEmptyItems() throws URISyntaxException, IOException, TimeoutException {
        forEachItemCaseTest.forEachItemEmptyItems();
    }

    @Test
    void forEachItemNoWait() throws URISyntaxException, IOException, InterruptedException, TimeoutException {
        forEachItemCaseTest.forEachItemNoWait();

@@ -119,6 +119,7 @@ dependencies {
    api "org.junit-pioneer:junit-pioneer:2.2.0"
    api 'org.hamcrest:hamcrest:2.2'
    api 'org.hamcrest:hamcrest-library:2.2'
    api 'org.assertj:assertj-core:3.27.3'
    api group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
    api 'com.github.tomakehurst:wiremock-jre8:3.0.1'
    api "org.apache.kafka:kafka-streams-test-utils:$kafkaVersion"

@@ -132,7 +132,7 @@ public class CommandsWrapper implements TaskCommands {
    }

    public ScriptOutput run() throws Exception {
        if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
        if (this.namespaceFiles != null && !Boolean.FALSE.equals(this.namespaceFiles.getEnabled())) {

            List<NamespaceFile> matchedNamespaceFiles = runContext.storage()
                .namespace()

@@ -342,7 +342,7 @@ public class Docker extends TaskRunner {
        CreateContainerResponse exec = container.exec();
        logger.debug("Container created: {}", exec.getId());

        List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths();
        List<Path> relativeWorkingDirectoryFilesPaths = taskCommands.relativeWorkingDirectoryFilesPaths(true);
        boolean hasFilesToUpload = !ListUtils.isEmpty(relativeWorkingDirectoryFilesPaths);
        boolean hasFilesToDownload = !ListUtils.isEmpty(filesToDownload);
        boolean outputDirectoryEnabled = taskCommands.outputDirectoryEnabled();

@@ -7,6 +7,6 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
class DockerTest extends AbstractTaskRunnerTest {
    @Override
    protected TaskRunner taskRunner() {
        return Docker.builder().image("centos").build();
        return Docker.builder().image("rockylinux:9.3-minimal").build();
    }
}
@@ -201,7 +201,7 @@ public abstract class AbstractTaskRunnerTest {
        Mockito.when(commands.getEnableOutputDirectory()).thenReturn(true);
        Mockito.when(commands.outputDirectoryEnabled()).thenReturn(true);
        Mockito.when(commands.getTimeout()).thenReturn(null);
        Mockito.when(commands.relativeWorkingDirectoryFilesPaths()).thenCallRealMethod();
        Mockito.when(commands.relativeWorkingDirectoryFilesPaths(true)).thenCallRealMethod();

        return commands;
    }

@@ -1,5 +1,2 @@
public/vscode/
public/vscode-web/

node/
node_modules/
@@ -30,8 +30,8 @@
    <strong>We're sorry but Kestra doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
</noscript>

<div id="loader-wrapper">
    <div id="loader"></div>
<div id="loader-wrapper" data-test-id="loader-wrapper">
    <div id="loader" data-test-id="loader"></div>
</div>

<div id="app-container">
ui/package-lock.json (generated) — 2443 lines changed; diff suppressed because it is too large
@@ -1,8 +1,9 @@
{
    "name": "kestra",
    "version": "0.1.0",
    "version": "0.18.6",
    "private": true,
    "packageManager": "npm@9.8.1",
    "type": "module",
    "packageManager": "npm@9.9.3",
    "scripts": {
        "dev": "vite --host",
        "build": "vite build --emptyOutDir",
@@ -12,25 +13,25 @@
        "lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
    },
    "dependencies": {
        "@kestra-io/ui-libs": "^0.0.53",
        "@kestra-io/ui-libs": "^0.0.59",
        "@vue-flow/background": "^1.3.0",
        "@vue-flow/controls": "^1.1.2",
        "@vue-flow/core": "^1.39.1",
        "@vue-flow/core": "^1.41.1",
        "ansi-to-html": "^0.7.2",
        "axios": "^1.7.2",
        "axios": "^1.7.7",
        "bootstrap": "^5.3.3",
        "buffer": "^6.0.3",
        "chart.js": "^4.4.3",
        "chart.js": "^4.4.4",
        "chartjs-chart-treemap": "^2.3.1",
        "core-js": "^3.37.1",
        "core-js": "^3.38.1",
        "cronstrue": "^2.50.0",
        "dagre": "^0.8.5",
        "element-plus": "^2.7.8",
        "element-plus": "^2.8.2",
        "humanize-duration": "^3.32.1",
        "js-yaml": "^4.1.0",
        "lodash": "^4.17.21",
        "markdown-it": "^14.1.0",
        "markdown-it-anchor": "^9.0.1",
        "markdown-it-anchor": "^9.2.0",
        "markdown-it-container": "^4.0.0",
        "markdown-it-mark": "^4.0.0",
        "markdown-it-meta": "0.0.1",
@@ -39,51 +40,51 @@
        "moment": "^2.30.1",
        "moment-range": "4.0.2",
        "moment-timezone": "^0.5.45",
        "node-modules-polyfill": "^0.1.4",
        "nprogress": "^0.2.0",
        "pdfjs-dist": "^4.5.136",
        "posthog-js": "^1.150.1",
        "pdfjs-dist": "^4.6.82",
        "posthog-js": "^1.160.3",
        "throttle-debounce": "^5.0.2",
        "vite-plugin-eslint": "^1.8.1",
        "vue": "^3.4.34",
        "vue": "^3.5.3",
        "vue-axios": "3.5.2",
        "vue-chartjs": "^5.3.1",
        "vue-gtag": "^2.0.1",
        "vue-i18n": "^9.13.1",
        "vue-i18n": "^9.14.0",
        "vue-material-design-icons": "^5.3.0",
        "vue-router": "^4.4.0",
        "vue-sidebar-menu": "^5.4.0",
        "vue-router": "^4.4.3",
        "vue-sidebar-menu": "^5.4.1",
        "vue-virtual-scroller": "^2.0.0-beta.8",
        "vue3-popper": "^1.5.0",
        "vue3-runtime-template": "^1.0.2",
        "vue3-tour": "github:kestra-io/vue3-tour",
        "vuex": "^4.1.0",
        "xss": "^1.0.15",
        "yaml": "^2.5.0"
        "yaml": "^2.5.1"
    },
    "devDependencies": {
        "@esbuild-plugins/node-modules-polyfill": "^0.2.2",
        "@rushstack/eslint-patch": "^1.10.4",
        "@shikijs/markdown-it": "^1.6.3",
        "@typescript-eslint/parser": "^7.17.0",
        "@vitejs/plugin-vue": "^5.1.1",
        "@shikijs/markdown-it": "^1.16.2",
        "@typescript-eslint/parser": "^8.4.0",
        "@vitejs/plugin-vue": "^5.1.3",
        "@vue/eslint-config-prettier": "^9.0.0",
        "@vue/test-utils": "^2.4.6",
        "decompress": "^4.2.1",
        "eslint": "^8.57.0",
        "eslint-plugin-vue": "^9.27.0",
        "jsdom": "^24.1.1",
        "monaco-editor": "^0.50.0",
        "eslint-plugin-vue": "^9.28.0",
        "jsdom": "^25.0.0",
        "monaco-editor": "^0.51.0",
        "monaco-yaml": "^5.2.2",
        "prettier": "^3.3.3",
        "rollup-plugin-copy": "^3.5.0",
        "rollup-plugin-visualizer": "^5.12.0",
        "sass": "^1.77.4",
        "sass": "^1.78.0",
        "typescript": "^5.5.4",
        "vite": "^5.3.5",
        "vitest": "^2.0.4"
        "vite": "^5.4.6",
        "vite-plugin-eslint": "^1.8.1",
        "vitest": "^2.0.5"
    },
    "optionalDependencies": {
        "@rollup/rollup-linux-x64-gnu": "^4.19.0"
        "@rollup/rollup-linux-x64-gnu": "^4.21.2"
    },
    "overrides": {
        "bootstrap": {
ui/plugins/commit.ts (new file) — 30 lines
@@ -0,0 +1,30 @@
|
||||
import type {Plugin} from "vite";
|
||||
import {execSync} from "child_process";
|
||||
|
||||
const getInfo = (formats: string[]): string[] => formats.map(format => execSync(`git log -1 --format=${format}`).toString().trim());
|
||||
|
||||
const comment = (message: string, author: string, date: string): string => `
|
||||
<!--
|
||||
|
||||
Last Commit:
|
||||
|
||||
${message}
|
||||
----------
|
||||
Author: ${author}
|
||||
Date: ${date}
|
||||
|
||||
-->`;
|
||||
|
||||
export const commit = (): Plugin => {
|
||||
const [message, author, date] = getInfo(["%s", "%an", "%cd"]);
|
||||
|
||||
return {
|
||||
name: "commit",
|
||||
transformIndexHtml: {
|
||||
order: "pre",
|
||||
handler(html: string): string {
|
||||
return comment(message, author, date) + html;
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
@@ -1,29 +0,0 @@
|
||||
import type {Plugin} from "vite";
|
||||
import {execSync} from "child_process";
|
||||
|
||||
const comment = (hash: string, date: string): string => {
|
||||
return `
|
||||
<!--
|
||||
|
||||
Last Commit:
|
||||
|
||||
URL: https://github.com/kestra-io/kestra/commit/${hash}
|
||||
Date: ${date}
|
||||
|
||||
-->`;
|
||||
};
|
||||
|
||||
export const details = (): Plugin => {
|
||||
const hash: string = execSync("git rev-parse --short HEAD").toString().trim();
|
||||
const date: string = execSync("git log -1 --format=%cd").toString().trim();
|
||||
|
||||
return {
|
||||
name: "details",
|
||||
transformIndexHtml: {
|
||||
order: "pre",
|
||||
handler(html: string): string {
|
||||
return comment(hash, date) + html;
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
@@ -68,6 +68,7 @@
    },
    methods: {
        changeVisibility(visible = true) {
            if (visible) document.querySelector(".ee-tooltip")?.remove();
            this.visible = visible
        }
    },

@@ -37,7 +37,7 @@
            v-on="activeTab['v-on'] ?? {}"
            ref="tabContent"
            :is="activeTab.component"
            embed
            :embed="activeTab.props && activeTab.props.embed !== undefined ? activeTab.props.embed : true"
        />
    </section>
</template>

@@ -86,6 +86,7 @@
            :label="$t('id')"
        />
        <el-table-column
            prop="flowId"
            sortable="custom"
            :sort-orders="['ascending', 'descending']"
            :label="$t('flow')"
@@ -124,16 +125,6 @@
                </router-link>
            </template>
        </el-table-column>

        <el-table-column :label="$t('state')">
            <template #default="scope">
                <status
                    v-if="scope.row.executionCurrentState"
                    :status="scope.row.executionCurrentState"
                    size="small"
                />
            </template>
        </el-table-column>
        <el-table-column prop="workerId" :label="$t('workerId')">
            <template #default="scope">
                <id
@@ -269,7 +260,6 @@
    import RefreshButton from "../layout/RefreshButton.vue";
    import DateAgo from "../layout/DateAgo.vue";
    import Id from "../Id.vue";
    import Status from "../Status.vue";
    import {mapState} from "vuex";
    import SelectTableActions from "../../mixins/selectTableActions";
    import _merge from "lodash/merge";
@@ -284,7 +274,6 @@
            SearchField,
            NamespaceSelect,
            DateAgo,
            Status,
            Id,
            LogsWrapper
        },

@@ -61,6 +61,14 @@
                @update:filter-value="onDataTableValue"
            />
        </el-form-item>
        <el-form-item>
            <scope-filter-buttons
                :label="$t('executions')"
                :value="$route.query.scope"
                :system="namespace === 'system'"
                @update:model-value="onDataTableValue('scope', $event)"
            />
        </el-form-item>
        <el-form-item>
            <label-filter
                :model-value="$route.query.labels"
@@ -106,6 +114,13 @@
                />
            </el-select>
        </el-form-item>
        <el-form-item>
            <el-switch
                :model-value="showChart"
                @update:model-value="onShowChartChange"
                :active-text="$t('show chart')"
            />
        </el-form-item>
        <el-form-item>
            <filters :storage-key="filterStorageKey" />
        </el-form-item>
@@ -118,7 +133,7 @@
        </el-form-item>
    </template>

    <template #top v-if="isDisplayedTop">
    <template #top v-if="showStatChart()">
        <state-global-chart
            v-if="daily"
            class="mb-4"
@@ -410,7 +425,6 @@
    import StatusFilterButtons from "../layout/StatusFilterButtons.vue"
    import StateGlobalChart from "../../components/stats/StateGlobalChart.vue";
    import TriggerAvatar from "../../components/flows/TriggerAvatar.vue";
    import DateAgo from "../layout/DateAgo.vue";
    import Kicon from "../Kicon.vue"
    import Labels from "../layout/Labels.vue"
    import RestoreUrl from "../../mixins/restoreUrl";
@@ -423,6 +437,7 @@
    import {storageKeys} from "../../utils/constants";
    import LabelInput from "../../components/labels/LabelInput.vue";
    import {ElMessageBox, ElSwitch, ElFormItem, ElAlert, ElCheckbox} from "element-plus";
    import DateAgo from "../layout/DateAgo.vue";
    import {h, ref} from "vue";

    import {filterLabels} from "./utils"
@@ -493,6 +508,7 @@
                dblClickRouteName: "executions/update",
                flowTriggerDetails: undefined,
                recomputeInterval: false,
                showChart: ["true", null].includes(localStorage.getItem(storageKeys.SHOW_CHART)),
                optionalColumns: [
                    {
                        label: "start date",
@@ -568,7 +584,6 @@
            }
            this.displayColumns = localStorage.getItem(this.storageKey)?.split(",")
                || this.optionalColumns.filter(col => col.default).map(col => col.prop);

        },
        computed: {
            ...mapState("execution", ["executions", "total"]),
@@ -641,6 +656,17 @@
            displayColumn(column) {
                return this.hidden ? !this.hidden.includes(column) : this.displayColumns.includes(column);
            },
            onShowChartChange(value) {
                this.showChart = value;
                localStorage.setItem(storageKeys.SHOW_CHART, value);

                if (this.showChart) {
                    this.loadStats();
                }
            },
            showStatChart() {
                return this.isDisplayedTop && this.showChart;
            },
            refresh() {
                this.recomputeInterval = !this.recomputeInterval;
                this.load();
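The `showChart` persistence above relies on a localStorage convention worth spelling out: localStorage stores only strings, and a key that was never written reads back as `null`, so `["true", null].includes(...)` makes the chart visible by default. A minimal sketch of the read/write pair, with a plain string key standing in for `storageKeys.SHOW_CHART`:

// Assumed key name for the sketch; the component uses storageKeys.SHOW_CHART.
const SHOW_CHART = "showChart";

// localStorage round-trips the boolean as the strings "true"/"false";
// a missing key reads back as null and counts as "show the chart".
function readShowChart(): boolean {
    return ["true", null].includes(localStorage.getItem(SHOW_CHART));
}

function writeShowChart(value: boolean): void {
    localStorage.setItem(SHOW_CHART, String(value));
}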
@@ -676,19 +702,22 @@

                return _merge(base, queryFilter)
            },
            loadStats() {
                this.dailyReady = false;

                this.$store
                    .dispatch("stat/daily", this.loadQuery({
                        startDate: this.$moment(this.startDate).toISOString(true),
                        endDate: this.$moment(this.endDate).toISOString(true)
                    }, true))
                    .then(() => {
                        this.dailyReady = true;
                    });
            },
            loadData(callback) {
                this.refreshDates = !this.refreshDates;
                if (this.isDisplayedTop) {
                    this.dailyReady = false;

                    this.$store
                        .dispatch("stat/daily", this.loadQuery({
                            startDate: this.$moment(this.startDate).toISOString(true),
                            endDate: this.$moment(this.endDate).toISOString(true)
                        }, true))
                        .then(() => {
                            this.dailyReady = true;
                        });
                if (this.showStatChart()) {
                    this.loadStats();
                }

                this.$store.dispatch("execution/findExecutions", this.loadQuery({
@@ -765,7 +794,7 @@
            },
            deleteExecutions() {
                const includeNonTerminated = ref(false);

                const deleteLogs = ref(true);
                const deleteMetrics = ref(true);
                const deleteStorage = ref(true);
@@ -784,7 +813,7 @@
                        "onUpdate:modelValue": (val) => {
                            includeNonTerminated.value = val
                        },
                    }),
                }),
                ]),
                h(ElAlert, {
                    title: this.$t("execution-warn-deleting-still-running"),

@@ -34,6 +34,7 @@
        </el-form-item>
        <el-form-item>
            <el-button-group>
                <restart :execution="execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
                <el-button @click="downloadContent()">
                    <kicon :tooltip="$t('download logs')">
                        <download />
@@ -79,6 +80,7 @@
    import State from "../../utils/state";
    import Utils from "../../utils/utils";
    import LogLine from "../logs/LogLine.vue";
    import Restart from "./Restart.vue";

    export default {
        components: {
@@ -88,7 +90,8 @@
            Kicon,
            Download,
            Magnify,
            Collapse
            Collapse,
            Restart
        },
        data() {
            return {

@@ -10,6 +10,13 @@
        </el-button>
    </el-button-group>

    <el-button-group v-else-if="isURI(value)">
        <a class="el-button el-button--small el-button--primary" :href="value" target="_blank">
            <OpenInNew />
            {{ $t('open') }}
        </a>
    </el-button-group>

    <span v-else-if="value === null">
        <em>null</em>
    </span>
@@ -20,6 +27,7 @@

<script setup>
    import Download from "vue-material-design-icons/Download.vue";
    import OpenInNew from "vue-material-design-icons/OpenInNew.vue";
    import FilePreview from "./FilePreview.vue";
</script>

@@ -37,6 +45,14 @@
        isFile(value) {
            return typeof(value) === "string" && value.startsWith("kestra:///")
        },
        isURI(value) {
            try {
                new URL(value);
                return true;
            } catch (e) {
                return false;
            }
        },
        itemUrl(value) {
            return `${apiUrl(this.$store)}/executions/${this.execution.id}/file?path=${value}`;
        },
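The `isURI` helper added above leans on the fact that the `URL` constructor throws for anything that is not an absolute URI, so the try/catch doubles as validation. A short standalone sketch with example inputs; note that internal `kestra:///` URIs also parse, which is presumably why the `isFile` branch is evaluated before this one:

// The URL constructor accepts any absolute URI with a scheme and throws otherwise.
function isURI(value: unknown): boolean {
    try {
        new URL(value as string);
        return true;
    } catch {
        return false;
    }
}

console.log(isURI("https://example.com/file.json")); // true
console.log(isURI("kestra:///ns/data.json")); // also true: any scheme parses
console.log(isURI("just a string")); // false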
@@ -17,12 +17,12 @@
        @expand-change="() => scrollRight()"
    >
        <template #default="{data}">
            <div v-if="data.heading" class="pe-none d-flex fs-5">
            <div v-if="data.heading" @click="expandedValue = data.path" class="pe-none d-flex fs-5">
                <component :is="data.component" class="me-2" />
                <span>{{ data.label }}</span>
            </div>

            <div v-else class="w-100 d-flex justify-content-between">
            <div v-else @click="expandedValue = data.path" class="w-100 d-flex justify-content-between">
                <div class="pe-5 d-flex task">
                    <TaskIcon v-if="data.icon" :icons="allIcons" :cls="icons[data.taskId]" only-icon />
                    <span :class="{'ms-3': data.icon}">{{ data.label }}</span>
@@ -65,6 +65,7 @@
        :input="true"
        :navbar="false"
        :model-value="computedDebugValue"
        @confirm="onDebugExpression($event)"
        class="w-100"
    />

@@ -91,9 +92,13 @@
        </el-collapse-item>
    </el-collapse>

    <el-alert v-if="debugError" type="error" :closable="false">
    <el-alert v-if="debugError" type="error" :closable="false" class="overflow-auto">
        <p><strong>{{ debugError }}</strong></p>
        <pre class="mb-0">{{ debugStackTrace }}</pre>
        <div class="my-2">
            <CopyToClipboard :text="debugError" label="Copy Error" class="d-inline-block me-2" />
            <CopyToClipboard :text="debugStackTrace" label="Copy Stack Trace" class="d-inline-block" />
        </div>
        <pre class="mb-0" style="overflow: scroll;">{{ debugStackTrace }}</pre>
    </el-alert>

    <VarValue :value="selectedValue" :execution="execution" />
@@ -115,15 +120,25 @@

    import {apiUrl} from "override/utils/route";

    import CopyToClipboard from "../../layout/CopyToClipboard.vue"

    import Editor from "../../inputs/Editor.vue";
    const debugEditor = ref(null);
    const debugExpression = ref("");
    const computedDebugValue = computed(() => `{{ outputs${selectedTask()?.taskId ? `.${selectedTask().taskId}` : ""} }}`);
    const computedDebugValue = computed(() => {
        const task = selectedTask()?.taskId;
        if(!task) return "";

        const path = expandedValue.value;
        if(!path) return `{{ outputs.${task} }}`

        return `{{ outputs.${path} }}`
    });
    const debugError = ref("");
    const debugStackTrace = ref("");
    const isJSON = ref(false);
    const selectedTask = () => {
        const filter = selected.value.length ? selected.value[0] : (cascader.value as any).menuList?.[0]?.panel?.expandingNode?.label;
        const filter = selected.value?.length ? selected.value[0] : (cascader.value as any).menuList?.[0]?.panel?.expandingNode?.label;
        const taskRunList = [...execution.value.taskRunList];
        return taskRunList.find(e => e.taskId === filter);
    };
@@ -162,22 +177,41 @@

    const execution = computed(() => store.state.execution.execution);

    const processedValue = (data): { label: string, regular: boolean; } => {
    function isValidURL(url) {
        try {
            new URL(url);
            return true;
        } catch (e) {
            return false;
        }
    }

    const processedValue = (data) => {
        const regular = false;

        if (!data.value && !data.children?.length) return {label: data.value, regular};
        else if (data?.children?.length) {
        if (!data.value && !data.children?.length) {
            return {label: data.value, regular};
        } else if (data?.children?.length) {
            const message = (length) => ({label: `${length} items`, regular});
            const length = data.children.length;

            return data.children[0].isFirstPass ? message(length - 1) : message(length);
        }
        return data.value.toString().startsWith("kestra:///") ? {label: "Internal link", regular} : {label: trim(data.value), regular: true};

        // Check if the value is a valid URL and not an internal "kestra:///" link
        if (isValidURL(data.value)) {
            return data.value.startsWith("kestra:///")
                ? {label: "Internal link", regular}
                : {label: "External link", regular};
        }

        return {label: trim(data.value), regular: true};
    };

    const expandedValue = ref([])
    const selected = ref([]);
    const selectedValue = computed(() => {
        if (selected.value.length) return selected.value[selected.value.length - 1];
        if (selected.value?.length) return selected.value[selected.value.length - 1];
        return undefined;
    });
    const selectedNode = () => {
@@ -190,21 +224,34 @@
        return {label, value};
    };

    const transform = (o, isFirstPass = true) => {
    const transform = (o, isFirstPass, path = "") => {
        const result = Object.keys(o).map(key => {
            const value = o[key];
            const isObject = typeof value === "object" && value !== null;

            const currentPath = `${path}["${key}"]`;

            // If the value is an array with exactly one element, use that element as the value
            if (Array.isArray(value) && value.length === 1) {
                return {label: key, value: value[0], children: []};
                return {label: key, value: value[0], children: [], path: currentPath};
            }

            return {label: key, value: isObject && !Array.isArray(value) ? null : value, children: isObject ? transform(value, false) : []};
            return {
                label: key,
                value: isObject && !Array.isArray(value) ? key : value,
                children: isObject ? transform(value, false, currentPath) : [],
                path: currentPath
            };
        });

        if (isFirstPass) {
            const OUTPUTS = {label: t("outputs"), heading: true, component: shallowRef(TextBoxSearchOutline), isFirstPass: true};
            const OUTPUTS = {
                label: t("outputs"),
                heading: true,
                component: shallowRef(TextBoxSearchOutline),
                isFirstPass: true,
                path: path
            };
            result.unshift(OUTPUTS);
        }
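The reworked `transform()` threads a `path` string through the recursion so every node in the outputs tree carries its own accessor, and `computedDebugValue` then wraps that path in a Pebble expression. A simplified sketch of the path building; the `isFirstPass` heading row is omitted here, and the path seed is assumed to be the task id, as in the `outputs` computed in the next hunk:

// Simplified sketch of the path-threading recursion above.
type OutputNode = {label: string; value: unknown; children: OutputNode[]; path: string};

const transform = (o: Record<string, unknown>, path = ""): OutputNode[] =>
    Object.keys(o).map((key) => {
        const value = o[key];
        const isObject = typeof value === "object" && value !== null;
        // Each nesting level appends a bracket accessor to the parent path.
        const currentPath = `${path}["${key}"]`;
        return {
            label: key,
            value: isObject && !Array.isArray(value) ? key : value,
            children: isObject ? transform(value as Record<string, unknown>, currentPath) : [],
            path: currentPath,
        };
    });

// transform({data: {uri: "kestra:///x"}}, "mytask") produces a leaf with
// path 'mytask["data"]["uri"]', which computedDebugValue wraps into
// {{ outputs.mytask["data"]["uri"] }}.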
@@ -212,7 +259,7 @@
    };
    const outputs = computed(() => {
        const tasks = store.state.execution.execution.taskRunList.map((task) => {
            return {label: task.taskId, value: task.taskId, ...task, icon: true, children: task?.outputs ? transform(task.outputs) : []};
            return {label: task.taskId, value: task.taskId, ...task, icon: true, children: task?.outputs ? transform(task.outputs, true, task.taskId) : []};
        });

        const HEADING = {label: t("tasks"), heading: true, component: shallowRef(TimelineTextOutline)};

@@ -36,9 +36,23 @@
            default: true
        }
    },
    data() {
        return {
            exampleFileName: "kestra.json"
        }
    },
    computed: {
        curlCommand() {
            return this.generateCurlCommand();
            const mainCommand = this.generateCurlCommand();

            if (this.flow.inputs && this.flow.inputs.find((input) => input.type === "FILE")) {
                return `${this.toShell(this.generatePrefix())} && \\\n${this.toShell(mainCommand)}`;
            } else {
                return `${this.toShell(mainCommand)}`;
            }
        },
        exampleFileInputUrl() {
            return `https://huggingface.co/datasets/kestra/datasets/resolve/main/json/${this.exampleFileName}`;
        }
    },
    methods: {
@@ -55,16 +69,25 @@

            switch (input.type) {
            case "FILE": {
                const fileInput = this.inputs[input.id];
                if (fileInput) {
                    inputValue = fileInput.name;
                }
                inputValue = this.exampleFileName;
                break;
            }
            case "SECRET": {
                inputValue = this.inputs[input.id] ? "******" : undefined;
                break;
            }
            case "DURATION": {
                inputValue = this.$moment.duration(this.$moment(this.inputs[input.id]).format("hh:mm:ss")).toJSON();
                break;
            }
            case "DATE": {
                inputValue = this.$moment(this.inputs[input.id]).format("YYYY-MM-DD");
                break;
            }
            case "TIME": {
                inputValue = this.$moment(this.inputs[input.id]).format("hh:mm:ss");
                break;
            }
            default:
                inputValue = this.inputs[input.id];
            }
@@ -115,8 +138,22 @@

            command.push(`'${this.generateUrl()}'`);

            return command
        },
        generatePrefix() {
            return ["curl", "-O", `'${this.exampleFileInputUrl}'`];
        },
        toShell(command) {
            return command.join(" ");
        }
    }
}
</script>
</script>

<style lang="scss" scoped>
    /* Allow line-wraps */
    code {
        display: block;
        white-space: pre-wrap;
    }
</style>
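With the changes above, a flow that declares a FILE input now renders a two-step snippet: a `curl -O` prefix that downloads the example file, chained with the actual execution call. A rough sketch of the assembly; the main command here is an illustrative stand-in for `generateCurlCommand()`, not the exact output:

// Sketch of the two-part command assembly shown in the hunks above.
const exampleFileName = "kestra.json";
const exampleFileInputUrl =
    `https://huggingface.co/datasets/kestra/datasets/resolve/main/json/${exampleFileName}`;

const toShell = (command: string[]): string => command.join(" ");
const generatePrefix = (): string[] => ["curl", "-O", `'${exampleFileInputUrl}'`];

// Stand-in for generateCurlCommand(); the real one is built from the flow's inputs.
const mainCommand = ["curl", "-X", "POST", `-F myfile=@${exampleFileName}`, "'<execution-url>'"];

// Renders roughly:
//   curl -O 'https://huggingface.co/.../kestra.json' && \
//   curl -X POST -F myfile=@kestra.json '<execution-url>'
console.log(`${toShell(generatePrefix())} && \\\n${toShell(mainCommand)}`);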
@@ -59,7 +59,7 @@
        async setupFlow() {
            if (this.$route.query.copy && this.flow){
                this.source = this.flow.source;
            } else if (this.$route.query.blueprintId) {
            } else if (this.$route.query.blueprintId && this.$route.query.blueprintSource) {
                this.source = await this.queryBlueprint(this.$route.query.blueprintId)
            } else {
                this.source = `id: myflow
@@ -86,7 +86,7 @@ tasks:
            };
        },
        blueprintUri() {
            return `${apiUrl(this.$store)}/blueprints/community`
            return `${apiUrl(this.$store)}/blueprints/${this.$route.query.blueprintSource}`
        },
        flowParsed() {
            return YamlUtils.parse(this.source);

@@ -9,7 +9,7 @@
    >
        <el-table-column type="expand">
            <template #default="props">
                <LogsWrapper class="m-3" :filters="props.row" :charts="false" embed />
                <LogsWrapper class="m-3" :filters="props.row" purge-filters :charts="false" embed />
            </template>
        </el-table-column>
        <el-table-column prop="id" :label="$t('id')">

@@ -21,7 +21,7 @@
        :icon="ContentSave"
        @click="saveTask"
        v-if="canSave && !readOnly"
        :disabled="errors"
        :disabled="errors && !!errors.length"
        type="primary"
    >
        {{ $t("save task") }}
@@ -160,8 +160,7 @@
            if (this.task) {
                this.taskYaml = YamlUtils.stringify(this.task);
                if (this.task.type) {
                    this.$store
                        .dispatch("plugin/load", {cls: this.task.type})
                    this.$store.dispatch("plugin/load", {cls: this.task.type})
                }
            } else {
                this.taskYaml = "";
@@ -173,8 +172,7 @@
            handler() {
                const task = YamlUtils.parse(this.taskYaml);
                if (task?.type && task.type !== this.type) {
                    this.$store
                        .dispatch("plugin/load", {cls: task.type})
                    this.$store.dispatch("plugin/load", {cls: task.type})
                    this.type = task.type
                }
            },

@@ -109,7 +109,7 @@
                type: this.selectedTaskType
            };

            if (this.section !== SECTIONS.TRIGGERS) {
            if (this.section !== SECTIONS.TRIGGERS && this.section !== SECTIONS.TASK_RUNNERS) {
                value["id"] = this.taskObject && this.taskObject.id ? this.taskObject.id : "";
            }

@@ -11,6 +11,7 @@
        :expanded-subflows="expandedSubflows"
        view-type="topology"
        @expand-subflow="onExpandSubflow($event)"
        @on-edit="(event) => emit('on-edit', event, true)"
    />
    <el-alert v-else type="warning" :closable="false">
        {{ $t("unable to generate graph") }}
@@ -27,7 +28,7 @@
        LowCodeEditor,
    },
    emits: [
        "expand-subflow"
        "expand-subflow", "on-edit"
    ],
    props: {
        isReadOnly: {

@@ -41,8 +41,17 @@
    <el-col :md="24" :lg="embed ? 24 : 18">
        <h4>{{ $t("source") }}</h4>
        <el-card>
            <editor class="position-relative" :read-only="true" :input="true" :full-height="false" :minimap="false" :model-value="blueprint.flow" lang="yaml">
                <template #nav>
            <editor
                class="position-relative"
                :read-only="true"
                :input="true"
                :full-height="false"
                :minimap="false"
                :model-value="blueprint.flow"
                lang="yaml"
                :navbar="false"
            >
                <template #absolute>
                    <copy-to-clipboard class="position-absolute" :text="blueprint.flow" />
                </template>
            </editor>
@@ -91,7 +100,7 @@
                label: this.$t("blueprints.title"),
                link: {
                    name: "blueprints",
                    params: this.$route.params
                    params: this.$route.params.tab ? this.$route.params.tab : {...this.$route.params, tab: this.tab},
                }
            }
        ]
@@ -109,6 +118,10 @@
        tab: {
            type: String,
            default: "community"
        },
        blueprintBaseUri: {
            type: String,
            default: undefined,
        }
    },
    methods: {
@@ -119,22 +132,26 @@
            this.$router.push({
                name: "blueprints",
                params: {
                    tenant: this.$route.params.tenant
                    tenant: this.$route.params.tenant,
                    tab: this.tab
                }
            })
        }
    }
    },
    async created() {
        this.blueprint = (await this.$http.get(`${this.blueprintBaseUri}/${this.blueprintId}`)).data
        const TAB = this.$route.query?.tab ?? (this.embed ? this.tab : (this.$route?.params?.tab ?? "community"));
        const URL = this.blueprintBaseUri ?? `${apiUrl(this.$store)}/blueprints/` + TAB;

        this.blueprint = (await this.$http.get(`${URL}/${this.blueprintId}`)).data;

        try {
            if (this.blueprintBaseUri.endsWith("community")) {
            if (this.blueprintBaseUri?.endsWith("community")) {
                this.flowGraph = (await this.$http.get(`${this.blueprintBaseUri}/${this.blueprintId}/graph`, {
                    validateStatus: (status) => {
                        return status === 200;
                    }
                })).data;
                }))?.data;
            } else {
                this.flowGraph = await this.$store.dispatch("flow/getGraphFromSourceResponse", {
                    flow: this.blueprint.flow, config: {
@@ -159,9 +176,6 @@
                ...YamlUtils.parse(this.blueprint.flow),
                source: this.blueprint.flow
            }
        },
        blueprintBaseUri() {
            return `${apiUrl(this.$store)}/blueprints/` + (this.embed ? this.tab : (this.$route?.params?.tab ?? "community"));
        }
    }
};
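The reworked `created()` hook resolves the blueprint endpoint with a clear precedence: an explicit `blueprintBaseUri` prop wins outright; otherwise the tab comes from the query string, then from the embed/tab props or the route params, falling back to "community". A standalone sketch of that precedence; the parameter shapes are assumptions, only the ordering mirrors the diff:

// Sketch of the endpoint resolution logic in created() above.
function resolveBlueprintUrl(opts: {
    blueprintBaseUri?: string;
    queryTab?: string;
    embed?: boolean;
    propTab?: string;
    paramsTab?: string;
    apiBase: string;
}): string {
    // Query string wins, then the embed prop's tab, then route params, then "community".
    const tab = opts.queryTab ?? (opts.embed ? opts.propTab : (opts.paramsTab ?? "community"));
    // An explicit blueprintBaseUri prop short-circuits everything.
    return opts.blueprintBaseUri ?? `${opts.apiBase}/blueprints/${tab}`;
}

// e.g. an embedded viewer with no explicit base URI:
// resolveBlueprintUrl({embed: true, propTab: "community", apiBase: "/api/v1"})
// -> "/api/v1/blueprints/community"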
@@ -257,4 +271,4 @@
        }
    }
}
</style>
</style>

@@ -49,6 +49,10 @@ export default {
            return "condition"
        }

        if (property.$ref.includes("tasks.runners.TaskRunner")) {
            return "task-runner"
        }

        return "complex";
    }

@@ -32,13 +32,13 @@
    mixins: [Task],
    emits: ["update:modelValue"],
    created() {
        if (!Array.isArray(this.modelValue)) {
        if (!Array.isArray(this.modelValue) && this.modelValue !== undefined) {
            this.$emit("update:modelValue", []);
        }
    },
    computed: {
        values() {
            if (this.modelValue === undefined || (Array.isArray(this.modelValue) && this.modelValue.length === 0)) {
            if (this.modelValue === undefined) {
                return this.schema.default || [undefined];
            }
Some files were not shown because too many files have changed in this diff.