mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
134 Commits
feat/agent
...
v0.17.26
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27f0bfe1ac | ||
|
|
0143faed71 | ||
|
|
20891bf911 | ||
|
|
093ca322bb | ||
|
|
b3b788ca38 | ||
|
|
a3c85e3c19 | ||
|
|
debca279c0 | ||
|
|
254486df63 | ||
|
|
5e6a22c78c | ||
|
|
5613f74b93 | ||
|
|
6a915cb08b | ||
|
|
3056e9c402 | ||
|
|
dc7fef20b6 | ||
|
|
678205cfc0 | ||
|
|
87acffa052 | ||
|
|
c7f8132bfe | ||
|
|
af2a73f397 | ||
|
|
b0cff46b81 | ||
|
|
5ec053a824 | ||
|
|
c2cf0a90e5 | ||
|
|
0303e62b3d | ||
|
|
96b67b73bd | ||
|
|
6cced7a9e7 | ||
|
|
9259a805d8 | ||
|
|
595459fc12 | ||
|
|
42a637de7c | ||
|
|
ba7d4d9501 | ||
|
|
76b5022c08 | ||
|
|
1620c21c36 | ||
|
|
202c321a8a | ||
|
|
ea0b4e7469 | ||
|
|
c0975773a3 | ||
|
|
23e7af0d77 | ||
|
|
dcfc9acf74 | ||
|
|
b790d5e1d1 | ||
|
|
5213945a41 | ||
|
|
150eed2eff | ||
|
|
887a3a5dc4 | ||
|
|
d7109aa375 | ||
|
|
f2a33ebbca | ||
|
|
b130ac8de9 | ||
|
|
8f43ec13f3 | ||
|
|
5ae985ab16 | ||
|
|
b2d04925cb | ||
|
|
37ff254c9c | ||
|
|
7e968d16b6 | ||
|
|
2e4bf3338a | ||
|
|
0f3c455afc | ||
|
|
7439ea4a66 | ||
|
|
6546ce49f6 | ||
|
|
57bc235db6 | ||
|
|
2f416daac0 | ||
|
|
97779f3bc4 | ||
|
|
54c4f1d702 | ||
|
|
f2cc5c0da6 | ||
|
|
c2fc728414 | ||
|
|
90492354e2 | ||
|
|
856a5f5a73 | ||
|
|
f09fa74129 | ||
|
|
936e4019b6 | ||
|
|
4c44090462 | ||
|
|
9feac234f6 | ||
|
|
935212344c | ||
|
|
5d4c4dc214 | ||
|
|
a152354bcd | ||
|
|
08455bbdf0 | ||
|
|
64e9d8d43f | ||
|
|
dd1b435720 | ||
|
|
9fca8c9148 | ||
|
|
09d6e0f092 | ||
|
|
c3abfe08ad | ||
|
|
f2d5df082d | ||
|
|
16c15116fe | ||
|
|
f9f8d93ad7 | ||
|
|
c3b1ceb289 | ||
|
|
f9f33b96c8 | ||
|
|
658f847c48 | ||
|
|
dcec04e20e | ||
|
|
b38ec8a21b | ||
|
|
35d801cbc2 | ||
|
|
511a10a65c | ||
|
|
c5daebe4aa | ||
|
|
bb800948e5 | ||
|
|
65f921d456 | ||
|
|
ded21b0902 | ||
|
|
7b109baac2 | ||
|
|
c11fe3466f | ||
|
|
94c5b7a6e4 | ||
|
|
99ab5be8b9 | ||
|
|
aaa3a0ace0 | ||
|
|
acc5a24d9a | ||
|
|
892bb114ca | ||
|
|
30e4fe4e0b | ||
|
|
fcada08edd | ||
|
|
a2df125a62 | ||
|
|
9d9c5dc1d1 | ||
|
|
dbb1a8eaa5 | ||
|
|
1db6b57091 | ||
|
|
934ea201a5 | ||
|
|
5dcd5b5af8 | ||
|
|
000124f3dd | ||
|
|
c37f104446 | ||
|
|
3cfa48987f | ||
|
|
79c22ee22c | ||
|
|
d3a2fa13a5 | ||
|
|
20078f1e19 | ||
|
|
72b86d9edf | ||
|
|
59634133bc | ||
|
|
1e34a5528b | ||
|
|
4aa3bd3ef2 | ||
|
|
f6581de304 | ||
|
|
fd225d87b4 | ||
|
|
9bb3f576ee | ||
|
|
30cdb373cc | ||
|
|
59c7d6a567 | ||
|
|
9e4e5f891e | ||
|
|
ea3ba991d1 | ||
|
|
1024c77289 | ||
|
|
36b29d6065 | ||
|
|
1c8177e185 | ||
|
|
3dd5d6bb71 | ||
|
|
16a641693a | ||
|
|
efdb075155 | ||
|
|
a99d52a406 | ||
|
|
852edea36e | ||
|
|
defa426259 | ||
|
|
3aadcfd683 | ||
|
|
0f5d59103a | ||
|
|
50b9120434 | ||
|
|
896c761502 | ||
|
|
381d1b381f | ||
|
|
72a428a439 | ||
|
|
7447e61dbc | ||
|
|
45ffc3cc22 |
29
.github/workflows/docker.yml
vendored
29
.github/workflows/docker.yml
vendored
@@ -7,14 +7,7 @@ on:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: string
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
skip-test:
|
||||
description: 'Skip test'
|
||||
required: false
|
||||
type: string
|
||||
default: "false"
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
@@ -125,6 +118,16 @@ jobs:
|
||||
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Download release
|
||||
- name: Download release
|
||||
uses: robinraju/release-downloader@v1.10
|
||||
@@ -137,14 +140,6 @@ jobs:
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Docker setup
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
@@ -179,7 +174,7 @@ jobs:
|
||||
- name: Retag to latest
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{1}', matrix.image.name) }}
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM eclipse-temurin:21-jre
|
||||
FROM eclipse-temurin:21-jre-jammy
|
||||
|
||||
ARG KESTRA_PLUGINS=""
|
||||
ARG APT_PACKAGES=""
|
||||
|
||||
@@ -204,6 +204,8 @@ subprojects {
|
||||
testImplementation 'org.hamcrest:hamcrest:2.2'
|
||||
testImplementation 'org.hamcrest:hamcrest-library:2.2'
|
||||
testImplementation group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
|
||||
|
||||
testImplementation 'org.assertj:assertj-core:3.27.3'
|
||||
}
|
||||
|
||||
test {
|
||||
@@ -454,7 +456,7 @@ subprojects {
|
||||
}
|
||||
|
||||
maven.pom {
|
||||
description 'The modern, scalable orchestrator & scheduler open source platform'
|
||||
description = 'The modern, scalable orchestrator & scheduler open source platform'
|
||||
|
||||
developers {
|
||||
developer {
|
||||
|
||||
@@ -37,6 +37,12 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
private List<String> skipFlows = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
private List<String> skipNamespaces = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
|
||||
private List<String> skipTenants = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "a list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
|
||||
private List<String> startExecutors = Collections.emptyList();
|
||||
|
||||
@@ -54,6 +60,8 @@ public class ExecutorCommand extends AbstractServerCommand {
|
||||
public Integer call() throws Exception {
|
||||
this.skipExecutionService.setSkipExecutions(skipExecutions);
|
||||
this.skipExecutionService.setSkipFlows(skipFlows);
|
||||
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
|
||||
this.skipExecutionService.setSkipTenants(skipTenants);
|
||||
|
||||
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ public class LocalCommand extends StandAloneCommand {
|
||||
"kestra.queue.type", "h2",
|
||||
"kestra.storage.type", "local",
|
||||
"kestra.storage.local.base-path", data.toString(),
|
||||
"datasources.h2.url", "jdbc:h2:file:" + data.resolve("database") + ";DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=30000",
|
||||
"datasources.h2.url", "jdbc:h2:file:" + data.resolve("database") + ";TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=30000",
|
||||
"datasources.h2.username", "sa",
|
||||
"datasources.h2.password", "",
|
||||
"datasources.h2.driverClassName", "org.h2.Driver",
|
||||
|
||||
@@ -49,6 +49,12 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
private List<String> skipFlows = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
|
||||
private List<String> skipNamespaces = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
|
||||
private List<String> skipTenants = Collections.emptyList();
|
||||
|
||||
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
|
||||
boolean tutorialsDisabled = false;
|
||||
|
||||
@@ -74,6 +80,8 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
public Integer call() throws Exception {
|
||||
this.skipExecutionService.setSkipExecutions(skipExecutions);
|
||||
this.skipExecutionService.setSkipFlows(skipFlows);
|
||||
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
|
||||
this.skipExecutionService.setSkipTenants(skipTenants);
|
||||
|
||||
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
||||
|
||||
|
||||
@@ -23,10 +23,12 @@ import org.apache.commons.lang3.ArrayUtils;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class MetricRegistry {
|
||||
public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
|
||||
public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
|
||||
public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
|
||||
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
|
||||
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
|
||||
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
|
||||
public final static String METRIC_WORKER_RETRYED_COUNT = "worker.retryed.count";
|
||||
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
|
||||
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
|
||||
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
|
||||
@@ -143,7 +145,7 @@ public class MetricRegistry {
|
||||
*
|
||||
* @param workerTask the current WorkerTask
|
||||
* @param workerGroup the worker group, optional
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
|
||||
var baseTags = ArrayUtils.addAll(
|
||||
@@ -164,7 +166,7 @@ public class MetricRegistry {
|
||||
*
|
||||
* @param workerTrigger the current WorkerTask
|
||||
* @param workerGroup the worker group, optional
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String... tags) {
|
||||
var baseTags = ArrayUtils.addAll(
|
||||
@@ -184,7 +186,7 @@ public class MetricRegistry {
|
||||
* Return tags for current {@link WorkerTaskResult}
|
||||
*
|
||||
* @param workerTaskResult the current WorkerTaskResult
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
|
||||
var baseTags = ArrayUtils.addAll(
|
||||
@@ -200,7 +202,7 @@ public class MetricRegistry {
|
||||
* Return tags for current {@link WorkerTaskResult}
|
||||
*
|
||||
* @param subflowExecutionResult the current WorkerTaskResult
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(SubflowExecutionResult subflowExecutionResult, String... tags) {
|
||||
var baseTags = ArrayUtils.addAll(
|
||||
@@ -216,7 +218,7 @@ public class MetricRegistry {
|
||||
* Return tags for current {@link Task}
|
||||
*
|
||||
* @param task the current Task
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(Task task) {
|
||||
return new String[]{
|
||||
@@ -240,7 +242,7 @@ public class MetricRegistry {
|
||||
* Return tags for current {@link Execution}
|
||||
*
|
||||
* @param execution the current Execution
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(Execution execution) {
|
||||
var baseTags = new String[]{
|
||||
@@ -255,33 +257,21 @@ public class MetricRegistry {
|
||||
* Return tags for current {@link TriggerContext}
|
||||
*
|
||||
* @param triggerContext the current TriggerContext
|
||||
* @param workerGroup the worker group, optional
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(TriggerContext triggerContext, String workerGroup) {
|
||||
public String[] tags(TriggerContext triggerContext) {
|
||||
var baseTags = new String[]{
|
||||
TAG_FLOW_ID, triggerContext.getFlowId(),
|
||||
TAG_NAMESPACE_ID, triggerContext.getNamespace()
|
||||
};
|
||||
baseTags = workerGroup == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_WORKER_GROUP, workerGroup);
|
||||
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tags for current {@link TriggerContext}
|
||||
*
|
||||
* @param triggerContext the current TriggerContext
|
||||
* @return tags to applied to metrics
|
||||
*/
|
||||
public String[] tags(TriggerContext triggerContext) {
|
||||
return tags(triggerContext, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tags for current {@link SchedulerExecutionWithTrigger}.
|
||||
*
|
||||
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
|
||||
return ArrayUtils.addAll(
|
||||
@@ -294,7 +284,7 @@ public class MetricRegistry {
|
||||
/**
|
||||
* Return globals tags
|
||||
*
|
||||
* @return tags to applied to metrics
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public Tags tags(String... tags) {
|
||||
return Tags.of(tags);
|
||||
|
||||
@@ -9,6 +9,22 @@ import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
/**
|
||||
* The Kestra event for killing an execution. A {@link ExecutionKilled} can be in two states:
|
||||
* <p>
|
||||
* <pre>
|
||||
* - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
|
||||
* - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
|
||||
* </pre>
|
||||
*
|
||||
* A {@link ExecutionKilled} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED}
|
||||
* regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks
|
||||
* to be killed no matter what the circumstances.
|
||||
* <p>
|
||||
* IMPORTANT: A {@link ExecutionKilled} is considered to be a fire-and-forget event. As a result, we do not manage a
|
||||
* COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilled}
|
||||
* before considering an execution to be KILLED.
|
||||
*/
|
||||
@Getter
|
||||
@SuperBuilder
|
||||
@EqualsAndHashCode
|
||||
|
||||
@@ -8,22 +8,6 @@ import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
/**
|
||||
* The Kestra event for killing an execution. A {@link ExecutionKilledExecution} can be in two states:
|
||||
* <p>
|
||||
* <pre>
|
||||
* - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
|
||||
* - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
|
||||
* </pre>
|
||||
*
|
||||
* A {@link ExecutionKilledExecution} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED}
|
||||
* regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks
|
||||
* to be killed no matter what the circumstances.
|
||||
* <p>
|
||||
* IMPORTANT: A {@link ExecutionKilledExecution} is considered to be a fire-and-forget event. As a result, we do not manage a
|
||||
* COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilledExecution}
|
||||
* before considering an execution to be KILLED.
|
||||
*/
|
||||
@Getter
|
||||
@SuperBuilder
|
||||
@EqualsAndHashCode
|
||||
@@ -47,8 +31,9 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI
|
||||
Boolean isOnKillCascade;
|
||||
|
||||
public boolean isEqual(WorkerTask workerTask) {
|
||||
return (workerTask.getTaskRun().getTenantId() == null || (workerTask.getTaskRun().getTenantId() != null && workerTask.getTaskRun().getTenantId().equals(this.tenantId))) &&
|
||||
workerTask.getTaskRun().getExecutionId().equals(this.executionId);
|
||||
String taskTenantId = workerTask.getTaskRun().getTenantId();
|
||||
String taskExecutionId = workerTask.getTaskRun().getExecutionId();
|
||||
return (taskTenantId == null || taskTenantId.equals(this.tenantId)) && taskExecutionId.equals(this.executionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonSetter;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
@@ -39,7 +38,6 @@ import jakarta.validation.constraints.Pattern;
|
||||
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
|
||||
@JsonSubTypes.Type(value = URIInput.class, name = "URI")
|
||||
})
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public abstract class Input<T> implements Data {
|
||||
@NotNull
|
||||
@NotBlank
|
||||
|
||||
@@ -24,7 +24,7 @@ public class EnumInput extends Input<String> {
|
||||
|
||||
@Override
|
||||
public void validate(String input) throws ConstraintViolationException {
|
||||
if (!values.contains(input)) {
|
||||
if (!values.contains(input) & this.getRequired()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + values + "`",
|
||||
this,
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class FileInput extends Input<URI> {
|
||||
@Builder.Default
|
||||
public String extension = ".upl";
|
||||
|
||||
@Override
|
||||
public void validate(URI input) throws ConstraintViolationException {
|
||||
// no validation yet
|
||||
|
||||
@@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
|
||||
@Nullable
|
||||
private String executionId;
|
||||
|
||||
@Nullable
|
||||
private State.Type executionCurrentState;
|
||||
|
||||
@Nullable
|
||||
private Instant updatedDate;
|
||||
|
||||
@@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
|
||||
protected Trigger(TriggerBuilder<?, ?> b) {
|
||||
super(b);
|
||||
this.executionId = b.executionId;
|
||||
this.executionCurrentState = b.executionCurrentState;
|
||||
this.updatedDate = b.updatedDate;
|
||||
this.evaluateRunningDate = b.evaluateRunningDate;
|
||||
}
|
||||
@@ -79,7 +75,7 @@ public class Trigger extends TriggerContext {
|
||||
}
|
||||
|
||||
public String flowUid() {
|
||||
return Flow.uid(this.getTenantId(), this.getNamespace(), this.getFlowId(), Optional.of(this.getFlowRevision()));
|
||||
return Flow.uidWithoutRevision(this.getTenantId(), this.getNamespace(), this.getFlowId());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -90,7 +86,6 @@ public class Trigger extends TriggerContext {
|
||||
.tenantId(flow.getTenantId())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.flowRevision(flow.getRevision())
|
||||
.triggerId(abstractTrigger.getId())
|
||||
.stopAfter(abstractTrigger.getStopAfter())
|
||||
.build();
|
||||
@@ -137,12 +132,10 @@ public class Trigger extends TriggerContext {
|
||||
.tenantId(execution.getTenantId())
|
||||
.namespace(execution.getNamespace())
|
||||
.flowId(execution.getFlowId())
|
||||
.flowRevision(execution.getFlowRevision())
|
||||
.triggerId(execution.getTrigger().getId())
|
||||
.date(trigger.getDate())
|
||||
.nextExecutionDate(trigger.getNextExecutionDate())
|
||||
.executionId(execution.getId())
|
||||
.executionCurrentState(execution.getState().getCurrent())
|
||||
.updatedDate(Instant.now())
|
||||
.backfill(trigger.getBackfill())
|
||||
.stopAfter(trigger.getStopAfter())
|
||||
@@ -175,7 +168,6 @@ public class Trigger extends TriggerContext {
|
||||
.tenantId(flow.getTenantId())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.flowRevision(flow.getRevision())
|
||||
.triggerId(abstractTrigger.getId())
|
||||
.date(ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS))
|
||||
.nextExecutionDate(nextDate)
|
||||
@@ -225,7 +217,6 @@ public class Trigger extends TriggerContext {
|
||||
.tenantId(this.getTenantId())
|
||||
.namespace(this.getNamespace())
|
||||
.flowId(this.getFlowId())
|
||||
.flowRevision(this.getFlowRevision())
|
||||
.triggerId(this.getTriggerId())
|
||||
.date(this.getDate())
|
||||
.nextExecutionDate(nextExecutionDate)
|
||||
@@ -240,7 +231,6 @@ public class Trigger extends TriggerContext {
|
||||
.tenantId(this.getTenantId())
|
||||
.namespace(this.getNamespace())
|
||||
.flowId(this.getFlowId())
|
||||
.flowRevision(this.getFlowRevision())
|
||||
.triggerId(this.getTriggerId())
|
||||
.date(this.getDate())
|
||||
.nextExecutionDate(this.getNextExecutionDate())
|
||||
@@ -301,7 +291,6 @@ public class Trigger extends TriggerContext {
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(triggerContext.getFlowRevision())
|
||||
.triggerId(triggerContext.getTriggerId())
|
||||
.date(triggerContext.getDate())
|
||||
.backfill(triggerContext.getBackfill())
|
||||
|
||||
@@ -29,9 +29,6 @@ public class TriggerContext {
|
||||
@NotNull
|
||||
private String flowId;
|
||||
|
||||
@NotNull
|
||||
private Integer flowRevision;
|
||||
|
||||
@NotNull
|
||||
private String triggerId;
|
||||
|
||||
@@ -53,7 +50,6 @@ public class TriggerContext {
|
||||
this.tenantId = b.tenantId;
|
||||
this.namespace = b.namespace;
|
||||
this.flowId = b.flowId;
|
||||
this.flowRevision = b.flowRevision;
|
||||
this.triggerId = b.triggerId;
|
||||
this.date = b.date;
|
||||
this.nextExecutionDate = b.nextExecutionDate;
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
@@ -22,7 +20,7 @@ public abstract class TriggerService {
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
|
||||
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger);
|
||||
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
|
||||
}
|
||||
|
||||
public static Execution generateExecution(
|
||||
@@ -34,30 +32,32 @@ public abstract class TriggerService {
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
|
||||
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger);
|
||||
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
|
||||
}
|
||||
|
||||
public static Execution generateRealtimeExecution(
|
||||
AbstractTrigger trigger,
|
||||
ConditionContext conditionContext,
|
||||
TriggerContext context,
|
||||
Output output
|
||||
) {
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
|
||||
|
||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger);
|
||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
|
||||
}
|
||||
|
||||
private static Execution generateExecution(
|
||||
String id,
|
||||
AbstractTrigger trigger,
|
||||
TriggerContext context,
|
||||
ExecutionTrigger executionTrigger
|
||||
ExecutionTrigger executionTrigger,
|
||||
Integer flowRevision
|
||||
) {
|
||||
return Execution.builder()
|
||||
.id(id)
|
||||
.namespace(context.getNamespace())
|
||||
.flowId(context.getFlowId())
|
||||
.flowRevision(context.getFlowRevision())
|
||||
.flowRevision(flowRevision)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.labels(trigger.getLabels() == null ? null : trigger.getLabels())
|
||||
|
||||
@@ -51,7 +51,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
@Nullable ChildFilter childFilter
|
||||
);
|
||||
|
||||
Flux<Execution> find(
|
||||
default Flux<Execution> find(
|
||||
@Nullable String query,
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace,
|
||||
@@ -62,6 +62,22 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
@Nullable Map<String, String> labels,
|
||||
@Nullable String triggerExecutionId,
|
||||
@Nullable ChildFilter childFilter
|
||||
) {
|
||||
return find(query, tenantId, namespace, flowId, startDate, endDate, state, labels, triggerExecutionId, childFilter, false);
|
||||
}
|
||||
|
||||
Flux<Execution> find(
|
||||
@Nullable String query,
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace,
|
||||
@Nullable String flowId,
|
||||
@Nullable ZonedDateTime startDate,
|
||||
@Nullable ZonedDateTime endDate,
|
||||
@Nullable List<State.Type> state,
|
||||
@Nullable Map<String, String> labels,
|
||||
@Nullable String triggerExecutionId,
|
||||
@Nullable ChildFilter childFilter,
|
||||
boolean allowDeleted
|
||||
);
|
||||
|
||||
ArrayListTotal<TaskRun> findTaskRun(
|
||||
|
||||
@@ -35,6 +35,11 @@ public class Executor {
|
||||
private ExecutionResumed executionResumed;
|
||||
private ExecutionResumed joinedExecutionResumed;
|
||||
|
||||
/**
|
||||
* The sequence id should be incremented each time the execution is persisted after mutation.
|
||||
*/
|
||||
private long seqId = 0L;
|
||||
|
||||
/**
|
||||
* List of {@link ExecutionKilled} to be propagated part of the execution.
|
||||
*/
|
||||
@@ -45,6 +50,12 @@ public class Executor {
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
public Executor(Execution execution, Long offset, long seqId) {
|
||||
this.execution = execution;
|
||||
this.offset = offset;
|
||||
this.seqId = seqId;
|
||||
}
|
||||
|
||||
public Executor(WorkerTaskResult workerTaskResult) {
|
||||
this.joinedWorkerTaskResult = workerTaskResult;
|
||||
}
|
||||
@@ -148,7 +159,18 @@ public class Executor {
|
||||
public Executor serialize() {
|
||||
return new Executor(
|
||||
this.execution,
|
||||
this.offset
|
||||
this.offset,
|
||||
this.seqId
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments and returns the execution sequence id.
|
||||
*
|
||||
* @return the sequence id.
|
||||
*/
|
||||
public long incrementAndGetSeqId() {
|
||||
this.seqId++;
|
||||
return seqId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.tasks.runners.PluginUtilsService;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
@@ -44,17 +45,16 @@ public abstract class FilesService {
|
||||
file.getParentFile().mkdirs();
|
||||
}
|
||||
|
||||
var fileContent = runContext.render(input, additionalVars);
|
||||
if (fileContent == null) {
|
||||
if (input == null) {
|
||||
file.createNewFile();
|
||||
} else {
|
||||
if (fileContent.startsWith("kestra://")) {
|
||||
try (var is = runContext.storage().getFile(URI.create(fileContent));
|
||||
if (input.startsWith("kestra://")) {
|
||||
try (var is = runContext.storage().getFile(URI.create(input));
|
||||
var out = new FileOutputStream(file)) {
|
||||
IOUtils.copyLarge(is, out);
|
||||
}
|
||||
} else {
|
||||
Files.write(file.toPath(), fileContent.getBytes());
|
||||
Files.write(file.toPath(), input.getBytes());
|
||||
}
|
||||
}
|
||||
}));
|
||||
@@ -85,10 +85,14 @@ public abstract class FilesService {
|
||||
.filter(path -> pathMatcher.matches(runContext.tempDir().relativize(path)))
|
||||
.map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
|
||||
runContext.tempDir().relativize(path).toString(),
|
||||
runContext.storage().putFile(path.toFile())
|
||||
runContext.storage().putFile(path.toFile(), resolveUniqueNameForFile(path))
|
||||
)))
|
||||
.toList()
|
||||
.stream();
|
||||
}
|
||||
}
|
||||
|
||||
private static String resolveUniqueNameForFile(final Path path) {
|
||||
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.ArrayInput;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -85,7 +86,9 @@ public class FlowInputOutput {
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.map(throwFunction(input -> {
|
||||
if (input instanceof CompletedFileUpload fileUpload) {
|
||||
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", ".upl");
|
||||
String fileExtension = inputs.stream().filter(flowInput -> flowInput instanceof FileInput && flowInput.getId().equals(fileUpload.getFilename())).map(flowInput -> ((FileInput) flowInput).getExtension()).findFirst().orElse(".upl");
|
||||
fileExtension = fileExtension.startsWith(".") ? fileExtension : "." + fileExtension;
|
||||
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", fileExtension);
|
||||
try (var inputStream = fileUpload.getInputStream();
|
||||
var outputStream = new FileOutputStream(tempFile)) {
|
||||
long transferredBytes = inputStream.transferTo(outputStream);
|
||||
|
||||
@@ -14,6 +14,7 @@ import io.kestra.core.services.FlowListenersInterface;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
@@ -98,13 +99,13 @@ public class FlowListeners implements FlowListenersInterface {
|
||||
private Optional<Flow> previous(Flow flow) {
|
||||
return flows
|
||||
.stream()
|
||||
.filter(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
|
||||
.filter(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
private boolean remove(Flow flow) {
|
||||
synchronized (this) {
|
||||
boolean remove = flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
|
||||
boolean remove = flows.removeIf(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
|
||||
if (!remove && flow.isDeleted()) {
|
||||
log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ public class NamespaceFilesService {
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception {
|
||||
if (!namespaceFiles.getEnabled()) {
|
||||
if (!Boolean.TRUE.equals(namespaceFiles.getEnabled())) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
||||
@@ -339,7 +339,7 @@ public class RunContext {
|
||||
if (execution.getTaskRunList() != null) {
|
||||
Map<String, Object> outputs = new HashMap<>(execution.outputs());
|
||||
if (decryptVariables) {
|
||||
decryptOutputs(outputs);
|
||||
outputs = decryptOutputs(outputs);
|
||||
}
|
||||
builder.put("outputs", outputs);
|
||||
}
|
||||
@@ -401,25 +401,37 @@ public class RunContext {
|
||||
));
|
||||
}
|
||||
|
||||
if (this.runContextLogger != null) {
|
||||
builder.put("addSecretConsumer", (Consumer<String>) s -> this.runContextLogger.usedSecret(s));
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private void decryptOutputs(Map<String, Object> outputs) {
|
||||
for (var entry: outputs.entrySet()) {
|
||||
private Map<String, Object> decryptOutputs(Map<String, Object> mapToDecrypt) {
|
||||
if (mapToDecrypt == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<String, Object> decryptedMap = new HashMap<>();
|
||||
for (var entry: mapToDecrypt.entrySet()) {
|
||||
decryptedMap.put(entry.getKey(), entry.getValue());
|
||||
if (entry.getValue() instanceof Map map) {
|
||||
// if some outputs are of type EncryptedString we decode them and replace the object
|
||||
// if some value are of type EncryptedString we decode them and replace the object
|
||||
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
|
||||
try {
|
||||
String decoded = decrypt((String) map.get("value"));
|
||||
outputs.put(entry.getKey(), decoded);
|
||||
decryptedMap.put(entry.getKey(), decoded);
|
||||
} catch (GeneralSecurityException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
decryptOutputs((Map<String, Object>) map);
|
||||
decryptedMap.put(entry.getKey(), decryptOutputs((Map<String, Object>) map));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return decryptedMap;
|
||||
}
|
||||
|
||||
private Map<String, Object> variables(TaskRun taskRun) {
|
||||
@@ -502,6 +514,8 @@ public class RunContext {
|
||||
runContext.runContextLogger = this.runContextLogger;
|
||||
runContext.tempBasedPath = this.tempBasedPath;
|
||||
runContext.temporaryDirectory = this.temporaryDirectory;
|
||||
runContext.pluginConfiguration = this.pluginConfiguration;
|
||||
runContext.secretKey = this.secretKey;
|
||||
|
||||
return runContext;
|
||||
}
|
||||
@@ -583,6 +597,17 @@ public class RunContext {
|
||||
return newContext;
|
||||
}
|
||||
|
||||
public RunContext forWorkingDirectoryTask(final Task task) {
|
||||
Map<String, Object> decryptedVariables = new HashMap<>(this.variables);
|
||||
if (this.variables.get("outputs") != null) {
|
||||
decryptedVariables.put("outputs", decryptOutputs((Map<String, Object>) this.variables.get("outputs")));
|
||||
}
|
||||
|
||||
RunContext newRunContext = this.clone(decryptedVariables);
|
||||
newRunContext.initPluginConfiguration(applicationContext, task.getClass(), task.getType());
|
||||
return newRunContext;
|
||||
}
|
||||
|
||||
public RunContext forTaskRunner(TaskRunner taskRunner) {
|
||||
this.initPluginConfiguration(applicationContext, taskRunner.getClass(), taskRunner.getType());
|
||||
|
||||
|
||||
@@ -166,7 +166,7 @@ public class VariableRenderer {
|
||||
}
|
||||
|
||||
public Map<String, Object> render(Map<String, Object> in, Map<String, Object> variables, boolean recursive) throws IllegalVariableEvaluationException {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
Map<String, Object> map = new LinkedHashMap<>();
|
||||
|
||||
for (Map.Entry<String, Object> r : in.entrySet()) {
|
||||
String key = this.render(r.getKey(), variables);
|
||||
|
||||
@@ -34,6 +34,7 @@ import io.micronaut.context.annotation.Parameter;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
@@ -126,6 +127,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
private final AtomicReference<ServiceState> state = new AtomicReference<>();
|
||||
|
||||
private final Integer numThreads;
|
||||
private final AtomicInteger pendingJobCount = new AtomicInteger(0);
|
||||
private final AtomicInteger runningJobCount = new AtomicInteger(0);
|
||||
/**
|
||||
* Creates a new {@link Worker} instance.
|
||||
*
|
||||
@@ -143,6 +147,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
ExecutorsUtils executorsUtils
|
||||
) {
|
||||
this.id = workerId;
|
||||
this.numThreads = numThreads;
|
||||
this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
|
||||
this.eventPublisher = eventPublisher;
|
||||
this.executorService = executorsUtils.maxCachedThreadPool(numThreads, "worker");
|
||||
@@ -168,6 +173,15 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
context.inject(this);
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
void initMetrics() {
|
||||
String[] tags = this.workerGroup == null ? new String[0] : new String[] { MetricRegistry.TAG_WORKER_GROUP, this.workerGroup };
|
||||
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
|
||||
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, runningJobCount, tags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
setState(ServiceState.RUNNING);
|
||||
@@ -208,19 +222,29 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
this.workerGroup,
|
||||
Worker.class,
|
||||
either -> {
|
||||
pendingJobCount.incrementAndGet();
|
||||
|
||||
executorService.execute(() -> {
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
||||
handleDeserializationError(either.getRight());
|
||||
return;
|
||||
pendingJobCount.decrementAndGet();
|
||||
runningJobCount.incrementAndGet();
|
||||
|
||||
try {
|
||||
if (either.isRight()) {
|
||||
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
||||
handleDeserializationError(either.getRight());
|
||||
return;
|
||||
}
|
||||
|
||||
WorkerJob workerTask = either.getLeft();
|
||||
if (workerTask instanceof WorkerTask task) {
|
||||
handleTask(task);
|
||||
} else if (workerTask instanceof WorkerTrigger trigger) {
|
||||
handleTrigger(trigger);
|
||||
}
|
||||
} finally {
|
||||
runningJobCount.decrementAndGet();
|
||||
}
|
||||
|
||||
WorkerJob workerTask = either.getLeft();
|
||||
if (workerTask instanceof WorkerTask task) {
|
||||
handleTask(task);
|
||||
} else if (workerTask instanceof WorkerTrigger trigger) {
|
||||
handleTrigger(trigger);
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
@@ -285,7 +309,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
WorkerTask currentWorkerTask = workingDirectory.workerTask(
|
||||
workerTask.getTaskRun(),
|
||||
currentTask,
|
||||
runContext
|
||||
runContext.forWorkingDirectoryTask(currentTask)
|
||||
);
|
||||
|
||||
// all tasks will be handled immediately by the worker
|
||||
@@ -378,13 +402,13 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
.increment();
|
||||
|
||||
this.metricRegistry
|
||||
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
|
||||
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||
.record(() -> {
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
stopWatch.start();
|
||||
|
||||
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
|
||||
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
|
||||
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
|
||||
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
|
||||
|
||||
try {
|
||||
|
||||
@@ -70,12 +70,14 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
private final WorkerGroupService workerGroupService;
|
||||
private final LogService logService;
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
|
||||
private volatile Boolean isReady = false;
|
||||
|
||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
@Getter
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
|
||||
@@ -178,7 +180,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
if (abstractTrigger instanceof WorkerTriggerInterface) {
|
||||
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||
Trigger trigger = Trigger.of(flow, abstractTrigger);
|
||||
|
||||
try {
|
||||
this.triggerState.update(flow, abstractTrigger, conditionContext);
|
||||
@@ -186,6 +187,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
logError(conditionContext, flow, abstractTrigger, e);
|
||||
}
|
||||
|
||||
Trigger trigger = Trigger.of(flow, abstractTrigger);
|
||||
this.executionKilledQueue.emit(ExecutionKilledTrigger
|
||||
.builder()
|
||||
.tenantId(trigger.getTenantId())
|
||||
@@ -255,7 +257,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.tenantId(flowAndTrigger.flow().getTenantId())
|
||||
.namespace(flowAndTrigger.flow().getNamespace())
|
||||
.flowId(flowAndTrigger.flow().getId())
|
||||
.flowRevision(flowAndTrigger.flow().getRevision())
|
||||
.triggerId(flowAndTrigger.trigger().getId())
|
||||
.date(now())
|
||||
.nextExecutionDate(nextExecutionDate)
|
||||
@@ -343,7 +344,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
logError(conditionContext, flow, abstractTrigger, e);
|
||||
return null;
|
||||
}
|
||||
this.triggerState.save(triggerContext, scheduleContext);
|
||||
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
|
||||
} else {
|
||||
triggerContext = lastTrigger;
|
||||
}
|
||||
@@ -368,7 +369,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
public List<FlowWithTriggers> schedulerTriggers() {
|
||||
Map<String, Flow> flows = this.flowListeners.flows()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Flow::uid, Function.identity()));
|
||||
.collect(Collectors.toMap(Flow::uidWithoutRevision, Function.identity()));
|
||||
|
||||
return this.triggerState.findAllForAllTenants().stream()
|
||||
.filter(trigger -> flows.containsKey(trigger.flowUid()))
|
||||
@@ -432,11 +433,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
)
|
||||
.build()
|
||||
)
|
||||
.peek(f -> {
|
||||
if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
|
||||
this.triggerState.unlock(f.getTriggerContext());
|
||||
}
|
||||
})
|
||||
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
|
||||
.filter(this::isExecutionNotRunning)
|
||||
.map(FlowWithWorkerTriggerNextDate::of)
|
||||
@@ -472,7 +468,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
|
||||
|
||||
try {
|
||||
this.triggerState.save(triggerRunning, scheduleContext);
|
||||
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
|
||||
this.sendWorkerTriggerToWorker(f);
|
||||
} catch (InternalException e) {
|
||||
logService.logTrigger(
|
||||
@@ -497,7 +493,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
|
||||
);
|
||||
trigger = trigger.checkBackfill();
|
||||
this.triggerState.save(trigger, scheduleContext);
|
||||
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
|
||||
}
|
||||
} else {
|
||||
logService.logTrigger(
|
||||
@@ -515,7 +511,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
logError(f, e);
|
||||
}
|
||||
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
|
||||
this.triggerState.save(trigger, scheduleContext);
|
||||
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
|
||||
}
|
||||
} catch (InternalException ie) {
|
||||
// validate schedule condition can fail to render variables
|
||||
@@ -526,13 +522,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.tenantId(f.getTriggerContext().getTenantId())
|
||||
.namespace(f.getTriggerContext().getNamespace())
|
||||
.flowId(f.getTriggerContext().getFlowId())
|
||||
.flowRevision(f.getTriggerContext().getFlowRevision())
|
||||
.labels(f.getFlow().getLabels())
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
|
||||
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
|
||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
|
||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -572,7 +567,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
|
||||
// So we must save them by passing the scheduleContext.
|
||||
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
|
||||
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
|
||||
}
|
||||
|
||||
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
|
||||
@@ -593,8 +588,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
return true;
|
||||
}
|
||||
|
||||
// The execution is not yet started, we skip
|
||||
if (lastTrigger.getExecutionCurrentState() == null) {
|
||||
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
|
||||
|
||||
// executionState hasn't received the execution, we skip
|
||||
if (execution.isEmpty()) {
|
||||
if (lastTrigger.getUpdatedDate() != null) {
|
||||
metricRegistry
|
||||
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
|
||||
@@ -621,6 +618,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
|
||||
}
|
||||
|
||||
// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
|
||||
// check that if an executionId but no state, this means the execution is not started
|
||||
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
|
||||
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
|
||||
if (log.isDebugEnabled()) {
|
||||
logService.logTrigger(
|
||||
f.getTriggerContext(),
|
||||
@@ -628,7 +629,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
Level.DEBUG,
|
||||
"Execution '{}' is still '{}', updated at '{}'",
|
||||
lastTrigger.getExecutionId(),
|
||||
lastTrigger.getExecutionCurrentState(),
|
||||
execution.get().getState().getCurrent(),
|
||||
lastTrigger.getUpdatedDate()
|
||||
);
|
||||
}
|
||||
@@ -849,7 +850,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.tenantId(f.getTriggerContext().getTenantId())
|
||||
.namespace(f.getTriggerContext().getNamespace())
|
||||
.flowId(f.getTriggerContext().getFlowId())
|
||||
.flowRevision(f.getTriggerContext().getFlowRevision())
|
||||
.triggerId(f.getTriggerContext().getTriggerId())
|
||||
.date(f.getTriggerContext().getNextExecutionDate())
|
||||
.nextExecutionDate(f.getTriggerContext().getNextExecutionDate())
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
// For tests purpose
|
||||
public class DefaultScheduleContext implements ScheduleContextInterface {}
|
||||
public class DefaultScheduleContext implements ScheduleContextInterface {
|
||||
@Override
|
||||
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||
consumer.accept(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
|
||||
public DefaultScheduler(
|
||||
ApplicationContext applicationContext,
|
||||
FlowListenersInterface flowListeners,
|
||||
SchedulerExecutionStateInterface executionState,
|
||||
SchedulerTriggerStateInterface triggerState
|
||||
) {
|
||||
super(applicationContext, flowListeners);
|
||||
this.triggerState = triggerState;
|
||||
this.executionState = executionState;
|
||||
|
||||
this.conditionService = applicationContext.getBean(ConditionService.class);
|
||||
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
|
||||
@@ -1,4 +1,14 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
|
||||
* See AbstractScheduler.handle().
|
||||
*/
|
||||
public interface ScheduleContextInterface {
|
||||
/**
|
||||
* Do trigger retrieval and updating in a single transaction.
|
||||
*/
|
||||
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
|
||||
import java.util.Optional;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Override
|
||||
public Optional<Execution> findById(String tenantId, String id) {
|
||||
return executionRepository.findById(tenantId, id);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface SchedulerExecutionStateInterface {
|
||||
Optional<Execution> findById(String tenantId, String id);
|
||||
}
|
||||
@@ -20,19 +20,22 @@ public interface SchedulerTriggerStateInterface {
|
||||
|
||||
Trigger create(Trigger trigger) throws ConstraintViolationException;
|
||||
|
||||
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
|
||||
|
||||
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
|
||||
|
||||
Trigger update(Trigger trigger);
|
||||
|
||||
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
|
||||
|
||||
|
||||
/**
|
||||
* Used by the JDBC implementation: find triggers in all tenants.
|
||||
*/
|
||||
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||
|
||||
/**
|
||||
* Required for Kafka
|
||||
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
|
||||
*/
|
||||
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||
|
||||
/**
|
||||
* Required for Kafka
|
||||
*/
|
||||
void unlock(Trigger trigger);
|
||||
}
|
||||
|
||||
@@ -345,7 +345,8 @@ public class ExecutionService {
|
||||
state,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
null,
|
||||
true
|
||||
)
|
||||
.map(throwFunction(execution -> {
|
||||
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
|
||||
@@ -478,6 +479,7 @@ public class ExecutionService {
|
||||
.executionId(childExecution.getId())
|
||||
.isOnKillCascade(true)
|
||||
.state(ExecutionKilled.State.REQUESTED) // Event will be reentrant in the Executor.
|
||||
.tenantId(tenantId)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -12,13 +12,23 @@ import java.util.List;
|
||||
public class SkipExecutionService {
|
||||
private volatile List<String> skipExecutions = Collections.emptyList();
|
||||
private volatile List<FlowId> skipFlows = Collections.emptyList();
|
||||
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
|
||||
private volatile List<String> skipTenants = Collections.emptyList();
|
||||
|
||||
public synchronized void setSkipExecutions(List<String> skipExecutions) {
|
||||
this.skipExecutions = skipExecutions;
|
||||
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
|
||||
}
|
||||
|
||||
public synchronized void setSkipFlows(List<String> skipFlows) {
|
||||
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(flow -> FlowId.from(flow)).toList();
|
||||
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(FlowId::from).toList();
|
||||
}
|
||||
|
||||
public synchronized void setSkipNamespaces(List<String> skipNamespaces) {
|
||||
this.skipNamespaces = skipNamespaces == null ? Collections.emptyList() : skipNamespaces.stream().map(NamespaceId::from).toList();
|
||||
}
|
||||
|
||||
public synchronized void setSkipTenants(List<String> skipTenants) {
|
||||
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -38,17 +48,30 @@ public class SkipExecutionService {
|
||||
|
||||
@VisibleForTesting
|
||||
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
|
||||
return skipExecutions.contains(executionId) ||
|
||||
skipFlows.contains(new FlowId(tenant, namespace, flow));
|
||||
return (tenant != null && skipTenants.contains(tenant)) ||
|
||||
skipNamespaces.contains(new NamespaceId(tenant, namespace)) ||
|
||||
skipFlows.contains(new FlowId(tenant, namespace, flow)) ||
|
||||
(executionId != null && skipExecutions.contains(executionId));
|
||||
}
|
||||
|
||||
private static String[] splitIdParts(String id) {
|
||||
return id.split("\\|");
|
||||
}
|
||||
|
||||
record FlowId(String tenant, String namespace, String flow) {
|
||||
static FlowId from(String flowId) {
|
||||
String[] parts = flowId.split("\\|");
|
||||
String[] parts = SkipExecutionService.splitIdParts(flowId);
|
||||
if (parts.length == 3) {
|
||||
return new FlowId(parts[0], parts[1], parts[2]);
|
||||
}
|
||||
return new FlowId(null, parts[0], parts[1]);
|
||||
}
|
||||
};
|
||||
|
||||
record NamespaceId(String tenant, String namespace) {
|
||||
static NamespaceId from(String namespaceId) {
|
||||
String[] parts = SkipExecutionService.splitIdParts(namespaceId);
|
||||
return new NamespaceId(parts[0], parts[1]);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -152,7 +152,6 @@ abstract public class TestsUtils {
|
||||
.triggerId(trigger.getId())
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowRevision(flow.getRevision())
|
||||
.date(ZonedDateTime.now())
|
||||
.build();
|
||||
|
||||
|
||||
@@ -11,30 +11,15 @@ import io.kestra.core.models.executions.TaskRunAttempt;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.ExecutableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.runners.ExecutableUtils;
|
||||
import io.kestra.core.runners.FlowExecutorInterface;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.SubflowExecution;
|
||||
import io.kestra.core.runners.SubflowExecutionResult;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuperBuilder
|
||||
@@ -153,6 +138,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
||||
|
||||
if (this.labels != null) {
|
||||
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
|
||||
labels.removeIf(label -> label.key().equals(entry.getKey()));
|
||||
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
|
||||
return false;
|
||||
}
|
||||
|
||||
Integer iterationCount = (Integer) parentTaskRun.getOutputs().get("iterationCount");
|
||||
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
|
||||
.map(outputs -> (Integer) outputs.get("iterationCount"))
|
||||
.orElse(0);
|
||||
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
|
||||
if (printLog) {logger.warn("Max iterations reached");}
|
||||
return true;
|
||||
@@ -236,7 +238,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
|
||||
|
||||
public WaitFor.Output outputs(TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||
String value = parentTaskRun != null ?
|
||||
parentTaskRun.getOutputs().get("iterationCount").toString() : "0";
|
||||
String.valueOf(Optional.ofNullable(parentTaskRun.getOutputs())
|
||||
.map(outputs -> outputs.get("iterationCount"))
|
||||
.orElse("0")) : "0";
|
||||
|
||||
return Output.builder()
|
||||
.iterationCount(Integer.parseInt(value) + 1)
|
||||
|
||||
@@ -270,7 +270,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
||||
}
|
||||
}
|
||||
|
||||
if (this.namespaceFiles != null ) {
|
||||
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
|
||||
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
||||
namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.tempDir(), this.namespaceFiles);
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.net.URI;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalInt;
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@@ -122,6 +123,16 @@ public class Request extends AbstractHttp implements RunnableTask<Request.Output
|
||||
response = client
|
||||
.toBlocking()
|
||||
.exchange(request, Argument.STRING, Argument.STRING);
|
||||
|
||||
// check that the string is a valid Unicode string
|
||||
if (response.getBody().isPresent()) {
|
||||
OptionalInt illegalChar = response.body().chars().filter(c -> !Character.isDefined(c)).findFirst();
|
||||
if (illegalChar.isPresent()) {
|
||||
throw new IllegalArgumentException("Illegal unicode code point in request body: " + illegalChar.getAsInt() +
|
||||
", the Request task only support valid Unicode strings as body.\n" +
|
||||
"You can try using the Download task instead.");
|
||||
}
|
||||
}
|
||||
} catch (HttpClientResponseException e) {
|
||||
if (!allowFailed) {
|
||||
throw e;
|
||||
|
||||
@@ -80,7 +80,6 @@ public class DeleteFiles extends Task implements RunnableTask<DeleteFiles.Output
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
@Schema(
|
||||
title = "A file or a list of files from the given namespace.",
|
||||
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
|
||||
|
||||
@@ -11,7 +11,6 @@ import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.utils.Rethrow;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -84,7 +83,6 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
@Schema(
|
||||
title = "A file or a list of files from the given namespace.",
|
||||
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
|
||||
@@ -93,11 +91,19 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
|
||||
@PluginProperty(dynamic = true)
|
||||
private Object files;
|
||||
|
||||
@Schema(
|
||||
title = "The folder where the downloaded files will be stored"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
@Builder.Default
|
||||
private String destination = "";
|
||||
|
||||
|
||||
@Override
|
||||
public Output run(RunContext runContext) throws Exception {
|
||||
Logger logger = runContext.logger();
|
||||
String renderedNamespace = runContext.render(namespace);
|
||||
String renderedDestination = runContext.render(destination);
|
||||
// Check if namespace is allowed
|
||||
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
||||
FlowService flowService = runContext.getApplicationContext().getBean(FlowService.class);
|
||||
@@ -120,7 +126,7 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
|
||||
namespaceFilesService.recursiveList(flowInfo.tenantId(), renderedNamespace, null).forEach(Rethrow.throwConsumer(uri -> {
|
||||
if (patterns.stream().anyMatch(p -> p.matches(Path.of(uri.getPath())))) {
|
||||
try (InputStream inputStream = namespaceFilesService.content(flowInfo.tenantId(), renderedNamespace, uri)) {
|
||||
downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, uri.getPath()));
|
||||
downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, destination + uri.getPath()));
|
||||
logger.debug(String.format("Downloaded %s", uri));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,10 +138,10 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
|
||||
});
|
||||
|
||||
// check for file in current tempDir that match regexs
|
||||
List<PathMatcher> patterns = regexs.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + reg)).toList();
|
||||
for (File file : Objects.requireNonNull(runContext.tempDir().toFile().listFiles())) {
|
||||
List<PathMatcher> patterns = regexs.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + runContext.tempDir().toString() + checkLeadingSlash(reg))).toList();
|
||||
for (File file : Objects.requireNonNull(listFilesRecursively(runContext.tempDir().toFile()))) {
|
||||
if (patterns.stream().anyMatch(p -> p.matches(Path.of(file.toURI().getPath())))) {
|
||||
String newFilePath = buildPath(renderedDestination, file.getName());
|
||||
String newFilePath = buildPath(renderedDestination, file.getPath().replace(runContext.tempDir().toString(), ""));
|
||||
storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), newFilePath, new FileInputStream(file));
|
||||
}
|
||||
}
|
||||
@@ -199,6 +199,24 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
|
||||
}
|
||||
}
|
||||
|
||||
private List<File> listFilesRecursively(File directory) throws IOException {
|
||||
List<File> files = new ArrayList<>();
|
||||
if (directory == null || !directory.isDirectory()) {
|
||||
return files; // Handle invalid directory or not a directory
|
||||
}
|
||||
|
||||
for (File file : directory.listFiles()) {
|
||||
if (file.isFile()) {
|
||||
files.add(file);
|
||||
} else {
|
||||
// Recursively call for subdirectories
|
||||
files.addAll(listFilesRecursively(file));
|
||||
}
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
package io.kestra.plugin.core.trigger;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
@@ -91,7 +95,7 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.flowRevision(flow.getRevision())
|
||||
.labels(flow.getLabels())
|
||||
.labels(generateLabels(runContext, flow))
|
||||
.state(new State())
|
||||
.trigger(ExecutionTrigger.of(
|
||||
this,
|
||||
@@ -128,6 +132,34 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
|
||||
}
|
||||
}
|
||||
|
||||
private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
|
||||
final List<Label> labels = new ArrayList<>();
|
||||
|
||||
if (flow.getLabels() != null) {
|
||||
labels.addAll(flow.getLabels()); // no need for rendering
|
||||
}
|
||||
|
||||
if (this.getLabels() != null) {
|
||||
for (Label label : this.getLabels()) {
|
||||
final var value = renderLabelValue(runContext, label);
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return labels;
|
||||
}
|
||||
|
||||
private String renderLabelValue(RunContext runContext, Label label) {
|
||||
try {
|
||||
return runContext.render(label.value());
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Builder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
|
||||
import com.cronutils.model.time.ExecutionTime;
|
||||
import com.cronutils.parser.CronParser;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
@@ -349,14 +350,13 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
||||
// validate schedule condition can fail to render variables
|
||||
// in this case, we return a failed execution so the trigger is not evaluated each second
|
||||
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
||||
List<Label> labels = generateLabels(conditionContext, backfill);
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(triggerContext.getFlowRevision())
|
||||
.labels(labels)
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.labels(generateLabels(runContext, conditionContext, backfill))
|
||||
.state(new State().withState(State.Type.FAILED))
|
||||
.build();
|
||||
return Optional.of(execution);
|
||||
@@ -390,7 +390,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
||||
} else {
|
||||
variables = scheduleDates.toMap();
|
||||
}
|
||||
List<Label> labels = generateLabels(conditionContext, backfill);
|
||||
List<Label> labels = generateLabels(runContext, conditionContext, backfill);
|
||||
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
|
||||
|
||||
@@ -399,7 +399,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
||||
.tenantId(triggerContext.getTenantId())
|
||||
.namespace(triggerContext.getNamespace())
|
||||
.flowId(triggerContext.getFlowId())
|
||||
.flowRevision(triggerContext.getFlowRevision())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.labels(labels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
@@ -425,19 +425,29 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
||||
.orElse(RecoverMissedSchedules.ALL);
|
||||
}
|
||||
|
||||
private List<Label> generateLabels(ConditionContext conditionContext, Backfill backfill) {
|
||||
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||
List<Label> labels = new ArrayList<>();
|
||||
|
||||
if (conditionContext.getFlow().getLabels() != null) {
|
||||
labels.addAll(conditionContext.getFlow().getLabels());
|
||||
labels.addAll(conditionContext.getFlow().getLabels()); // no need for rendering
|
||||
}
|
||||
|
||||
if (backfill != null && backfill.getLabels() != null) {
|
||||
labels.addAll(backfill.getLabels());
|
||||
for (Label label : backfill.getLabels()) {
|
||||
final var value = runContext.render(label.value());
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.getLabels() != null) {
|
||||
labels.addAll(this.getLabels());
|
||||
for (Label label : this.getLabels()) {
|
||||
final var value = runContext.render(label.value());
|
||||
if (value != null) {
|
||||
labels.add(new Label(label.key(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return labels;
|
||||
|
||||
@@ -26,7 +26,6 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
return Trigger.builder()
|
||||
.flowId(IdUtils.create())
|
||||
.namespace(TEST_NAMESPACE)
|
||||
.flowRevision(1)
|
||||
.triggerId(IdUtils.create())
|
||||
.executionId(IdUtils.create())
|
||||
.date(ZonedDateTime.now());
|
||||
|
||||
@@ -88,8 +88,7 @@ public class DeserializationIssuesCaseTest {
|
||||
"date": "2023-11-24T15:48:57.632881597Z",
|
||||
"flowId": "http-trigger",
|
||||
"namespace": "dev",
|
||||
"triggerId": "http",
|
||||
"flowRevision": 3
|
||||
"triggerId": "http"
|
||||
},
|
||||
"conditionContext": {
|
||||
"flow": {
|
||||
|
||||
@@ -34,6 +34,13 @@ class FilesServiceTest {
|
||||
assertThat(content.get("file.txt"), is("Hello World"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void renderRawFile() throws Exception {
|
||||
RunContext runContext = runContextFactory.of(Map.of("filename", "file.txt", "content", "Hello World"));
|
||||
Map<String, String> content = FilesService.inputFiles(runContext, Map.of("{{filename}}", "{% raw %}{{content}}{% endraw %}"));
|
||||
assertThat(content.get("file.txt"), is("{{content}}"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void outputFiles() throws Exception {
|
||||
RunContext runContext = runContextFactory.of();
|
||||
|
||||
@@ -104,6 +104,13 @@ abstract public class FlowListenersTest {
|
||||
assertThat(count.get(), is(2));
|
||||
assertThat(flowListenersService.flows().size(), is(2));
|
||||
});
|
||||
|
||||
Flow withTenant = first.toBuilder().tenantId("some-tenant").build();
|
||||
flowRepository.create(withTenant, withTenant.generateSource(), pluginDefaultService.injectDefaults(withTenant));
|
||||
wait(ref, () -> {
|
||||
assertThat(count.get(), is(3));
|
||||
assertThat(flowListenersService.flows().size(), is(3));
|
||||
});
|
||||
}
|
||||
|
||||
public static class Ref {
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.LinkedHashMap;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -18,6 +22,9 @@ class VariableRendererTest {
|
||||
@Inject
|
||||
VariableRenderer.VariableConfiguration variableConfiguration;
|
||||
|
||||
@Inject
|
||||
VariableRenderer variableRenderer;
|
||||
|
||||
@Test
|
||||
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
|
||||
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
|
||||
@@ -25,6 +32,25 @@ class VariableRendererTest {
|
||||
Assertions.assertEquals("result", render);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldKeepKeyOrderWhenRenderingMap() throws IllegalVariableEvaluationException {
|
||||
final Map<String, Object> input = new LinkedHashMap<>();
|
||||
input.put("foo-1", "A");
|
||||
input.put("foo-2", "B");
|
||||
|
||||
final Map<String, Object> input_value3 = new LinkedHashMap<>();
|
||||
input_value3.put("bar-1", "C");
|
||||
input_value3.put("bar-2", "D");
|
||||
input_value3.put("bar-3", "E");
|
||||
//
|
||||
input.put("foo-3", input_value3);
|
||||
|
||||
final Map<String, Object> result = variableRenderer.render(input, Map.of());
|
||||
assertThat(result.keySet(), contains("foo-1", "foo-2", "foo-3"));
|
||||
|
||||
final Map<String, Object> result_value3 = (Map<String, Object>) result.get("foo-3");
|
||||
assertThat(result_value3.keySet(), contains("bar-1", "bar-2", "bar-3"));
|
||||
}
|
||||
|
||||
public static class TestVariableRenderer extends VariableRenderer {
|
||||
|
||||
|
||||
@@ -104,7 +104,7 @@ abstract public class AbstractSchedulerTest {
|
||||
.id(IdUtils.create())
|
||||
.namespace(context.getNamespace())
|
||||
.flowId(context.getFlowId())
|
||||
.flowRevision(context.getFlowRevision())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.state(new State())
|
||||
.trigger(ExecutionTrigger.builder()
|
||||
.id(this.getId())
|
||||
|
||||
@@ -7,7 +7,6 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.TestMethodScopedWorker;
|
||||
import io.kestra.core.runners.Worker;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -18,13 +17,13 @@ import java.time.ZonedDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -33,6 +32,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
private static Flow createScheduleFlow() {
|
||||
Schedule schedule = Schedule.builder()
|
||||
.id("hourly")
|
||||
@@ -58,6 +60,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
void schedule() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
|
||||
CountDownLatch queueCount = new CountDownLatch(4);
|
||||
|
||||
Flow flow = createScheduleFlow();
|
||||
@@ -65,7 +68,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
triggerState.create(Trigger.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.flowRevision(flow.getRevision())
|
||||
.triggerId("hourly")
|
||||
.date(ZonedDateTime.parse("2021-09-06T02:00:00+01:00[Europe/Paris]"))
|
||||
.build()
|
||||
@@ -75,12 +77,22 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// mock the backfill execution is ended
|
||||
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||
.when(executionRepositorySpy)
|
||||
.findById(any(), any());
|
||||
|
||||
// start the worker as it execute polling triggers
|
||||
Worker worker = new Worker(applicationContext, 8, null);
|
||||
worker.run();
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
triggerState);
|
||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
|
||||
executionRepositorySpy,
|
||||
triggerState
|
||||
)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -97,8 +109,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
|
||||
scheduler.run();
|
||||
queueCount.await(15, TimeUnit.SECONDS);
|
||||
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
|
||||
assertionStop.run();
|
||||
|
||||
assertThat(queueCount.getCount(), is(0L));
|
||||
}
|
||||
|
||||
@@ -36,6 +36,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
private SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
private SchedulerExecutionState schedulerExecutionState;
|
||||
|
||||
@Inject
|
||||
private FlowListeners flowListenersService;
|
||||
|
||||
@@ -188,6 +191,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
return new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
schedulerExecutionState,
|
||||
triggerState
|
||||
);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
protected QueueInterface<LogEntry> logQueue;
|
||||
@@ -62,10 +65,11 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.truncatedTo(ChronoUnit.HOURS);
|
||||
}
|
||||
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
executionStateSpy,
|
||||
triggerState
|
||||
);
|
||||
}
|
||||
@@ -75,6 +79,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
void schedule() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
|
||||
CountDownLatch queueCount = new CountDownLatch(6);
|
||||
CountDownLatch invalidLogCount = new CountDownLatch(1);
|
||||
Set<String> date = new HashSet<>();
|
||||
@@ -109,7 +114,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -169,7 +174,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -203,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -249,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -293,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||
@@ -324,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.build();
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
Await.until(() -> {
|
||||
@@ -389,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
triggerState.create(trigger);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
scheduler.run();
|
||||
|
||||
// Wait 3s to see if things happen
|
||||
@@ -427,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(2);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
@@ -488,7 +493,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Runnable assertionStop = executionQueue.receive(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
|
||||
@@ -42,6 +42,9 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
private static Flow createFlow(Boolean failed) {
|
||||
RealtimeUnitTest schedule = RealtimeUnitTest.builder()
|
||||
.id("stream")
|
||||
@@ -75,6 +78,7 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
|
||||
AbstractScheduler scheduler = new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
executionState,
|
||||
triggerState
|
||||
);
|
||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||
|
||||
@@ -16,14 +16,14 @@ import org.junit.jupiter.api.Test;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -32,6 +32,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
public static Flow createThreadFlow() {
|
||||
return createThreadFlow(null);
|
||||
}
|
||||
@@ -72,17 +75,23 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
|
||||
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
|
||||
|
||||
doReturn(Collections.singletonList(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// mock the backfill execution is ended
|
||||
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||
.when(schedulerExecutionStateSpy)
|
||||
.findById(any(), any());
|
||||
|
||||
// scheduler
|
||||
try (
|
||||
AbstractScheduler scheduler = new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy,
|
||||
schedulerExecutionStateSpy,
|
||||
triggerState
|
||||
);
|
||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||
|
||||
@@ -52,6 +52,9 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
public static Flow createFlow(Duration sleep) {
|
||||
SleepTriggerTest schedule = SleepTriggerTest.builder()
|
||||
.id("sleep")
|
||||
@@ -101,6 +104,7 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
AbstractScheduler scheduler = new DefaultScheduler(
|
||||
applicationContext,
|
||||
flowListenersService,
|
||||
executionState,
|
||||
triggerState
|
||||
);
|
||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||
|
||||
@@ -21,7 +21,6 @@ public abstract class SchedulerTriggerStateInterfaceTest {
|
||||
return Trigger.builder()
|
||||
.flowId(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowRevision(1)
|
||||
.triggerId(IdUtils.create())
|
||||
.executionId(IdUtils.create())
|
||||
.date(ZonedDateTime.now());
|
||||
|
||||
@@ -123,10 +123,10 @@ class YamlFlowParserTest {
|
||||
void inputs() {
|
||||
Flow flow = this.parse("flows/valids/inputs.yaml");
|
||||
|
||||
assertThat(flow.getInputs().size(), is(27));
|
||||
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(9L));
|
||||
assertThat(flow.getInputs().size(), is(28));
|
||||
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(10L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count(), is(18L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(1L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(2L));
|
||||
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput && ((StringInput)r).getValidator() != null).count(), is(1L));
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
@@ -16,6 +17,14 @@ class SkipExecutionServiceTest {
|
||||
@Inject
|
||||
private SkipExecutionService skipExecutionService;
|
||||
|
||||
@BeforeEach
|
||||
void resetAll() {
|
||||
skipExecutionService.setSkipExecutions(null);
|
||||
skipExecutionService.setSkipFlows(null);
|
||||
skipExecutionService.setSkipNamespaces(null);
|
||||
skipExecutionService.setSkipTenants(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void skipExecutionByExecutionId() {
|
||||
var executionToSkip = "aaabbbccc";
|
||||
@@ -65,4 +74,25 @@ class SkipExecutionServiceTest {
|
||||
assertThat(skipExecutionService.skipExecution(null, "namespace", "not_skipped", "random"), is(false));
|
||||
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "not_skipped", "random"), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
void skipExecutionByNamespace() {
|
||||
skipExecutionService.setSkipNamespaces(List.of("tenant|namespace"));
|
||||
|
||||
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
|
||||
assertThat(skipExecutionService.skipExecution(null, "namespace", "someFlow", "someExecution"), is(false));
|
||||
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
|
||||
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "anotherFlow", "anotherExecution"), is(true));
|
||||
assertThat(skipExecutionService.skipExecution("tenant", "other.namespace", "someFlow", "someExecution"), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
void skipExecutionByTenantId() {
|
||||
skipExecutionService.setSkipTenants(List.of("tenant"));
|
||||
|
||||
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
|
||||
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
|
||||
assertThat(skipExecutionService.skipExecution("tenant", "another.namespace", "someFlow", "someExecution"), is(true));
|
||||
assertThat(skipExecutionService.skipExecution("anotherTenant", "another.namespace", "someFlow", "someExecution"), is(false));
|
||||
}
|
||||
}
|
||||
@@ -6,22 +6,23 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.runners.AbstractMemoryRunnerTest;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.storages.InternalStorage;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -30,7 +31,6 @@ import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.io.FileMatchers.anExistingFile;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@@ -38,6 +38,9 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
@Inject
|
||||
Suite suite;
|
||||
|
||||
@Inject
|
||||
RunContextFactory runContextFactory;
|
||||
|
||||
@Test
|
||||
void success() throws TimeoutException {
|
||||
suite.success(runnerUtils);
|
||||
@@ -83,6 +86,11 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
suite.outputFiles(runnerUtils);
|
||||
}
|
||||
|
||||
@Test
|
||||
void encryption() throws Exception {
|
||||
suite.encryption(runnerUtils, runContextFactory);
|
||||
}
|
||||
|
||||
@Singleton
|
||||
public static class Suite {
|
||||
@Inject
|
||||
@@ -154,8 +162,15 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
storageContext
|
||||
, storageInterface
|
||||
);
|
||||
URI fileURI = URI.create("kestra:" + storageContext.getContextStorageURI() + "/input.txt");
|
||||
assertThat(new String(storage.getFile(fileURI).readAllBytes()), is("Hello World"));
|
||||
|
||||
TaskRun taskRun = execution.getTaskRunList().get(1);
|
||||
Map<String, Object> outputs = taskRun.getOutputs();
|
||||
assertThat(outputs, hasKey("uris"));
|
||||
|
||||
URI uri = URI.create(((Map<String, String>) outputs.get("uris")).get("input.txt"));
|
||||
|
||||
assertTrue(uri.toString().endsWith("input.txt"));
|
||||
assertThat(new String(storage.getFile(uri).readAllBytes()), is("Hello World"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -236,6 +251,18 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
||||
assertThat(execution.findTaskRunsByTaskId("t3").get(0).getOutputs().get("value"), is("third"));
|
||||
}
|
||||
|
||||
public void encryption(RunnerUtils runnerUtils, RunContextFactory runContextFactory) throws TimeoutException, GeneralSecurityException {
|
||||
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-taskrun-encrypted");
|
||||
|
||||
assertThat(execution.getTaskRunList(), hasSize(3));
|
||||
Map<String, Object> encryptedString = (Map<String, Object>) execution.findTaskRunsByTaskId("encrypted").get(0).getOutputs().get("value");
|
||||
assertThat(encryptedString.get("type"), is(EncryptedString.TYPE));
|
||||
String encryptedValue = (String) encryptedString.get("value");
|
||||
assertThat(encryptedValue, is(not("Hello World")));
|
||||
assertThat(runContextFactory.of().decrypt(encryptedValue), is("Hello World"));
|
||||
assertThat(execution.findTaskRunsByTaskId("decrypted").get(0).getOutputs().get("value"), is("Hello World"));
|
||||
}
|
||||
|
||||
private void put(String path, String content) throws IOException {
|
||||
storageInterface.put(
|
||||
null,
|
||||
|
||||
@@ -30,7 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@MicronautTest
|
||||
class DownloadTest {
|
||||
public static final String FILE = "http://speedtest.ftp.otenet.gr/files/test1Mb.db";
|
||||
public static final String FILE = "https://sampletestfile.com/wp-content/uploads/2023/07/500KB-CSV.csv";
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
|
||||
@@ -56,11 +56,11 @@ class DownloadTest {
|
||||
IOUtils.toString(this.storageInterface.get(null, output.getUri()), StandardCharsets.UTF_8),
|
||||
is(IOUtils.toString(new URI(FILE).toURL().openStream(), StandardCharsets.UTF_8))
|
||||
);
|
||||
assertThat(output.getUri().toString(), endsWith(".db"));
|
||||
assertThat(output.getUri().toString(), endsWith(".csv"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void noResponse() throws Exception {
|
||||
void noResponse() {
|
||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
@@ -100,7 +100,7 @@ class DownloadTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void error() throws Exception {
|
||||
void error() {
|
||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.http.*;
|
||||
import io.micronaut.http.annotation.Controller;
|
||||
import io.micronaut.http.annotation.Get;
|
||||
import io.micronaut.http.annotation.Head;
|
||||
import io.micronaut.http.annotation.Post;
|
||||
import io.micronaut.http.multipart.StreamingFileUpload;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
@@ -67,7 +68,7 @@ class RequestTest {
|
||||
|
||||
@Test
|
||||
void head() throws Exception {
|
||||
final String url = "http://speedtest.ftp.otenet.gr/files/test100Mb.db";
|
||||
final String url = "https://sampletestfile.com/wp-content/uploads/2023/07/500KB-CSV.csv";
|
||||
|
||||
Request task = Request.builder()
|
||||
.id(RequestTest.class.getSimpleName())
|
||||
@@ -81,7 +82,7 @@ class RequestTest {
|
||||
Request.Output output = task.run(runContext);
|
||||
|
||||
assertThat(output.getUri(), is(URI.create(url)));
|
||||
assertThat(output.getHeaders().get("Content-Length").get(0), is("104857600"));
|
||||
assertThat(output.getHeaders().get("content-length").getFirst(), is("512789"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -260,6 +261,11 @@ class RequestTest {
|
||||
return HttpResponse.ok("{ \"hello\": \"world\" }");
|
||||
}
|
||||
|
||||
@Head("/hello")
|
||||
HttpResponse<String> head() {
|
||||
return HttpResponse.ok();
|
||||
}
|
||||
|
||||
@Get("/hello417")
|
||||
HttpResponse<String> hello417() {
|
||||
return HttpResponse.status(HttpStatus.EXPECTATION_FAILED).body("{ \"hello\": \"world\" }");
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.runners.Worker;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.DefaultScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -31,6 +32,9 @@ class TriggerTest {
|
||||
@Inject
|
||||
private SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
private FlowListenersInterface flowListenersService;
|
||||
|
||||
@@ -51,6 +55,7 @@ class TriggerTest {
|
||||
AbstractScheduler scheduler = new DefaultScheduler(
|
||||
this.applicationContext,
|
||||
this.flowListenersService,
|
||||
this.executionState,
|
||||
this.triggerState
|
||||
);
|
||||
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
|
||||
@@ -89,6 +94,7 @@ class TriggerTest {
|
||||
AbstractScheduler scheduler = new DefaultScheduler(
|
||||
this.applicationContext,
|
||||
this.flowListenersService,
|
||||
this.executionState,
|
||||
this.triggerState
|
||||
);
|
||||
) {
|
||||
|
||||
@@ -16,6 +16,7 @@ import java.util.Optional;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@MicronautTest
|
||||
@@ -109,4 +110,49 @@ class FlowTest {
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void success_withLabels() {
|
||||
var flow = io.kestra.core.models.flows.Flow.builder()
|
||||
.id("flow-with-flow-trigger")
|
||||
.namespace("io.kestra.unittest")
|
||||
.revision(1)
|
||||
.labels(List.of(
|
||||
new Label("flow-label-1", "flow-label-1"),
|
||||
new Label("flow-label-2", "flow-label-2")
|
||||
))
|
||||
.tasks(Collections.singletonList(Return.builder()
|
||||
.id("test")
|
||||
.type(Return.class.getName())
|
||||
.format("test")
|
||||
.build()))
|
||||
.build();
|
||||
var execution = Execution.builder()
|
||||
.id(IdUtils.create())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("flow-with-flow-trigger")
|
||||
.flowRevision(1)
|
||||
.state(State.of(State.Type.RUNNING, Collections.emptyList()))
|
||||
.build();
|
||||
var flowTrigger = Flow.builder()
|
||||
.id("flow")
|
||||
.type(Flow.class.getName())
|
||||
.labels(List.of(
|
||||
new Label("trigger-label-1", "trigger-label-1"),
|
||||
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
|
||||
new Label("trigger-label-3", "{{ null }}"), // should return an empty string
|
||||
new Label("trigger-label-4", "{{ foobar }}") // should fail
|
||||
))
|
||||
.build();
|
||||
|
||||
Optional<Execution> evaluate = flowTrigger.evaluate(runContextFactory.of(), flow, execution);
|
||||
|
||||
assertThat(evaluate.isPresent(), is(true));
|
||||
assertThat(evaluate.get().getLabels(), hasSize(5));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +65,6 @@ class ScheduleTest {
|
||||
return TriggerContext.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getNamespace())
|
||||
.flowRevision(flow.getRevision())
|
||||
.triggerId(schedule.getId())
|
||||
.date(date)
|
||||
.build();
|
||||
@@ -131,6 +130,35 @@ class ScheduleTest {
|
||||
assertThat(inputs.get("input2"), is("default"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void success_withLabels() throws Exception {
|
||||
var scheduleTrigger = Schedule.builder()
|
||||
.id("schedule")
|
||||
.cron("0 0 1 * *")
|
||||
.labels(List.of(
|
||||
new Label("trigger-label-1", "trigger-label-1"),
|
||||
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
|
||||
new Label("trigger-label-3", "{{ null }}")
|
||||
))
|
||||
.build();
|
||||
var conditionContext = conditionContext(scheduleTrigger);
|
||||
var date = ZonedDateTime.now()
|
||||
.withDayOfMonth(1)
|
||||
.withHour(0)
|
||||
.withMinute(0)
|
||||
.withSecond(0)
|
||||
.truncatedTo(ChronoUnit.SECONDS)
|
||||
.minusMonths(1);
|
||||
var triggerContext = triggerContext(date, scheduleTrigger);
|
||||
|
||||
Optional<Execution> evaluate = scheduleTrigger.evaluate(conditionContext, triggerContext);
|
||||
|
||||
assertThat(evaluate.isPresent(), is(true));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
|
||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void everyMinute() throws Exception {
|
||||
|
||||
@@ -95,6 +95,11 @@ inputs:
|
||||
- name: array
|
||||
type: ARRAY
|
||||
itemType: INT
|
||||
# required true and an empty default value will only work if we correctly serialize default values which is what this input is about to test.
|
||||
- name: empty
|
||||
type: STRING
|
||||
defaults: ''
|
||||
required: true
|
||||
|
||||
tasks:
|
||||
- id: string
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
id: working-directory-taskrun-encrypted
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: workingDir
|
||||
type: io.kestra.plugin.core.flow.WorkingDirectory
|
||||
tasks:
|
||||
- id: encrypted
|
||||
type: io.kestra.core.tasks.test.Encrypted
|
||||
format: "Hello World"
|
||||
- id: decrypted
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{outputs.encrypted.value}}"
|
||||
@@ -1,4 +1,4 @@
|
||||
version=0.17.0
|
||||
version=0.17.26
|
||||
|
||||
jacksonVersion=2.16.2
|
||||
micronautVersion=4.4.3
|
||||
@@ -7,4 +7,4 @@ slf4jVersion=2.0.13
|
||||
|
||||
org.gradle.parallel=true
|
||||
org.gradle.caching=true
|
||||
org.gradle.priority=low
|
||||
org.gradle.priority=low
|
||||
|
||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
|
||||
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
datasources:
|
||||
h2:
|
||||
url: jdbc:h2:mem:public;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
|
||||
url: jdbc:h2:mem:public;TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
|
||||
username: sa
|
||||
password: ""
|
||||
driverClassName: org.h2.Driver
|
||||
|
||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
|
||||
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.repository.postgres;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.queues.QueueService;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.jdbc.JdbcMapper;
|
||||
@@ -21,6 +22,7 @@ import org.jooq.Result;
|
||||
import org.jooq.SelectConditionStep;
|
||||
import org.jooq.impl.DSL;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import jakarta.annotation.Nullable;
|
||||
@@ -52,12 +54,10 @@ public class PostgresRepository<T> extends io.kestra.jdbc.AbstractJdbcRepository
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Map<Field<Object>, Object> persistFields(T entity) {
|
||||
Map<Field<Object>, Object> fields = super.persistFields(entity);
|
||||
|
||||
String json = JdbcMapper.of().writeValueAsString(entity);
|
||||
fields.replace(AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)));
|
||||
|
||||
return fields;
|
||||
return new HashMap<>(ImmutableMap
|
||||
.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)))
|
||||
);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
|
||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
|
||||
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
|
||||
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||
@Override
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||
return new JdbcScheduler(
|
||||
applicationContext,
|
||||
flowListenersServiceSpy
|
||||
|
||||
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
|
||||
private static final int FETCH_SIZE = 100;
|
||||
|
||||
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
|
||||
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
|
||||
private final ApplicationContext applicationContext;
|
||||
@@ -110,10 +112,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
||||
.where(this.defaultFilter(tenantId))
|
||||
.and(field("trigger_execution_id").eq(triggerExecutionId));
|
||||
|
||||
select.fetch()
|
||||
.map(this.jdbcRepository::map)
|
||||
.forEach(emitter::next);
|
||||
emitter.complete();
|
||||
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
|
||||
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
|
||||
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
|
||||
stream.map(this.jdbcRepository::map).forEach(emitter::next);
|
||||
} finally {
|
||||
emitter.complete();
|
||||
}
|
||||
}),
|
||||
FluxSink.OverflowStrategy.BUFFER
|
||||
);
|
||||
@@ -172,7 +177,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
||||
state,
|
||||
labels,
|
||||
triggerExecutionId,
|
||||
childFilter
|
||||
childFilter,
|
||||
false
|
||||
);
|
||||
|
||||
return this.jdbcRepository.fetchPage(context, select, pageable);
|
||||
@@ -190,7 +196,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
||||
@Nullable List<State.Type> state,
|
||||
@Nullable Map<String, String> labels,
|
||||
@Nullable String triggerExecutionId,
|
||||
@Nullable ChildFilter childFilter
|
||||
@Nullable ChildFilter childFilter,
|
||||
boolean deleted
|
||||
) {
|
||||
return Flux.create(
|
||||
emitter -> this.jdbcRepository
|
||||
@@ -209,14 +216,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
||||
state,
|
||||
labels,
|
||||
triggerExecutionId,
|
||||
childFilter
|
||||
childFilter,
|
||||
deleted
|
||||
);
|
||||
|
||||
select.fetch()
|
||||
.map(this.jdbcRepository::map)
|
||||
.forEach(emitter::next);
|
||||
|
||||
emitter.complete();
|
||||
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
|
||||
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
|
||||
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
|
||||
stream.map(this.jdbcRepository::map).forEach(emitter::next);
|
||||
} finally {
|
||||
emitter.complete();
|
||||
}
|
||||
}),
|
||||
FluxSink.OverflowStrategy.BUFFER
|
||||
);
|
||||
@@ -233,7 +243,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
||||
@Nullable List<State.Type> state,
|
||||
@Nullable Map<String, String> labels,
|
||||
@Nullable String triggerExecutionId,
|
||||
@Nullable ChildFilter childFilter
|
||||
@Nullable ChildFilter childFilter,
|
||||
boolean deleted
|
||||
) {
|
||||
SelectConditionStep<Record1<Object>> select = context
|
||||
.select(
|
||||
@@ -241,7 +252,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
||||
)
|
||||
.hint(context.configuration().dialect() == SQLDialect.MYSQL ? "SQL_CALC_FOUND_ROWS" : null)
|
||||
.from(this.jdbcRepository.getTable())
|
||||
.where(this.defaultFilter(tenantId));
|
||||
.where(this.defaultFilter(tenantId, deleted));
|
||||
|
||||
select = filteringQuery(select, namespace, flowId, null, query, labels, triggerExecutionId, childFilter);
|
||||
|
||||
|
||||
@@ -23,6 +23,11 @@ public abstract class AbstractJdbcRepository {
|
||||
return tenant.and(field("deleted", Boolean.class).eq(false));
|
||||
}
|
||||
|
||||
protected Condition defaultFilter(String tenantId, Boolean allowDeleted) {
|
||||
var tenant = buildTenantCondition(tenantId);
|
||||
return allowDeleted ? tenant : tenant.and(field("deleted", Boolean.class).eq(false));
|
||||
}
|
||||
|
||||
protected Condition buildTenantCondition(String tenantId) {
|
||||
return tenantId == null ? field("tenant_id").isNull() : field("tenant_id").eq(tenantId);
|
||||
}
|
||||
|
||||
@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
||||
Trigger current = optionalTrigger.get();
|
||||
current = current.toBuilder()
|
||||
.executionId(trigger.getExecutionId())
|
||||
.executionCurrentState(trigger.getExecutionCurrentState())
|
||||
.updatedDate(trigger.getUpdatedDate())
|
||||
.build();
|
||||
this.save(context, current);
|
||||
|
||||
@@ -706,6 +706,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
||||
.executionId(killedExecution.getExecutionId())
|
||||
.isOnKillCascade(false)
|
||||
.state(ExecutionKilled.State.EXECUTED)
|
||||
.tenantId(killedExecution.getTenantId())
|
||||
.build()
|
||||
);
|
||||
|
||||
|
||||
@@ -32,10 +32,10 @@ import java.util.function.BiConsumer;
|
||||
public class JdbcScheduler extends AbstractScheduler {
|
||||
private final QueueInterface<Execution> executionQueue;
|
||||
private final TriggerRepositoryInterface triggerRepository;
|
||||
private final ConditionService conditionService;
|
||||
|
||||
private final FlowRepositoryInterface flowRepository;
|
||||
private final JooqDSLContextWrapper dslContextWrapper;
|
||||
private final ConditionService conditionService;
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -49,6 +49,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
||||
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
|
||||
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
||||
executionState = applicationContext.getBean(SchedulerExecutionState.class);
|
||||
conditionService = applicationContext.getBean(ConditionService.class);
|
||||
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
||||
@@ -58,6 +59,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
public void run() {
|
||||
super.run();
|
||||
|
||||
// reset scheduler trigger at end
|
||||
executionQueue.receive(
|
||||
Scheduler.class,
|
||||
either -> {
|
||||
@@ -76,14 +78,6 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
.ifPresent(trigger -> {
|
||||
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
|
||||
});
|
||||
} else {
|
||||
// update execution state on each state change so the scheduler knows the execution is running
|
||||
triggerRepository
|
||||
.findByExecution(execution)
|
||||
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
|
||||
.ifPresent(trigger -> {
|
||||
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -106,7 +100,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
||||
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
|
||||
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
|
||||
|
||||
schedulerContext.startTransaction(scheduleContextInterface -> {
|
||||
schedulerContext.doInTransaction(scheduleContextInterface -> {
|
||||
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
|
||||
|
||||
consumer.accept(triggers, scheduleContextInterface);
|
||||
|
||||
@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
|
||||
this.dslContextWrapper = dslContextWrapper;
|
||||
}
|
||||
|
||||
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||
@Override
|
||||
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||
this.dslContextWrapper.transaction(configuration -> {
|
||||
this.context = DSL.using(configuration);
|
||||
|
||||
consumer.accept(this);
|
||||
|
||||
this.commit();
|
||||
this.context.commit();
|
||||
});
|
||||
}
|
||||
|
||||
public void commit() {
|
||||
this.context.commit();
|
||||
}
|
||||
}
|
||||
@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
|
||||
return trigger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Trigger create(Trigger trigger, String headerContent) {
|
||||
return this.triggerRepository.create(trigger);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
|
||||
this.triggerRepository.save(trigger, scheduleContextInterface);
|
||||
|
||||
return trigger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Trigger create(Trigger trigger) {
|
||||
|
||||
@@ -84,7 +96,4 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
|
||||
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock(Trigger trigger) {}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,11 @@ public class MemoryExecutionRepository implements ExecutionRepositoryInterface {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Execution> find(@Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> state, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter, boolean allowDeleted) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayListTotal<TaskRun> findTaskRun(Pageable pageable, @Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> states, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter) {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
@@ -56,6 +56,22 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
|
||||
return trigger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
|
||||
triggers.put(trigger.uid(), trigger);
|
||||
triggerQueue.emit(trigger);
|
||||
|
||||
return trigger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Trigger create(Trigger trigger, String headerContent) {
|
||||
triggers.put(trigger.uid(), trigger);
|
||||
triggerQueue.emit(trigger);
|
||||
|
||||
return trigger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Trigger update(Trigger trigger) {
|
||||
triggers.put(trigger.uid(), trigger);
|
||||
@@ -79,7 +95,4 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
|
||||
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock(Trigger trigger) {}
|
||||
}
|
||||
|
||||
@@ -131,10 +131,10 @@ public class CommandsWrapper implements TaskCommands {
|
||||
@SuppressWarnings("unchecked")
|
||||
public ScriptOutput run() throws Exception {
|
||||
List<String> filesToUpload = new ArrayList<>();
|
||||
if (this.namespaceFiles != null) {
|
||||
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
|
||||
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
|
||||
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
|
||||
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
|
||||
|
||||
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
|
||||
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
||||
List<URI> injectedFiles = namespaceFilesService.inject(
|
||||
runContext,
|
||||
|
||||
@@ -7,6 +7,6 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
class DockerTest extends AbstractTaskRunnerTest {
|
||||
@Override
|
||||
protected TaskRunner taskRunner() {
|
||||
return Docker.builder().image("centos").build();
|
||||
return Docker.builder().image("rockylinux:9.3-minimal").build();
|
||||
}
|
||||
}
|
||||
@@ -78,7 +78,7 @@ public class LocalStorage implements StorageInterface {
|
||||
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
|
||||
uris.add(URI.create(file.toString()));
|
||||
uris.add(URI.create(file.toString().replace("\\", "/")));
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ public class LocalStorage implements StorageInterface {
|
||||
}
|
||||
});
|
||||
|
||||
URI fsPathUri = URI.create(fsPath.toString());
|
||||
URI fsPathUri = URI.create(fsPath.toString().replace("\\", "/"));
|
||||
return uris.stream().sorted(Comparator.reverseOrder())
|
||||
.map(fsPathUri::relativize)
|
||||
.map(URI::getPath)
|
||||
@@ -115,7 +115,7 @@ public class LocalStorage implements StorageInterface {
|
||||
URI relative = URI.create(
|
||||
getPath(tenantId, null).relativize(
|
||||
Path.of(file.toUri())
|
||||
).toString()
|
||||
).toString().replace("\\", "/")
|
||||
);
|
||||
return getAttributes(tenantId, relative);
|
||||
}))
|
||||
|
||||
1884
ui/package-lock.json
generated
1884
ui/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -12,20 +12,20 @@
|
||||
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
|
||||
},
|
||||
"dependencies": {
|
||||
"@kestra-io/ui-libs": "^0.0.47",
|
||||
"@kestra-io/ui-libs": "^0.0.48",
|
||||
"@vue-flow/background": "^1.3.0",
|
||||
"@vue-flow/controls": "^1.1.1",
|
||||
"@vue-flow/core": "^1.33.6",
|
||||
"ansi-to-html": "^0.7.2",
|
||||
"axios": "^1.6.8",
|
||||
"axios": "^1.7.2",
|
||||
"bootstrap": "^5.3.3",
|
||||
"buffer": "^6.0.3",
|
||||
"chart.js": "^4.4.2",
|
||||
"chart.js": "^4.4.3",
|
||||
"chartjs-chart-treemap": "^2.3.1",
|
||||
"core-js": "^3.37.0",
|
||||
"core-js": "^3.37.1",
|
||||
"dagre": "^0.8.5",
|
||||
"element-plus": "^2.7.2",
|
||||
"humanize-duration": "^3.32.0",
|
||||
"element-plus": "^2.7.5",
|
||||
"humanize-duration": "^3.32.1",
|
||||
"js-yaml": "^4.1.0",
|
||||
"lodash": "^4.17.21",
|
||||
"markdown-it": "^14.1.0",
|
||||
@@ -40,45 +40,49 @@
|
||||
"moment-timezone": "^0.5.45",
|
||||
"node-modules-polyfill": "^0.1.4",
|
||||
"nprogress": "^0.2.0",
|
||||
"posthog-js": "^1.130.2",
|
||||
"posthog-js": "^1.138.2",
|
||||
"cronstrue": "^2.50.0",
|
||||
"throttle-debounce": "^5.0.0",
|
||||
"vite-plugin-eslint": "^1.8.1",
|
||||
"vue": "^3.4.26",
|
||||
"vue": "^3.4.27",
|
||||
"vue-axios": "3.5.2",
|
||||
"vue-chartjs": "^5.3.1",
|
||||
"vue-gtag": "^2.0.1",
|
||||
"vue-i18n": "^9.13.1",
|
||||
"vue-material-design-icons": "^5.3.0",
|
||||
"vue-router": "^4.3.2",
|
||||
"vue-sidebar-menu": "^5.3.1",
|
||||
"vue-sidebar-menu": "^5.4.0",
|
||||
"vue-virtual-scroller": "^2.0.0-beta.8",
|
||||
"vue3-popper": "^1.5.0",
|
||||
"vue3-tour": "github:kestra-io/vue3-tour",
|
||||
"vuex": "^4.1.0",
|
||||
"xss": "^1.0.15",
|
||||
"yaml": "^2.4.2"
|
||||
"yaml": "^2.4.5"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@rushstack/eslint-patch": "^1.10.2",
|
||||
"@shikijs/markdown-it": "^1.4.0",
|
||||
"@typescript-eslint/parser": "^7.8.0",
|
||||
"@vitejs/plugin-vue": "^5.0.4",
|
||||
"@rushstack/eslint-patch": "^1.10.3",
|
||||
"@shikijs/markdown-it": "^1.6.3",
|
||||
"@typescript-eslint/parser": "^7.12.0",
|
||||
"@vitejs/plugin-vue": "^5.0.5",
|
||||
"@vue/eslint-config-prettier": "^9.0.0",
|
||||
"@vue/test-utils": "^2.4.5",
|
||||
"@vue/test-utils": "^2.4.6",
|
||||
"decompress": "^4.2.1",
|
||||
"eslint": "^8.57.0",
|
||||
"eslint-plugin-vue": "^9.25.0",
|
||||
"jsdom": "^24.0.0",
|
||||
"monaco-editor": "^0.48.0",
|
||||
"monaco-yaml": "^5.1.1",
|
||||
"prettier": "^3.2.5",
|
||||
"eslint-plugin-vue": "^9.26.0",
|
||||
"jsdom": "^24.1.0",
|
||||
"monaco-editor": "^0.49.0",
|
||||
"monaco-yaml": "^5.2.0",
|
||||
"prettier": "^3.3.1",
|
||||
"rollup-plugin-copy": "^3.5.0",
|
||||
"rollup-plugin-visualizer": "^5.12.0",
|
||||
"sass": "^1.76.0",
|
||||
"sass": "^1.77.4",
|
||||
"typescript": "^5.4.5",
|
||||
"vite": "^5.2.11",
|
||||
"vite-plugin-rewrite-all": "1.0.1",
|
||||
"vitest": "^1.5.3"
|
||||
"vite": "^5.2.13",
|
||||
"vitest": "^1.6.0"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"@rollup/rollup-linux-x64-gnu": "4.18.0"
|
||||
},
|
||||
"overrides": {
|
||||
"bootstrap": {
|
||||
|
||||
@@ -209,7 +209,7 @@
|
||||
},
|
||||
watch: {
|
||||
$route(to) {
|
||||
if (this.user && to.name === "home" && this.overallTotal === 0) {
|
||||
if (to.name === "home" && this.overallTotal === 0) {
|
||||
this.$router.push({
|
||||
name: "welcome",
|
||||
params: {
|
||||
|
||||
BIN
ui/src/assets/errors/kestra-error.png
Normal file
BIN
ui/src/assets/errors/kestra-error.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 73 KiB |
File diff suppressed because one or more lines are too long
|
Before Width: | Height: | Size: 4.9 KiB |
@@ -1,6 +1,10 @@
|
||||
<template>
|
||||
<el-tooltip :persistent="false" :focus-on-show="true" popper-class="ee-tooltip" :disabled="!disabled" :placement="placement">
|
||||
<el-tooltip :visible="visible" :persistent="false" :focus-on-show="true" popper-class="ee-tooltip" :disabled="!disabled" :placement="placement">
|
||||
<template #content v-if="link">
|
||||
<el-button circle class="ee-tooltip-close" @click="changeVisibility(false)">
|
||||
<Close />
|
||||
</el-button>
|
||||
|
||||
<p>{{ $t("ee-tooltip.features-blocked") }}</p>
|
||||
|
||||
<a
|
||||
@@ -13,7 +17,7 @@
|
||||
</a>
|
||||
</template>
|
||||
<template #default>
|
||||
<span ref="slot-container">
|
||||
<span ref="slot-container" class="cursor-pointer" @click="changeVisibility()">
|
||||
<slot />
|
||||
<lock v-if="disabled" />
|
||||
</span>
|
||||
@@ -22,10 +26,11 @@
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import Close from "vue-material-design-icons/Close.vue";
|
||||
import Lock from "vue-material-design-icons/Lock.vue";
|
||||
|
||||
export default {
|
||||
components: {Lock},
|
||||
components: {Close, Lock},
|
||||
props: {
|
||||
top: {
|
||||
type: Boolean,
|
||||
@@ -48,6 +53,16 @@
|
||||
default: undefined
|
||||
},
|
||||
},
|
||||
data() {
|
||||
return {
|
||||
visible: false,
|
||||
}
|
||||
},
|
||||
methods: {
|
||||
changeVisibility(visible = true) {
|
||||
this.visible = visible
|
||||
}
|
||||
},
|
||||
computed: {
|
||||
link() {
|
||||
|
||||
@@ -83,5 +98,13 @@
|
||||
:deep(.material-design-icon) > .material-design-icon__svg {
|
||||
bottom: -0.125em;
|
||||
}
|
||||
|
||||
.ee-tooltip-close {
|
||||
position: absolute;
|
||||
top: 0;
|
||||
right: 0;
|
||||
border: none;
|
||||
margin: 0.5rem;
|
||||
}
|
||||
</style>
|
||||
|
||||
|
||||
@@ -77,12 +77,6 @@
|
||||
</router-link>
|
||||
</template>
|
||||
</el-table-column>
|
||||
|
||||
<el-table-column :label="$t('state')">
|
||||
<template #default="scope">
|
||||
<status v-if="scope.row.executionCurrentState" :status="scope.row.executionCurrentState" size="small" />
|
||||
</template>
|
||||
</el-table-column>
|
||||
<el-table-column :label="$t('date')">
|
||||
<template #default="scope">
|
||||
<date-ago :inverted="true" :date="scope.row.date" />
|
||||
@@ -171,7 +165,6 @@
|
||||
import RefreshButton from "../layout/RefreshButton.vue";
|
||||
import DateAgo from "../layout/DateAgo.vue";
|
||||
import Id from "../Id.vue";
|
||||
import Status from "../Status.vue";
|
||||
import {mapState} from "vuex";
|
||||
|
||||
export default {
|
||||
@@ -183,7 +176,6 @@
|
||||
SearchField,
|
||||
NamespaceSelect,
|
||||
DateAgo,
|
||||
Status,
|
||||
Id,
|
||||
},
|
||||
data() {
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
<el-form-item
|
||||
:label="$t('password')"
|
||||
required
|
||||
prop="password"
|
||||
>
|
||||
<el-input v-model="form.password" type="password" show-password />
|
||||
</el-form-item>
|
||||
@@ -62,6 +63,28 @@
|
||||
trigger: ["blur"],
|
||||
pattern: "^$|^[a-zA-Z0-9_!#$%&’*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+$"
|
||||
},
|
||||
{
|
||||
validator: (rule, value, callback) => {
|
||||
if (value && value.length > 256) {
|
||||
callback(new Error(this.$t("email length constraint")));
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
},
|
||||
trigger: ["blur", "change"]
|
||||
}
|
||||
],
|
||||
password: [
|
||||
{
|
||||
validator: (rule, value, callback) => {
|
||||
if (value && value.length > 256) {
|
||||
callback(new Error(this.$t("password length constraint")));
|
||||
} else {
|
||||
callback();
|
||||
}
|
||||
},
|
||||
trigger: ["blur", "change"]
|
||||
}
|
||||
],
|
||||
confirmPassword: [
|
||||
{
|
||||
|
||||
@@ -7,6 +7,10 @@
|
||||
<p>
|
||||
<span v-html="$t('errors.' + code + '.content')" />
|
||||
</p>
|
||||
|
||||
<el-button tag="router-link" :to="{name: 'home'}" type="primary">
|
||||
{{ $t("back_to_dashboard") }}
|
||||
</el-button>
|
||||
</section>
|
||||
</template>
|
||||
|
||||
@@ -42,19 +46,23 @@
|
||||
|
||||
<style lang="scss" scoped>
|
||||
.errors {
|
||||
h2 {
|
||||
margin-bottom: calc(var(--spacer) * 2);
|
||||
}
|
||||
|
||||
width: 100%;
|
||||
margin-top: 10em;
|
||||
text-align: center;
|
||||
|
||||
.img {
|
||||
display: inline-block;
|
||||
background: url("../../assets/errors/sorry.svg") no-repeat;
|
||||
background: url("../../assets/errors/kestra-error.png") no-repeat center;
|
||||
background-size: contain;
|
||||
height: 300px;
|
||||
width: 300px;
|
||||
}
|
||||
|
||||
h2 {
|
||||
line-height: 30px;
|
||||
font-size: 20px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
p {
|
||||
line-height: 22px;
|
||||
font-size: 14px;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
|
||||
@@ -69,6 +69,14 @@
|
||||
if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
|
||||
this.follow()
|
||||
}
|
||||
// if we change the execution id, we need to close the sse
|
||||
if (this.$route.params.id != this.execution.id) {
|
||||
this.closeSSE();
|
||||
window.removeEventListener("popstate", this.follow)
|
||||
this.$store.commit("execution/setExecution", undefined);
|
||||
this.$store.commit("flow/setFlow", undefined);
|
||||
this.$store.commit("flow/setFlowGraph", undefined);
|
||||
}
|
||||
},
|
||||
},
|
||||
methods: {
|
||||
@@ -91,13 +99,16 @@
|
||||
}
|
||||
// sse.onerror doesnt return the details of the error
|
||||
// but as our emitter can only throw an error on 404
|
||||
// we can safely assume that the error
|
||||
// we can safely assume that the error is a 404
|
||||
// if execution is not defined
|
||||
this.sse.onerror = () => {
|
||||
this.$store.dispatch("core/showMessage", {
|
||||
variant: "error",
|
||||
title: this.$t("error"),
|
||||
message: this.$t("errors.404.flow or execution"),
|
||||
});
|
||||
if (!this.execution) {
|
||||
this.$store.dispatch("core/showMessage", {
|
||||
variant: "error",
|
||||
title: this.$t("error"),
|
||||
message: this.$t("errors.404.flow or execution"),
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
@@ -425,6 +425,8 @@
|
||||
import {ElMessageBox, ElSwitch, ElFormItem, ElAlert} from "element-plus";
|
||||
import {h, ref} from "vue";
|
||||
|
||||
import {filterLabels} from "./utils"
|
||||
|
||||
export default {
|
||||
mixins: [RouteContext, RestoreUrl, DataTableActions, SelectTableActions],
|
||||
components: {
|
||||
@@ -809,6 +811,13 @@
|
||||
);
|
||||
},
|
||||
setLabels() {
|
||||
const filtered = filterLabels(this.executionLabels)
|
||||
|
||||
if(filtered.error) {
|
||||
this.$toast().error(this.$t("wrong labels"))
|
||||
return;
|
||||
}
|
||||
|
||||
this.$toast().confirm(
|
||||
this.$t("bulk set labels", {"executionCount": this.queryBulkAction ? this.total : this.selection.length}),
|
||||
() => {
|
||||
@@ -819,7 +828,7 @@
|
||||
sort: this.$route.query.sort || "state.startDate:desc",
|
||||
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
|
||||
}, false),
|
||||
data: this.executionLabels
|
||||
data: filtered.labels
|
||||
})
|
||||
.then(r => {
|
||||
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
|
||||
@@ -829,7 +838,7 @@
|
||||
return this.$store
|
||||
.dispatch("execution/bulkSetLabels", {
|
||||
executionsId: this.selection,
|
||||
executionLabels: this.executionLabels
|
||||
executionLabels: filtered.labels
|
||||
})
|
||||
.then(r => {
|
||||
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
|
||||
|
||||
@@ -53,6 +53,8 @@
|
||||
import LabelInput from "../../components/labels/LabelInput.vue";
|
||||
import State from "../../utils/state";
|
||||
|
||||
import {filterLabels} from "./utils"
|
||||
|
||||
export default {
|
||||
components: {LabelInput,},
|
||||
props: {
|
||||
@@ -71,9 +73,16 @@
|
||||
},
|
||||
methods: {
|
||||
setLabels() {
|
||||
const filtered = filterLabels(this.executionLabels)
|
||||
|
||||
if(filtered.error) {
|
||||
this.$toast().error(this.$t("wrong labels"))
|
||||
return;
|
||||
}
|
||||
|
||||
this.isOpen = false;
|
||||
this.$store.dispatch("execution/setLabels", {
|
||||
labels: this.executionLabels,
|
||||
labels: filtered.labels,
|
||||
executionId: this.execution.id
|
||||
}).then(response => {
|
||||
this.$store.commit("execution/setExecution", response.data)
|
||||
|
||||
@@ -263,6 +263,8 @@
|
||||
return this.attempts(taskRun)[this.selectedAttemptNumberByTaskRunId[taskRun.id] ?? 0];
|
||||
},
|
||||
taskType(taskRun) {
|
||||
if(!taskRun) return undefined;
|
||||
|
||||
const task = FlowUtils.findTaskById(this.flow, taskRun.taskId);
|
||||
const parentTaskRunId = taskRun.parentTaskRunId;
|
||||
if (task === undefined && parentTaskRunId) {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
:start-date="startDate"
|
||||
:end-date="endDate"
|
||||
@update:model-value="onAbsFilterChange"
|
||||
class="w-auto"
|
||||
/>
|
||||
<relative-date-select
|
||||
v-if="selectedFilterType === filterType.RELATIVE"
|
||||
|
||||
14
ui/src/components/executions/utils.ts
Normal file
14
ui/src/components/executions/utils.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
interface Label {
|
||||
key: string | null;
|
||||
value: string | null;
|
||||
}
|
||||
|
||||
interface FilterResult {
|
||||
labels: Label[];
|
||||
error?: boolean;
|
||||
}
|
||||
|
||||
export const filterLabels = (labels: Label[]): FilterResult => {
|
||||
const invalid = labels.some(label => label.key === null || label.value === null);
|
||||
return invalid ? {labels, error: true} : {labels};
|
||||
};
|
||||
@@ -41,7 +41,7 @@
|
||||
if (this.$route.query.reset) {
|
||||
localStorage.setItem("tourDoneOrSkip", undefined);
|
||||
this.$store.commit("core/setGuidedProperties", {tourStarted: false});
|
||||
this.$tours["guidedTour"].start();
|
||||
this.$tours["guidedTour"]?.start();
|
||||
}
|
||||
this.setupFlow()
|
||||
},
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user