mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
134 Commits
dependabot
...
v0.17.26
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27f0bfe1ac | ||
|
|
0143faed71 | ||
|
|
20891bf911 | ||
|
|
093ca322bb | ||
|
|
b3b788ca38 | ||
|
|
a3c85e3c19 | ||
|
|
debca279c0 | ||
|
|
254486df63 | ||
|
|
5e6a22c78c | ||
|
|
5613f74b93 | ||
|
|
6a915cb08b | ||
|
|
3056e9c402 | ||
|
|
dc7fef20b6 | ||
|
|
678205cfc0 | ||
|
|
87acffa052 | ||
|
|
c7f8132bfe | ||
|
|
af2a73f397 | ||
|
|
b0cff46b81 | ||
|
|
5ec053a824 | ||
|
|
c2cf0a90e5 | ||
|
|
0303e62b3d | ||
|
|
96b67b73bd | ||
|
|
6cced7a9e7 | ||
|
|
9259a805d8 | ||
|
|
595459fc12 | ||
|
|
42a637de7c | ||
|
|
ba7d4d9501 | ||
|
|
76b5022c08 | ||
|
|
1620c21c36 | ||
|
|
202c321a8a | ||
|
|
ea0b4e7469 | ||
|
|
c0975773a3 | ||
|
|
23e7af0d77 | ||
|
|
dcfc9acf74 | ||
|
|
b790d5e1d1 | ||
|
|
5213945a41 | ||
|
|
150eed2eff | ||
|
|
887a3a5dc4 | ||
|
|
d7109aa375 | ||
|
|
f2a33ebbca | ||
|
|
b130ac8de9 | ||
|
|
8f43ec13f3 | ||
|
|
5ae985ab16 | ||
|
|
b2d04925cb | ||
|
|
37ff254c9c | ||
|
|
7e968d16b6 | ||
|
|
2e4bf3338a | ||
|
|
0f3c455afc | ||
|
|
7439ea4a66 | ||
|
|
6546ce49f6 | ||
|
|
57bc235db6 | ||
|
|
2f416daac0 | ||
|
|
97779f3bc4 | ||
|
|
54c4f1d702 | ||
|
|
f2cc5c0da6 | ||
|
|
c2fc728414 | ||
|
|
90492354e2 | ||
|
|
856a5f5a73 | ||
|
|
f09fa74129 | ||
|
|
936e4019b6 | ||
|
|
4c44090462 | ||
|
|
9feac234f6 | ||
|
|
935212344c | ||
|
|
5d4c4dc214 | ||
|
|
a152354bcd | ||
|
|
08455bbdf0 | ||
|
|
64e9d8d43f | ||
|
|
dd1b435720 | ||
|
|
9fca8c9148 | ||
|
|
09d6e0f092 | ||
|
|
c3abfe08ad | ||
|
|
f2d5df082d | ||
|
|
16c15116fe | ||
|
|
f9f8d93ad7 | ||
|
|
c3b1ceb289 | ||
|
|
f9f33b96c8 | ||
|
|
658f847c48 | ||
|
|
dcec04e20e | ||
|
|
b38ec8a21b | ||
|
|
35d801cbc2 | ||
|
|
511a10a65c | ||
|
|
c5daebe4aa | ||
|
|
bb800948e5 | ||
|
|
65f921d456 | ||
|
|
ded21b0902 | ||
|
|
7b109baac2 | ||
|
|
c11fe3466f | ||
|
|
94c5b7a6e4 | ||
|
|
99ab5be8b9 | ||
|
|
aaa3a0ace0 | ||
|
|
acc5a24d9a | ||
|
|
892bb114ca | ||
|
|
30e4fe4e0b | ||
|
|
fcada08edd | ||
|
|
a2df125a62 | ||
|
|
9d9c5dc1d1 | ||
|
|
dbb1a8eaa5 | ||
|
|
1db6b57091 | ||
|
|
934ea201a5 | ||
|
|
5dcd5b5af8 | ||
|
|
000124f3dd | ||
|
|
c37f104446 | ||
|
|
3cfa48987f | ||
|
|
79c22ee22c | ||
|
|
d3a2fa13a5 | ||
|
|
20078f1e19 | ||
|
|
72b86d9edf | ||
|
|
59634133bc | ||
|
|
1e34a5528b | ||
|
|
4aa3bd3ef2 | ||
|
|
f6581de304 | ||
|
|
fd225d87b4 | ||
|
|
9bb3f576ee | ||
|
|
30cdb373cc | ||
|
|
59c7d6a567 | ||
|
|
9e4e5f891e | ||
|
|
ea3ba991d1 | ||
|
|
1024c77289 | ||
|
|
36b29d6065 | ||
|
|
1c8177e185 | ||
|
|
3dd5d6bb71 | ||
|
|
16a641693a | ||
|
|
efdb075155 | ||
|
|
a99d52a406 | ||
|
|
852edea36e | ||
|
|
defa426259 | ||
|
|
3aadcfd683 | ||
|
|
0f5d59103a | ||
|
|
50b9120434 | ||
|
|
896c761502 | ||
|
|
381d1b381f | ||
|
|
72a428a439 | ||
|
|
7447e61dbc | ||
|
|
45ffc3cc22 |
29
.github/workflows/docker.yml
vendored
29
.github/workflows/docker.yml
vendored
@@ -7,14 +7,7 @@ on:
|
|||||||
description: 'Retag latest Docker images'
|
description: 'Retag latest Docker images'
|
||||||
required: true
|
required: true
|
||||||
type: string
|
type: string
|
||||||
options:
|
default: "true"
|
||||||
- "true"
|
|
||||||
- "false"
|
|
||||||
skip-test:
|
|
||||||
description: 'Skip test'
|
|
||||||
required: false
|
|
||||||
type: string
|
|
||||||
default: "false"
|
|
||||||
options:
|
options:
|
||||||
- "true"
|
- "true"
|
||||||
- "false"
|
- "false"
|
||||||
@@ -125,6 +118,16 @@ jobs:
|
|||||||
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
|
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
|
||||||
python-libs: kestra
|
python-libs: kestra
|
||||||
steps:
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
# Vars
|
||||||
|
- name: Set image name
|
||||||
|
id: vars
|
||||||
|
run: |
|
||||||
|
TAG=${GITHUB_REF#refs/*/}
|
||||||
|
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||||
|
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
# Download release
|
# Download release
|
||||||
- name: Download release
|
- name: Download release
|
||||||
uses: robinraju/release-downloader@v1.10
|
uses: robinraju/release-downloader@v1.10
|
||||||
@@ -137,14 +140,6 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||||
|
|
||||||
# Vars
|
|
||||||
- name: Set image name
|
|
||||||
id: vars
|
|
||||||
run: |
|
|
||||||
TAG=${GITHUB_REF#refs/*/}
|
|
||||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
|
||||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
|
||||||
|
|
||||||
# Docker setup
|
# Docker setup
|
||||||
- name: Set up QEMU
|
- name: Set up QEMU
|
||||||
uses: docker/setup-qemu-action@v3
|
uses: docker/setup-qemu-action@v3
|
||||||
@@ -179,7 +174,7 @@ jobs:
|
|||||||
- name: Retag to latest
|
- name: Retag to latest
|
||||||
if: github.event.inputs.retag-latest == 'true'
|
if: github.event.inputs.retag-latest == 'true'
|
||||||
run: |
|
run: |
|
||||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{1}', matrix.image.name) }}
|
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM eclipse-temurin:21-jre
|
FROM eclipse-temurin:21-jre-jammy
|
||||||
|
|
||||||
ARG KESTRA_PLUGINS=""
|
ARG KESTRA_PLUGINS=""
|
||||||
ARG APT_PACKAGES=""
|
ARG APT_PACKAGES=""
|
||||||
|
|||||||
@@ -204,6 +204,8 @@ subprojects {
|
|||||||
testImplementation 'org.hamcrest:hamcrest:2.2'
|
testImplementation 'org.hamcrest:hamcrest:2.2'
|
||||||
testImplementation 'org.hamcrest:hamcrest-library:2.2'
|
testImplementation 'org.hamcrest:hamcrest-library:2.2'
|
||||||
testImplementation group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
|
testImplementation group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
|
||||||
|
|
||||||
|
testImplementation 'org.assertj:assertj-core:3.27.3'
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
@@ -454,7 +456,7 @@ subprojects {
|
|||||||
}
|
}
|
||||||
|
|
||||||
maven.pom {
|
maven.pom {
|
||||||
description 'The modern, scalable orchestrator & scheduler open source platform'
|
description = 'The modern, scalable orchestrator & scheduler open source platform'
|
||||||
|
|
||||||
developers {
|
developers {
|
||||||
developer {
|
developer {
|
||||||
|
|||||||
@@ -37,6 +37,12 @@ public class ExecutorCommand extends AbstractServerCommand {
|
|||||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
||||||
private List<String> skipFlows = Collections.emptyList();
|
private List<String> skipFlows = Collections.emptyList();
|
||||||
|
|
||||||
|
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
|
||||||
|
private List<String> skipNamespaces = Collections.emptyList();
|
||||||
|
|
||||||
|
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
|
||||||
|
private List<String> skipTenants = Collections.emptyList();
|
||||||
|
|
||||||
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "a list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
|
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "a list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
|
||||||
private List<String> startExecutors = Collections.emptyList();
|
private List<String> startExecutors = Collections.emptyList();
|
||||||
|
|
||||||
@@ -54,6 +60,8 @@ public class ExecutorCommand extends AbstractServerCommand {
|
|||||||
public Integer call() throws Exception {
|
public Integer call() throws Exception {
|
||||||
this.skipExecutionService.setSkipExecutions(skipExecutions);
|
this.skipExecutionService.setSkipExecutions(skipExecutions);
|
||||||
this.skipExecutionService.setSkipFlows(skipFlows);
|
this.skipExecutionService.setSkipFlows(skipFlows);
|
||||||
|
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
|
||||||
|
this.skipExecutionService.setSkipTenants(skipTenants);
|
||||||
|
|
||||||
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
||||||
|
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ public class LocalCommand extends StandAloneCommand {
|
|||||||
"kestra.queue.type", "h2",
|
"kestra.queue.type", "h2",
|
||||||
"kestra.storage.type", "local",
|
"kestra.storage.type", "local",
|
||||||
"kestra.storage.local.base-path", data.toString(),
|
"kestra.storage.local.base-path", data.toString(),
|
||||||
"datasources.h2.url", "jdbc:h2:file:" + data.resolve("database") + ";DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=30000",
|
"datasources.h2.url", "jdbc:h2:file:" + data.resolve("database") + ";TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=30000",
|
||||||
"datasources.h2.username", "sa",
|
"datasources.h2.username", "sa",
|
||||||
"datasources.h2.password", "",
|
"datasources.h2.password", "",
|
||||||
"datasources.h2.driverClassName", "org.h2.Driver",
|
"datasources.h2.driverClassName", "org.h2.Driver",
|
||||||
|
|||||||
@@ -49,6 +49,12 @@ public class StandAloneCommand extends AbstractServerCommand {
|
|||||||
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
|
||||||
private List<String> skipFlows = Collections.emptyList();
|
private List<String> skipFlows = Collections.emptyList();
|
||||||
|
|
||||||
|
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
|
||||||
|
private List<String> skipNamespaces = Collections.emptyList();
|
||||||
|
|
||||||
|
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
|
||||||
|
private List<String> skipTenants = Collections.emptyList();
|
||||||
|
|
||||||
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
|
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
|
||||||
boolean tutorialsDisabled = false;
|
boolean tutorialsDisabled = false;
|
||||||
|
|
||||||
@@ -74,6 +80,8 @@ public class StandAloneCommand extends AbstractServerCommand {
|
|||||||
public Integer call() throws Exception {
|
public Integer call() throws Exception {
|
||||||
this.skipExecutionService.setSkipExecutions(skipExecutions);
|
this.skipExecutionService.setSkipExecutions(skipExecutions);
|
||||||
this.skipExecutionService.setSkipFlows(skipFlows);
|
this.skipExecutionService.setSkipFlows(skipFlows);
|
||||||
|
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
|
||||||
|
this.skipExecutionService.setSkipTenants(skipTenants);
|
||||||
|
|
||||||
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
||||||
|
|
||||||
|
|||||||
@@ -23,10 +23,12 @@ import org.apache.commons.lang3.ArrayUtils;
|
|||||||
@Singleton
|
@Singleton
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class MetricRegistry {
|
public class MetricRegistry {
|
||||||
|
public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
|
||||||
|
public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
|
||||||
|
public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
|
||||||
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
|
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
|
||||||
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
|
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
|
||||||
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
|
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
|
||||||
public final static String METRIC_WORKER_RETRYED_COUNT = "worker.retryed.count";
|
|
||||||
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
|
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
|
||||||
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
|
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
|
||||||
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
|
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
|
||||||
@@ -143,7 +145,7 @@ public class MetricRegistry {
|
|||||||
*
|
*
|
||||||
* @param workerTask the current WorkerTask
|
* @param workerTask the current WorkerTask
|
||||||
* @param workerGroup the worker group, optional
|
* @param workerGroup the worker group, optional
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
|
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
|
||||||
var baseTags = ArrayUtils.addAll(
|
var baseTags = ArrayUtils.addAll(
|
||||||
@@ -164,7 +166,7 @@ public class MetricRegistry {
|
|||||||
*
|
*
|
||||||
* @param workerTrigger the current WorkerTask
|
* @param workerTrigger the current WorkerTask
|
||||||
* @param workerGroup the worker group, optional
|
* @param workerGroup the worker group, optional
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String... tags) {
|
public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String... tags) {
|
||||||
var baseTags = ArrayUtils.addAll(
|
var baseTags = ArrayUtils.addAll(
|
||||||
@@ -184,7 +186,7 @@ public class MetricRegistry {
|
|||||||
* Return tags for current {@link WorkerTaskResult}
|
* Return tags for current {@link WorkerTaskResult}
|
||||||
*
|
*
|
||||||
* @param workerTaskResult the current WorkerTaskResult
|
* @param workerTaskResult the current WorkerTaskResult
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
|
public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
|
||||||
var baseTags = ArrayUtils.addAll(
|
var baseTags = ArrayUtils.addAll(
|
||||||
@@ -200,7 +202,7 @@ public class MetricRegistry {
|
|||||||
* Return tags for current {@link WorkerTaskResult}
|
* Return tags for current {@link WorkerTaskResult}
|
||||||
*
|
*
|
||||||
* @param subflowExecutionResult the current WorkerTaskResult
|
* @param subflowExecutionResult the current WorkerTaskResult
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(SubflowExecutionResult subflowExecutionResult, String... tags) {
|
public String[] tags(SubflowExecutionResult subflowExecutionResult, String... tags) {
|
||||||
var baseTags = ArrayUtils.addAll(
|
var baseTags = ArrayUtils.addAll(
|
||||||
@@ -216,7 +218,7 @@ public class MetricRegistry {
|
|||||||
* Return tags for current {@link Task}
|
* Return tags for current {@link Task}
|
||||||
*
|
*
|
||||||
* @param task the current Task
|
* @param task the current Task
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(Task task) {
|
public String[] tags(Task task) {
|
||||||
return new String[]{
|
return new String[]{
|
||||||
@@ -240,7 +242,7 @@ public class MetricRegistry {
|
|||||||
* Return tags for current {@link Execution}
|
* Return tags for current {@link Execution}
|
||||||
*
|
*
|
||||||
* @param execution the current Execution
|
* @param execution the current Execution
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(Execution execution) {
|
public String[] tags(Execution execution) {
|
||||||
var baseTags = new String[]{
|
var baseTags = new String[]{
|
||||||
@@ -255,33 +257,21 @@ public class MetricRegistry {
|
|||||||
* Return tags for current {@link TriggerContext}
|
* Return tags for current {@link TriggerContext}
|
||||||
*
|
*
|
||||||
* @param triggerContext the current TriggerContext
|
* @param triggerContext the current TriggerContext
|
||||||
* @param workerGroup the worker group, optional
|
* @return tags to apply to metrics
|
||||||
* @return tags to applied to metrics
|
|
||||||
*/
|
*/
|
||||||
public String[] tags(TriggerContext triggerContext, String workerGroup) {
|
public String[] tags(TriggerContext triggerContext) {
|
||||||
var baseTags = new String[]{
|
var baseTags = new String[]{
|
||||||
TAG_FLOW_ID, triggerContext.getFlowId(),
|
TAG_FLOW_ID, triggerContext.getFlowId(),
|
||||||
TAG_NAMESPACE_ID, triggerContext.getNamespace()
|
TAG_NAMESPACE_ID, triggerContext.getNamespace()
|
||||||
};
|
};
|
||||||
baseTags = workerGroup == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_WORKER_GROUP, workerGroup);
|
|
||||||
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
|
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return tags for current {@link TriggerContext}
|
|
||||||
*
|
|
||||||
* @param triggerContext the current TriggerContext
|
|
||||||
* @return tags to applied to metrics
|
|
||||||
*/
|
|
||||||
public String[] tags(TriggerContext triggerContext) {
|
|
||||||
return tags(triggerContext, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return tags for current {@link SchedulerExecutionWithTrigger}.
|
* Return tags for current {@link SchedulerExecutionWithTrigger}.
|
||||||
*
|
*
|
||||||
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
|
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
|
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
|
||||||
return ArrayUtils.addAll(
|
return ArrayUtils.addAll(
|
||||||
@@ -294,7 +284,7 @@ public class MetricRegistry {
|
|||||||
/**
|
/**
|
||||||
* Return globals tags
|
* Return globals tags
|
||||||
*
|
*
|
||||||
* @return tags to applied to metrics
|
* @return tags to apply to metrics
|
||||||
*/
|
*/
|
||||||
public Tags tags(String... tags) {
|
public Tags tags(String... tags) {
|
||||||
return Tags.of(tags);
|
return Tags.of(tags);
|
||||||
|
|||||||
@@ -9,6 +9,22 @@ import lombok.NoArgsConstructor;
|
|||||||
import lombok.ToString;
|
import lombok.ToString;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Kestra event for killing an execution. A {@link ExecutionKilled} can be in two states:
|
||||||
|
* <p>
|
||||||
|
* <pre>
|
||||||
|
* - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
|
||||||
|
* - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* A {@link ExecutionKilled} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED}
|
||||||
|
* regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks
|
||||||
|
* to be killed no matter what the circumstances.
|
||||||
|
* <p>
|
||||||
|
* IMPORTANT: A {@link ExecutionKilled} is considered to be a fire-and-forget event. As a result, we do not manage a
|
||||||
|
* COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilled}
|
||||||
|
* before considering an execution to be KILLED.
|
||||||
|
*/
|
||||||
@Getter
|
@Getter
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
|
|||||||
@@ -8,22 +8,6 @@ import jakarta.validation.constraints.NotNull;
|
|||||||
import lombok.*;
|
import lombok.*;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
/**
|
|
||||||
* The Kestra event for killing an execution. A {@link ExecutionKilledExecution} can be in two states:
|
|
||||||
* <p>
|
|
||||||
* <pre>
|
|
||||||
* - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
|
|
||||||
* - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* A {@link ExecutionKilledExecution} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED}
|
|
||||||
* regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks
|
|
||||||
* to be killed no matter what the circumstances.
|
|
||||||
* <p>
|
|
||||||
* IMPORTANT: A {@link ExecutionKilledExecution} is considered to be a fire-and-forget event. As a result, we do not manage a
|
|
||||||
* COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilledExecution}
|
|
||||||
* before considering an execution to be KILLED.
|
|
||||||
*/
|
|
||||||
@Getter
|
@Getter
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
@@ -47,8 +31,9 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI
|
|||||||
Boolean isOnKillCascade;
|
Boolean isOnKillCascade;
|
||||||
|
|
||||||
public boolean isEqual(WorkerTask workerTask) {
|
public boolean isEqual(WorkerTask workerTask) {
|
||||||
return (workerTask.getTaskRun().getTenantId() == null || (workerTask.getTaskRun().getTenantId() != null && workerTask.getTaskRun().getTenantId().equals(this.tenantId))) &&
|
String taskTenantId = workerTask.getTaskRun().getTenantId();
|
||||||
workerTask.getTaskRun().getExecutionId().equals(this.executionId);
|
String taskExecutionId = workerTask.getTaskRun().getExecutionId();
|
||||||
|
return (taskTenantId == null || taskTenantId.equals(this.tenantId)) && taskExecutionId.equals(this.executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package io.kestra.core.models.flows;
|
package io.kestra.core.models.flows;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonSetter;
|
import com.fasterxml.jackson.annotation.JsonSetter;
|
||||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
@@ -39,7 +38,6 @@ import jakarta.validation.constraints.Pattern;
|
|||||||
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
|
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
|
||||||
@JsonSubTypes.Type(value = URIInput.class, name = "URI")
|
@JsonSubTypes.Type(value = URIInput.class, name = "URI")
|
||||||
})
|
})
|
||||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
|
||||||
public abstract class Input<T> implements Data {
|
public abstract class Input<T> implements Data {
|
||||||
@NotNull
|
@NotNull
|
||||||
@NotBlank
|
@NotBlank
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ public class EnumInput extends Input<String> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void validate(String input) throws ConstraintViolationException {
|
public void validate(String input) throws ConstraintViolationException {
|
||||||
if (!values.contains(input)) {
|
if (!values.contains(input) & this.getRequired()) {
|
||||||
throw ManualConstraintViolation.toConstraintViolationException(
|
throw ManualConstraintViolation.toConstraintViolationException(
|
||||||
"it must match the values `" + values + "`",
|
"it must match the values `" + values + "`",
|
||||||
this,
|
this,
|
||||||
|
|||||||
@@ -1,18 +1,21 @@
|
|||||||
package io.kestra.core.models.flows.input;
|
package io.kestra.core.models.flows.input;
|
||||||
|
|
||||||
import io.kestra.core.models.flows.Input;
|
import io.kestra.core.models.flows.Input;
|
||||||
|
import jakarta.validation.ConstraintViolationException;
|
||||||
|
import lombok.Builder;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import jakarta.validation.ConstraintViolationException;
|
|
||||||
|
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@Getter
|
@Getter
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
public class FileInput extends Input<URI> {
|
public class FileInput extends Input<URI> {
|
||||||
|
@Builder.Default
|
||||||
|
public String extension = ".upl";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void validate(URI input) throws ConstraintViolationException {
|
public void validate(URI input) throws ConstraintViolationException {
|
||||||
// no validation yet
|
// no validation yet
|
||||||
|
|||||||
@@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
|
|||||||
@Nullable
|
@Nullable
|
||||||
private String executionId;
|
private String executionId;
|
||||||
|
|
||||||
@Nullable
|
|
||||||
private State.Type executionCurrentState;
|
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private Instant updatedDate;
|
private Instant updatedDate;
|
||||||
|
|
||||||
@@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
|
|||||||
protected Trigger(TriggerBuilder<?, ?> b) {
|
protected Trigger(TriggerBuilder<?, ?> b) {
|
||||||
super(b);
|
super(b);
|
||||||
this.executionId = b.executionId;
|
this.executionId = b.executionId;
|
||||||
this.executionCurrentState = b.executionCurrentState;
|
|
||||||
this.updatedDate = b.updatedDate;
|
this.updatedDate = b.updatedDate;
|
||||||
this.evaluateRunningDate = b.evaluateRunningDate;
|
this.evaluateRunningDate = b.evaluateRunningDate;
|
||||||
}
|
}
|
||||||
@@ -79,7 +75,7 @@ public class Trigger extends TriggerContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String flowUid() {
|
public String flowUid() {
|
||||||
return Flow.uid(this.getTenantId(), this.getNamespace(), this.getFlowId(), Optional.of(this.getFlowRevision()));
|
return Flow.uidWithoutRevision(this.getTenantId(), this.getNamespace(), this.getFlowId());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -90,7 +86,6 @@ public class Trigger extends TriggerContext {
|
|||||||
.tenantId(flow.getTenantId())
|
.tenantId(flow.getTenantId())
|
||||||
.namespace(flow.getNamespace())
|
.namespace(flow.getNamespace())
|
||||||
.flowId(flow.getId())
|
.flowId(flow.getId())
|
||||||
.flowRevision(flow.getRevision())
|
|
||||||
.triggerId(abstractTrigger.getId())
|
.triggerId(abstractTrigger.getId())
|
||||||
.stopAfter(abstractTrigger.getStopAfter())
|
.stopAfter(abstractTrigger.getStopAfter())
|
||||||
.build();
|
.build();
|
||||||
@@ -137,12 +132,10 @@ public class Trigger extends TriggerContext {
|
|||||||
.tenantId(execution.getTenantId())
|
.tenantId(execution.getTenantId())
|
||||||
.namespace(execution.getNamespace())
|
.namespace(execution.getNamespace())
|
||||||
.flowId(execution.getFlowId())
|
.flowId(execution.getFlowId())
|
||||||
.flowRevision(execution.getFlowRevision())
|
|
||||||
.triggerId(execution.getTrigger().getId())
|
.triggerId(execution.getTrigger().getId())
|
||||||
.date(trigger.getDate())
|
.date(trigger.getDate())
|
||||||
.nextExecutionDate(trigger.getNextExecutionDate())
|
.nextExecutionDate(trigger.getNextExecutionDate())
|
||||||
.executionId(execution.getId())
|
.executionId(execution.getId())
|
||||||
.executionCurrentState(execution.getState().getCurrent())
|
|
||||||
.updatedDate(Instant.now())
|
.updatedDate(Instant.now())
|
||||||
.backfill(trigger.getBackfill())
|
.backfill(trigger.getBackfill())
|
||||||
.stopAfter(trigger.getStopAfter())
|
.stopAfter(trigger.getStopAfter())
|
||||||
@@ -175,7 +168,6 @@ public class Trigger extends TriggerContext {
|
|||||||
.tenantId(flow.getTenantId())
|
.tenantId(flow.getTenantId())
|
||||||
.namespace(flow.getNamespace())
|
.namespace(flow.getNamespace())
|
||||||
.flowId(flow.getId())
|
.flowId(flow.getId())
|
||||||
.flowRevision(flow.getRevision())
|
|
||||||
.triggerId(abstractTrigger.getId())
|
.triggerId(abstractTrigger.getId())
|
||||||
.date(ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS))
|
.date(ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS))
|
||||||
.nextExecutionDate(nextDate)
|
.nextExecutionDate(nextDate)
|
||||||
@@ -225,7 +217,6 @@ public class Trigger extends TriggerContext {
|
|||||||
.tenantId(this.getTenantId())
|
.tenantId(this.getTenantId())
|
||||||
.namespace(this.getNamespace())
|
.namespace(this.getNamespace())
|
||||||
.flowId(this.getFlowId())
|
.flowId(this.getFlowId())
|
||||||
.flowRevision(this.getFlowRevision())
|
|
||||||
.triggerId(this.getTriggerId())
|
.triggerId(this.getTriggerId())
|
||||||
.date(this.getDate())
|
.date(this.getDate())
|
||||||
.nextExecutionDate(nextExecutionDate)
|
.nextExecutionDate(nextExecutionDate)
|
||||||
@@ -240,7 +231,6 @@ public class Trigger extends TriggerContext {
|
|||||||
.tenantId(this.getTenantId())
|
.tenantId(this.getTenantId())
|
||||||
.namespace(this.getNamespace())
|
.namespace(this.getNamespace())
|
||||||
.flowId(this.getFlowId())
|
.flowId(this.getFlowId())
|
||||||
.flowRevision(this.getFlowRevision())
|
|
||||||
.triggerId(this.getTriggerId())
|
.triggerId(this.getTriggerId())
|
||||||
.date(this.getDate())
|
.date(this.getDate())
|
||||||
.nextExecutionDate(this.getNextExecutionDate())
|
.nextExecutionDate(this.getNextExecutionDate())
|
||||||
@@ -301,7 +291,6 @@ public class Trigger extends TriggerContext {
|
|||||||
.tenantId(triggerContext.getTenantId())
|
.tenantId(triggerContext.getTenantId())
|
||||||
.namespace(triggerContext.getNamespace())
|
.namespace(triggerContext.getNamespace())
|
||||||
.flowId(triggerContext.getFlowId())
|
.flowId(triggerContext.getFlowId())
|
||||||
.flowRevision(triggerContext.getFlowRevision())
|
|
||||||
.triggerId(triggerContext.getTriggerId())
|
.triggerId(triggerContext.getTriggerId())
|
||||||
.date(triggerContext.getDate())
|
.date(triggerContext.getDate())
|
||||||
.backfill(triggerContext.getBackfill())
|
.backfill(triggerContext.getBackfill())
|
||||||
|
|||||||
@@ -29,9 +29,6 @@ public class TriggerContext {
|
|||||||
@NotNull
|
@NotNull
|
||||||
private String flowId;
|
private String flowId;
|
||||||
|
|
||||||
@NotNull
|
|
||||||
private Integer flowRevision;
|
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
private String triggerId;
|
private String triggerId;
|
||||||
|
|
||||||
@@ -53,7 +50,6 @@ public class TriggerContext {
|
|||||||
this.tenantId = b.tenantId;
|
this.tenantId = b.tenantId;
|
||||||
this.namespace = b.namespace;
|
this.namespace = b.namespace;
|
||||||
this.flowId = b.flowId;
|
this.flowId = b.flowId;
|
||||||
this.flowRevision = b.flowRevision;
|
|
||||||
this.triggerId = b.triggerId;
|
this.triggerId = b.triggerId;
|
||||||
this.date = b.date;
|
this.date = b.date;
|
||||||
this.nextExecutionDate = b.nextExecutionDate;
|
this.nextExecutionDate = b.nextExecutionDate;
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
package io.kestra.core.models.triggers;
|
package io.kestra.core.models.triggers;
|
||||||
|
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
|
||||||
import io.kestra.core.models.Label;
|
|
||||||
import io.kestra.core.models.conditions.ConditionContext;
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||||
@@ -22,7 +20,7 @@ public abstract class TriggerService {
|
|||||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
||||||
RunContext runContext = conditionContext.getRunContext();
|
RunContext runContext = conditionContext.getRunContext();
|
||||||
|
|
||||||
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger);
|
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Execution generateExecution(
|
public static Execution generateExecution(
|
||||||
@@ -34,30 +32,32 @@ public abstract class TriggerService {
|
|||||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
|
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
|
||||||
RunContext runContext = conditionContext.getRunContext();
|
RunContext runContext = conditionContext.getRunContext();
|
||||||
|
|
||||||
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger);
|
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Execution generateRealtimeExecution(
|
public static Execution generateRealtimeExecution(
|
||||||
AbstractTrigger trigger,
|
AbstractTrigger trigger,
|
||||||
|
ConditionContext conditionContext,
|
||||||
TriggerContext context,
|
TriggerContext context,
|
||||||
Output output
|
Output output
|
||||||
) {
|
) {
|
||||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
|
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
|
||||||
|
|
||||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger);
|
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Execution generateExecution(
|
private static Execution generateExecution(
|
||||||
String id,
|
String id,
|
||||||
AbstractTrigger trigger,
|
AbstractTrigger trigger,
|
||||||
TriggerContext context,
|
TriggerContext context,
|
||||||
ExecutionTrigger executionTrigger
|
ExecutionTrigger executionTrigger,
|
||||||
|
Integer flowRevision
|
||||||
) {
|
) {
|
||||||
return Execution.builder()
|
return Execution.builder()
|
||||||
.id(id)
|
.id(id)
|
||||||
.namespace(context.getNamespace())
|
.namespace(context.getNamespace())
|
||||||
.flowId(context.getFlowId())
|
.flowId(context.getFlowId())
|
||||||
.flowRevision(context.getFlowRevision())
|
.flowRevision(flowRevision)
|
||||||
.state(new State())
|
.state(new State())
|
||||||
.trigger(executionTrigger)
|
.trigger(executionTrigger)
|
||||||
.labels(trigger.getLabels() == null ? null : trigger.getLabels())
|
.labels(trigger.getLabels() == null ? null : trigger.getLabels())
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
|||||||
@Nullable ChildFilter childFilter
|
@Nullable ChildFilter childFilter
|
||||||
);
|
);
|
||||||
|
|
||||||
Flux<Execution> find(
|
default Flux<Execution> find(
|
||||||
@Nullable String query,
|
@Nullable String query,
|
||||||
@Nullable String tenantId,
|
@Nullable String tenantId,
|
||||||
@Nullable String namespace,
|
@Nullable String namespace,
|
||||||
@@ -62,6 +62,22 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
|||||||
@Nullable Map<String, String> labels,
|
@Nullable Map<String, String> labels,
|
||||||
@Nullable String triggerExecutionId,
|
@Nullable String triggerExecutionId,
|
||||||
@Nullable ChildFilter childFilter
|
@Nullable ChildFilter childFilter
|
||||||
|
) {
|
||||||
|
return find(query, tenantId, namespace, flowId, startDate, endDate, state, labels, triggerExecutionId, childFilter, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Flux<Execution> find(
|
||||||
|
@Nullable String query,
|
||||||
|
@Nullable String tenantId,
|
||||||
|
@Nullable String namespace,
|
||||||
|
@Nullable String flowId,
|
||||||
|
@Nullable ZonedDateTime startDate,
|
||||||
|
@Nullable ZonedDateTime endDate,
|
||||||
|
@Nullable List<State.Type> state,
|
||||||
|
@Nullable Map<String, String> labels,
|
||||||
|
@Nullable String triggerExecutionId,
|
||||||
|
@Nullable ChildFilter childFilter,
|
||||||
|
boolean allowDeleted
|
||||||
);
|
);
|
||||||
|
|
||||||
ArrayListTotal<TaskRun> findTaskRun(
|
ArrayListTotal<TaskRun> findTaskRun(
|
||||||
|
|||||||
@@ -35,6 +35,11 @@ public class Executor {
|
|||||||
private ExecutionResumed executionResumed;
|
private ExecutionResumed executionResumed;
|
||||||
private ExecutionResumed joinedExecutionResumed;
|
private ExecutionResumed joinedExecutionResumed;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The sequence id should be incremented each time the execution is persisted after mutation.
|
||||||
|
*/
|
||||||
|
private long seqId = 0L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of {@link ExecutionKilled} to be propagated part of the execution.
|
* List of {@link ExecutionKilled} to be propagated part of the execution.
|
||||||
*/
|
*/
|
||||||
@@ -45,6 +50,12 @@ public class Executor {
|
|||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Executor(Execution execution, Long offset, long seqId) {
|
||||||
|
this.execution = execution;
|
||||||
|
this.offset = offset;
|
||||||
|
this.seqId = seqId;
|
||||||
|
}
|
||||||
|
|
||||||
public Executor(WorkerTaskResult workerTaskResult) {
|
public Executor(WorkerTaskResult workerTaskResult) {
|
||||||
this.joinedWorkerTaskResult = workerTaskResult;
|
this.joinedWorkerTaskResult = workerTaskResult;
|
||||||
}
|
}
|
||||||
@@ -148,7 +159,18 @@ public class Executor {
|
|||||||
public Executor serialize() {
|
public Executor serialize() {
|
||||||
return new Executor(
|
return new Executor(
|
||||||
this.execution,
|
this.execution,
|
||||||
this.offset
|
this.offset,
|
||||||
|
this.seqId
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments and returns the execution sequence id.
|
||||||
|
*
|
||||||
|
* @return the sequence id.
|
||||||
|
*/
|
||||||
|
public long incrementAndGetSeqId() {
|
||||||
|
this.seqId++;
|
||||||
|
return seqId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners;
|
|||||||
|
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.kestra.core.models.tasks.runners.PluginUtilsService;
|
import io.kestra.core.models.tasks.runners.PluginUtilsService;
|
||||||
|
import io.kestra.core.utils.IdUtils;
|
||||||
import io.kestra.core.utils.ListUtils;
|
import io.kestra.core.utils.ListUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -44,17 +45,16 @@ public abstract class FilesService {
|
|||||||
file.getParentFile().mkdirs();
|
file.getParentFile().mkdirs();
|
||||||
}
|
}
|
||||||
|
|
||||||
var fileContent = runContext.render(input, additionalVars);
|
if (input == null) {
|
||||||
if (fileContent == null) {
|
|
||||||
file.createNewFile();
|
file.createNewFile();
|
||||||
} else {
|
} else {
|
||||||
if (fileContent.startsWith("kestra://")) {
|
if (input.startsWith("kestra://")) {
|
||||||
try (var is = runContext.storage().getFile(URI.create(fileContent));
|
try (var is = runContext.storage().getFile(URI.create(input));
|
||||||
var out = new FileOutputStream(file)) {
|
var out = new FileOutputStream(file)) {
|
||||||
IOUtils.copyLarge(is, out);
|
IOUtils.copyLarge(is, out);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Files.write(file.toPath(), fileContent.getBytes());
|
Files.write(file.toPath(), input.getBytes());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
@@ -85,10 +85,14 @@ public abstract class FilesService {
|
|||||||
.filter(path -> pathMatcher.matches(runContext.tempDir().relativize(path)))
|
.filter(path -> pathMatcher.matches(runContext.tempDir().relativize(path)))
|
||||||
.map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
|
.map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
|
||||||
runContext.tempDir().relativize(path).toString(),
|
runContext.tempDir().relativize(path).toString(),
|
||||||
runContext.storage().putFile(path.toFile())
|
runContext.storage().putFile(path.toFile(), resolveUniqueNameForFile(path))
|
||||||
)))
|
)))
|
||||||
.toList()
|
.toList()
|
||||||
.stream();
|
.stream();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String resolveUniqueNameForFile(final Path path) {
|
||||||
|
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Flow;
|
|||||||
import io.kestra.core.models.flows.Input;
|
import io.kestra.core.models.flows.Input;
|
||||||
import io.kestra.core.models.flows.Type;
|
import io.kestra.core.models.flows.Type;
|
||||||
import io.kestra.core.models.flows.input.ArrayInput;
|
import io.kestra.core.models.flows.input.ArrayInput;
|
||||||
|
import io.kestra.core.models.flows.input.FileInput;
|
||||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||||
import io.kestra.core.serializers.JacksonMapper;
|
import io.kestra.core.serializers.JacksonMapper;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
import io.kestra.core.storages.StorageInterface;
|
||||||
@@ -85,7 +86,9 @@ public class FlowInputOutput {
|
|||||||
.subscribeOn(Schedulers.boundedElastic())
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
.map(throwFunction(input -> {
|
.map(throwFunction(input -> {
|
||||||
if (input instanceof CompletedFileUpload fileUpload) {
|
if (input instanceof CompletedFileUpload fileUpload) {
|
||||||
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", ".upl");
|
String fileExtension = inputs.stream().filter(flowInput -> flowInput instanceof FileInput && flowInput.getId().equals(fileUpload.getFilename())).map(flowInput -> ((FileInput) flowInput).getExtension()).findFirst().orElse(".upl");
|
||||||
|
fileExtension = fileExtension.startsWith(".") ? fileExtension : "." + fileExtension;
|
||||||
|
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", fileExtension);
|
||||||
try (var inputStream = fileUpload.getInputStream();
|
try (var inputStream = fileUpload.getInputStream();
|
||||||
var outputStream = new FileOutputStream(tempFile)) {
|
var outputStream = new FileOutputStream(tempFile)) {
|
||||||
long transferredBytes = inputStream.transferTo(outputStream);
|
long transferredBytes = inputStream.transferTo(outputStream);
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import io.kestra.core.services.FlowListenersInterface;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
@@ -98,13 +99,13 @@ public class FlowListeners implements FlowListenersInterface {
|
|||||||
private Optional<Flow> previous(Flow flow) {
|
private Optional<Flow> previous(Flow flow) {
|
||||||
return flows
|
return flows
|
||||||
.stream()
|
.stream()
|
||||||
.filter(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
|
.filter(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
|
||||||
.findFirst();
|
.findFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean remove(Flow flow) {
|
private boolean remove(Flow flow) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
boolean remove = flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
|
boolean remove = flows.removeIf(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
|
||||||
if (!remove && flow.isDeleted()) {
|
if (!remove && flow.isDeleted()) {
|
||||||
log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
|
log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ public class NamespaceFilesService {
|
|||||||
private StorageInterface storageInterface;
|
private StorageInterface storageInterface;
|
||||||
|
|
||||||
public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception {
|
public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception {
|
||||||
if (!namespaceFiles.getEnabled()) {
|
if (!Boolean.TRUE.equals(namespaceFiles.getEnabled())) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -339,7 +339,7 @@ public class RunContext {
|
|||||||
if (execution.getTaskRunList() != null) {
|
if (execution.getTaskRunList() != null) {
|
||||||
Map<String, Object> outputs = new HashMap<>(execution.outputs());
|
Map<String, Object> outputs = new HashMap<>(execution.outputs());
|
||||||
if (decryptVariables) {
|
if (decryptVariables) {
|
||||||
decryptOutputs(outputs);
|
outputs = decryptOutputs(outputs);
|
||||||
}
|
}
|
||||||
builder.put("outputs", outputs);
|
builder.put("outputs", outputs);
|
||||||
}
|
}
|
||||||
@@ -401,25 +401,37 @@ public class RunContext {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.runContextLogger != null) {
|
||||||
|
builder.put("addSecretConsumer", (Consumer<String>) s -> this.runContextLogger.usedSecret(s));
|
||||||
|
}
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decryptOutputs(Map<String, Object> outputs) {
|
private Map<String, Object> decryptOutputs(Map<String, Object> mapToDecrypt) {
|
||||||
for (var entry: outputs.entrySet()) {
|
if (mapToDecrypt == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> decryptedMap = new HashMap<>();
|
||||||
|
for (var entry: mapToDecrypt.entrySet()) {
|
||||||
|
decryptedMap.put(entry.getKey(), entry.getValue());
|
||||||
if (entry.getValue() instanceof Map map) {
|
if (entry.getValue() instanceof Map map) {
|
||||||
// if some outputs are of type EncryptedString we decode them and replace the object
|
// if some value are of type EncryptedString we decode them and replace the object
|
||||||
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
|
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
|
||||||
try {
|
try {
|
||||||
String decoded = decrypt((String) map.get("value"));
|
String decoded = decrypt((String) map.get("value"));
|
||||||
outputs.put(entry.getKey(), decoded);
|
decryptedMap.put(entry.getKey(), decoded);
|
||||||
} catch (GeneralSecurityException e) {
|
} catch (GeneralSecurityException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
decryptOutputs((Map<String, Object>) map);
|
decryptedMap.put(entry.getKey(), decryptOutputs((Map<String, Object>) map));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return decryptedMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> variables(TaskRun taskRun) {
|
private Map<String, Object> variables(TaskRun taskRun) {
|
||||||
@@ -502,6 +514,8 @@ public class RunContext {
|
|||||||
runContext.runContextLogger = this.runContextLogger;
|
runContext.runContextLogger = this.runContextLogger;
|
||||||
runContext.tempBasedPath = this.tempBasedPath;
|
runContext.tempBasedPath = this.tempBasedPath;
|
||||||
runContext.temporaryDirectory = this.temporaryDirectory;
|
runContext.temporaryDirectory = this.temporaryDirectory;
|
||||||
|
runContext.pluginConfiguration = this.pluginConfiguration;
|
||||||
|
runContext.secretKey = this.secretKey;
|
||||||
|
|
||||||
return runContext;
|
return runContext;
|
||||||
}
|
}
|
||||||
@@ -583,6 +597,17 @@ public class RunContext {
|
|||||||
return newContext;
|
return newContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RunContext forWorkingDirectoryTask(final Task task) {
|
||||||
|
Map<String, Object> decryptedVariables = new HashMap<>(this.variables);
|
||||||
|
if (this.variables.get("outputs") != null) {
|
||||||
|
decryptedVariables.put("outputs", decryptOutputs((Map<String, Object>) this.variables.get("outputs")));
|
||||||
|
}
|
||||||
|
|
||||||
|
RunContext newRunContext = this.clone(decryptedVariables);
|
||||||
|
newRunContext.initPluginConfiguration(applicationContext, task.getClass(), task.getType());
|
||||||
|
return newRunContext;
|
||||||
|
}
|
||||||
|
|
||||||
public RunContext forTaskRunner(TaskRunner taskRunner) {
|
public RunContext forTaskRunner(TaskRunner taskRunner) {
|
||||||
this.initPluginConfiguration(applicationContext, taskRunner.getClass(), taskRunner.getType());
|
this.initPluginConfiguration(applicationContext, taskRunner.getClass(), taskRunner.getType());
|
||||||
|
|
||||||
|
|||||||
@@ -166,7 +166,7 @@ public class VariableRenderer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> render(Map<String, Object> in, Map<String, Object> variables, boolean recursive) throws IllegalVariableEvaluationException {
|
public Map<String, Object> render(Map<String, Object> in, Map<String, Object> variables, boolean recursive) throws IllegalVariableEvaluationException {
|
||||||
Map<String, Object> map = new HashMap<>();
|
Map<String, Object> map = new LinkedHashMap<>();
|
||||||
|
|
||||||
for (Map.Entry<String, Object> r : in.entrySet()) {
|
for (Map.Entry<String, Object> r : in.entrySet()) {
|
||||||
String key = this.render(r.getKey(), variables);
|
String key = this.render(r.getKey(), variables);
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ import io.micronaut.context.annotation.Parameter;
|
|||||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||||
import io.micronaut.core.annotation.Introspected;
|
import io.micronaut.core.annotation.Introspected;
|
||||||
import io.micronaut.core.annotation.Nullable;
|
import io.micronaut.core.annotation.Nullable;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.inject.Named;
|
import jakarta.inject.Named;
|
||||||
@@ -126,6 +127,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
|||||||
|
|
||||||
private final AtomicReference<ServiceState> state = new AtomicReference<>();
|
private final AtomicReference<ServiceState> state = new AtomicReference<>();
|
||||||
|
|
||||||
|
private final Integer numThreads;
|
||||||
|
private final AtomicInteger pendingJobCount = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger runningJobCount = new AtomicInteger(0);
|
||||||
/**
|
/**
|
||||||
* Creates a new {@link Worker} instance.
|
* Creates a new {@link Worker} instance.
|
||||||
*
|
*
|
||||||
@@ -143,6 +147,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
|||||||
ExecutorsUtils executorsUtils
|
ExecutorsUtils executorsUtils
|
||||||
) {
|
) {
|
||||||
this.id = workerId;
|
this.id = workerId;
|
||||||
|
this.numThreads = numThreads;
|
||||||
this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
|
this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
|
||||||
this.eventPublisher = eventPublisher;
|
this.eventPublisher = eventPublisher;
|
||||||
this.executorService = executorsUtils.maxCachedThreadPool(numThreads, "worker");
|
this.executorService = executorsUtils.maxCachedThreadPool(numThreads, "worker");
|
||||||
@@ -168,6 +173,15 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
|||||||
context.inject(this);
|
context.inject(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
void initMetrics() {
|
||||||
|
String[] tags = this.workerGroup == null ? new String[0] : new String[] { MetricRegistry.TAG_WORKER_GROUP, this.workerGroup };
|
||||||
|
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
|
||||||
|
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
|
||||||
|
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
|
||||||
|
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, runningJobCount, tags);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
setState(ServiceState.RUNNING);
|
setState(ServiceState.RUNNING);
|
||||||
@@ -208,19 +222,29 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
|||||||
this.workerGroup,
|
this.workerGroup,
|
||||||
Worker.class,
|
Worker.class,
|
||||||
either -> {
|
either -> {
|
||||||
|
pendingJobCount.incrementAndGet();
|
||||||
|
|
||||||
executorService.execute(() -> {
|
executorService.execute(() -> {
|
||||||
if (either.isRight()) {
|
pendingJobCount.decrementAndGet();
|
||||||
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
runningJobCount.incrementAndGet();
|
||||||
handleDeserializationError(either.getRight());
|
|
||||||
return;
|
try {
|
||||||
|
if (either.isRight()) {
|
||||||
|
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
|
||||||
|
handleDeserializationError(either.getRight());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerJob workerTask = either.getLeft();
|
||||||
|
if (workerTask instanceof WorkerTask task) {
|
||||||
|
handleTask(task);
|
||||||
|
} else if (workerTask instanceof WorkerTrigger trigger) {
|
||||||
|
handleTrigger(trigger);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
runningJobCount.decrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
WorkerJob workerTask = either.getLeft();
|
|
||||||
if (workerTask instanceof WorkerTask task) {
|
|
||||||
handleTask(task);
|
|
||||||
} else if (workerTask instanceof WorkerTrigger trigger) {
|
|
||||||
handleTrigger(trigger);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
@@ -285,7 +309,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
|||||||
WorkerTask currentWorkerTask = workingDirectory.workerTask(
|
WorkerTask currentWorkerTask = workingDirectory.workerTask(
|
||||||
workerTask.getTaskRun(),
|
workerTask.getTaskRun(),
|
||||||
currentTask,
|
currentTask,
|
||||||
runContext
|
runContext.forWorkingDirectoryTask(currentTask)
|
||||||
);
|
);
|
||||||
|
|
||||||
// all tasks will be handled immediately by the worker
|
// all tasks will be handled immediately by the worker
|
||||||
@@ -378,13 +402,13 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
|||||||
.increment();
|
.increment();
|
||||||
|
|
||||||
this.metricRegistry
|
this.metricRegistry
|
||||||
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
|
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger, workerGroup))
|
||||||
.record(() -> {
|
.record(() -> {
|
||||||
StopWatch stopWatch = new StopWatch();
|
StopWatch stopWatch = new StopWatch();
|
||||||
stopWatch.start();
|
stopWatch.start();
|
||||||
|
|
||||||
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
|
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
|
||||||
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
|
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
|
||||||
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
|
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -70,12 +70,14 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
private final PluginDefaultService pluginDefaultService;
|
private final PluginDefaultService pluginDefaultService;
|
||||||
private final WorkerGroupService workerGroupService;
|
private final WorkerGroupService workerGroupService;
|
||||||
private final LogService logService;
|
private final LogService logService;
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
|
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
|
||||||
private volatile Boolean isReady = false;
|
private volatile Boolean isReady = false;
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
|
@Getter
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
|
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
|
||||||
@@ -178,7 +180,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
if (abstractTrigger instanceof WorkerTriggerInterface) {
|
if (abstractTrigger instanceof WorkerTriggerInterface) {
|
||||||
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
||||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||||
Trigger trigger = Trigger.of(flow, abstractTrigger);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.triggerState.update(flow, abstractTrigger, conditionContext);
|
this.triggerState.update(flow, abstractTrigger, conditionContext);
|
||||||
@@ -186,6 +187,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
logError(conditionContext, flow, abstractTrigger, e);
|
logError(conditionContext, flow, abstractTrigger, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Trigger trigger = Trigger.of(flow, abstractTrigger);
|
||||||
this.executionKilledQueue.emit(ExecutionKilledTrigger
|
this.executionKilledQueue.emit(ExecutionKilledTrigger
|
||||||
.builder()
|
.builder()
|
||||||
.tenantId(trigger.getTenantId())
|
.tenantId(trigger.getTenantId())
|
||||||
@@ -255,7 +257,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
.tenantId(flowAndTrigger.flow().getTenantId())
|
.tenantId(flowAndTrigger.flow().getTenantId())
|
||||||
.namespace(flowAndTrigger.flow().getNamespace())
|
.namespace(flowAndTrigger.flow().getNamespace())
|
||||||
.flowId(flowAndTrigger.flow().getId())
|
.flowId(flowAndTrigger.flow().getId())
|
||||||
.flowRevision(flowAndTrigger.flow().getRevision())
|
|
||||||
.triggerId(flowAndTrigger.trigger().getId())
|
.triggerId(flowAndTrigger.trigger().getId())
|
||||||
.date(now())
|
.date(now())
|
||||||
.nextExecutionDate(nextExecutionDate)
|
.nextExecutionDate(nextExecutionDate)
|
||||||
@@ -343,7 +344,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
logError(conditionContext, flow, abstractTrigger, e);
|
logError(conditionContext, flow, abstractTrigger, e);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
this.triggerState.save(triggerContext, scheduleContext);
|
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
|
||||||
} else {
|
} else {
|
||||||
triggerContext = lastTrigger;
|
triggerContext = lastTrigger;
|
||||||
}
|
}
|
||||||
@@ -368,7 +369,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
public List<FlowWithTriggers> schedulerTriggers() {
|
public List<FlowWithTriggers> schedulerTriggers() {
|
||||||
Map<String, Flow> flows = this.flowListeners.flows()
|
Map<String, Flow> flows = this.flowListeners.flows()
|
||||||
.stream()
|
.stream()
|
||||||
.collect(Collectors.toMap(Flow::uid, Function.identity()));
|
.collect(Collectors.toMap(Flow::uidWithoutRevision, Function.identity()));
|
||||||
|
|
||||||
return this.triggerState.findAllForAllTenants().stream()
|
return this.triggerState.findAllForAllTenants().stream()
|
||||||
.filter(trigger -> flows.containsKey(trigger.flowUid()))
|
.filter(trigger -> flows.containsKey(trigger.flowUid()))
|
||||||
@@ -432,11 +433,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
)
|
)
|
||||||
.build()
|
.build()
|
||||||
)
|
)
|
||||||
.peek(f -> {
|
|
||||||
if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
|
|
||||||
this.triggerState.unlock(f.getTriggerContext());
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
|
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
|
||||||
.filter(this::isExecutionNotRunning)
|
.filter(this::isExecutionNotRunning)
|
||||||
.map(FlowWithWorkerTriggerNextDate::of)
|
.map(FlowWithWorkerTriggerNextDate::of)
|
||||||
@@ -472,7 +468,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
|
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.triggerState.save(triggerRunning, scheduleContext);
|
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
|
||||||
this.sendWorkerTriggerToWorker(f);
|
this.sendWorkerTriggerToWorker(f);
|
||||||
} catch (InternalException e) {
|
} catch (InternalException e) {
|
||||||
logService.logTrigger(
|
logService.logTrigger(
|
||||||
@@ -497,7 +493,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
|
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
|
||||||
);
|
);
|
||||||
trigger = trigger.checkBackfill();
|
trigger = trigger.checkBackfill();
|
||||||
this.triggerState.save(trigger, scheduleContext);
|
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logService.logTrigger(
|
logService.logTrigger(
|
||||||
@@ -515,7 +511,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
logError(f, e);
|
logError(f, e);
|
||||||
}
|
}
|
||||||
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
|
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
|
||||||
this.triggerState.save(trigger, scheduleContext);
|
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
|
||||||
}
|
}
|
||||||
} catch (InternalException ie) {
|
} catch (InternalException ie) {
|
||||||
// validate schedule condition can fail to render variables
|
// validate schedule condition can fail to render variables
|
||||||
@@ -526,13 +522,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
.tenantId(f.getTriggerContext().getTenantId())
|
.tenantId(f.getTriggerContext().getTenantId())
|
||||||
.namespace(f.getTriggerContext().getNamespace())
|
.namespace(f.getTriggerContext().getNamespace())
|
||||||
.flowId(f.getTriggerContext().getFlowId())
|
.flowId(f.getTriggerContext().getFlowId())
|
||||||
.flowRevision(f.getTriggerContext().getFlowRevision())
|
|
||||||
.labels(f.getFlow().getLabels())
|
.labels(f.getFlow().getLabels())
|
||||||
.state(new State().withState(State.Type.FAILED))
|
.state(new State().withState(State.Type.FAILED))
|
||||||
.build();
|
.build();
|
||||||
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
|
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
|
||||||
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
|
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
|
||||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
|
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -572,7 +567,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
|
|
||||||
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
|
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
|
||||||
// So we must save them by passing the scheduleContext.
|
// So we must save them by passing the scheduleContext.
|
||||||
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
|
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
|
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
|
||||||
@@ -593,8 +588,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The execution is not yet started, we skip
|
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
|
||||||
if (lastTrigger.getExecutionCurrentState() == null) {
|
|
||||||
|
// executionState hasn't received the execution, we skip
|
||||||
|
if (execution.isEmpty()) {
|
||||||
if (lastTrigger.getUpdatedDate() != null) {
|
if (lastTrigger.getUpdatedDate() != null) {
|
||||||
metricRegistry
|
metricRegistry
|
||||||
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
|
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
|
||||||
@@ -621,6 +618,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
|
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
|
||||||
|
// check that if an executionId but no state, this means the execution is not started
|
||||||
|
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
|
||||||
|
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
logService.logTrigger(
|
logService.logTrigger(
|
||||||
f.getTriggerContext(),
|
f.getTriggerContext(),
|
||||||
@@ -628,7 +629,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
Level.DEBUG,
|
Level.DEBUG,
|
||||||
"Execution '{}' is still '{}', updated at '{}'",
|
"Execution '{}' is still '{}', updated at '{}'",
|
||||||
lastTrigger.getExecutionId(),
|
lastTrigger.getExecutionId(),
|
||||||
lastTrigger.getExecutionCurrentState(),
|
execution.get().getState().getCurrent(),
|
||||||
lastTrigger.getUpdatedDate()
|
lastTrigger.getUpdatedDate()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -849,7 +850,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
.tenantId(f.getTriggerContext().getTenantId())
|
.tenantId(f.getTriggerContext().getTenantId())
|
||||||
.namespace(f.getTriggerContext().getNamespace())
|
.namespace(f.getTriggerContext().getNamespace())
|
||||||
.flowId(f.getTriggerContext().getFlowId())
|
.flowId(f.getTriggerContext().getFlowId())
|
||||||
.flowRevision(f.getTriggerContext().getFlowRevision())
|
|
||||||
.triggerId(f.getTriggerContext().getTriggerId())
|
.triggerId(f.getTriggerContext().getTriggerId())
|
||||||
.date(f.getTriggerContext().getNextExecutionDate())
|
.date(f.getTriggerContext().getNextExecutionDate())
|
||||||
.nextExecutionDate(f.getTriggerContext().getNextExecutionDate())
|
.nextExecutionDate(f.getTriggerContext().getNextExecutionDate())
|
||||||
|
|||||||
@@ -1,4 +1,11 @@
|
|||||||
package io.kestra.core.schedulers;
|
package io.kestra.core.schedulers;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
// For tests purpose
|
// For tests purpose
|
||||||
public class DefaultScheduleContext implements ScheduleContextInterface {}
|
public class DefaultScheduleContext implements ScheduleContextInterface {
|
||||||
|
@Override
|
||||||
|
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||||
|
consumer.accept(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
|
|||||||
public DefaultScheduler(
|
public DefaultScheduler(
|
||||||
ApplicationContext applicationContext,
|
ApplicationContext applicationContext,
|
||||||
FlowListenersInterface flowListeners,
|
FlowListenersInterface flowListeners,
|
||||||
|
SchedulerExecutionStateInterface executionState,
|
||||||
SchedulerTriggerStateInterface triggerState
|
SchedulerTriggerStateInterface triggerState
|
||||||
) {
|
) {
|
||||||
super(applicationContext, flowListeners);
|
super(applicationContext, flowListeners);
|
||||||
this.triggerState = triggerState;
|
this.triggerState = triggerState;
|
||||||
|
this.executionState = executionState;
|
||||||
|
|
||||||
this.conditionService = applicationContext.getBean(ConditionService.class);
|
this.conditionService = applicationContext.getBean(ConditionService.class);
|
||||||
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||||
|
|||||||
@@ -1,4 +1,14 @@
|
|||||||
package io.kestra.core.schedulers;
|
package io.kestra.core.schedulers;
|
||||||
|
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
|
||||||
|
* See AbstractScheduler.handle().
|
||||||
|
*/
|
||||||
public interface ScheduleContextInterface {
|
public interface ScheduleContextInterface {
|
||||||
|
/**
|
||||||
|
* Do trigger retrieval and updating in a single transaction.
|
||||||
|
*/
|
||||||
|
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package io.kestra.core.schedulers;
|
||||||
|
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.inject.Singleton;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
|
||||||
|
@Inject
|
||||||
|
private ExecutionRepositoryInterface executionRepository;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<Execution> findById(String tenantId, String id) {
|
||||||
|
return executionRepository.findById(tenantId, id);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
package io.kestra.core.schedulers;
|
||||||
|
|
||||||
|
import io.kestra.core.models.executions.Execution;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
public interface SchedulerExecutionStateInterface {
|
||||||
|
Optional<Execution> findById(String tenantId, String id);
|
||||||
|
}
|
||||||
@@ -20,19 +20,22 @@ public interface SchedulerTriggerStateInterface {
|
|||||||
|
|
||||||
Trigger create(Trigger trigger) throws ConstraintViolationException;
|
Trigger create(Trigger trigger) throws ConstraintViolationException;
|
||||||
|
|
||||||
|
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
|
||||||
|
|
||||||
|
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
|
||||||
|
|
||||||
Trigger update(Trigger trigger);
|
Trigger update(Trigger trigger);
|
||||||
|
|
||||||
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
|
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used by the JDBC implementation: find triggers in all tenants.
|
||||||
|
*/
|
||||||
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Required for Kafka
|
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
|
||||||
*/
|
*/
|
||||||
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
|
||||||
|
|
||||||
/**
|
|
||||||
* Required for Kafka
|
|
||||||
*/
|
|
||||||
void unlock(Trigger trigger);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -345,7 +345,8 @@ public class ExecutionService {
|
|||||||
state,
|
state,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null
|
null,
|
||||||
|
true
|
||||||
)
|
)
|
||||||
.map(throwFunction(execution -> {
|
.map(throwFunction(execution -> {
|
||||||
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
|
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
|
||||||
@@ -478,6 +479,7 @@ public class ExecutionService {
|
|||||||
.executionId(childExecution.getId())
|
.executionId(childExecution.getId())
|
||||||
.isOnKillCascade(true)
|
.isOnKillCascade(true)
|
||||||
.state(ExecutionKilled.State.REQUESTED) // Event will be reentrant in the Executor.
|
.state(ExecutionKilled.State.REQUESTED) // Event will be reentrant in the Executor.
|
||||||
|
.tenantId(tenantId)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,13 +12,23 @@ import java.util.List;
|
|||||||
public class SkipExecutionService {
|
public class SkipExecutionService {
|
||||||
private volatile List<String> skipExecutions = Collections.emptyList();
|
private volatile List<String> skipExecutions = Collections.emptyList();
|
||||||
private volatile List<FlowId> skipFlows = Collections.emptyList();
|
private volatile List<FlowId> skipFlows = Collections.emptyList();
|
||||||
|
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
|
||||||
|
private volatile List<String> skipTenants = Collections.emptyList();
|
||||||
|
|
||||||
public synchronized void setSkipExecutions(List<String> skipExecutions) {
|
public synchronized void setSkipExecutions(List<String> skipExecutions) {
|
||||||
this.skipExecutions = skipExecutions;
|
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void setSkipFlows(List<String> skipFlows) {
|
public synchronized void setSkipFlows(List<String> skipFlows) {
|
||||||
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(flow -> FlowId.from(flow)).toList();
|
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(FlowId::from).toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setSkipNamespaces(List<String> skipNamespaces) {
|
||||||
|
this.skipNamespaces = skipNamespaces == null ? Collections.emptyList() : skipNamespaces.stream().map(NamespaceId::from).toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setSkipTenants(List<String> skipTenants) {
|
||||||
|
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -38,17 +48,30 @@ public class SkipExecutionService {
|
|||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
|
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
|
||||||
return skipExecutions.contains(executionId) ||
|
return (tenant != null && skipTenants.contains(tenant)) ||
|
||||||
skipFlows.contains(new FlowId(tenant, namespace, flow));
|
skipNamespaces.contains(new NamespaceId(tenant, namespace)) ||
|
||||||
|
skipFlows.contains(new FlowId(tenant, namespace, flow)) ||
|
||||||
|
(executionId != null && skipExecutions.contains(executionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String[] splitIdParts(String id) {
|
||||||
|
return id.split("\\|");
|
||||||
}
|
}
|
||||||
|
|
||||||
record FlowId(String tenant, String namespace, String flow) {
|
record FlowId(String tenant, String namespace, String flow) {
|
||||||
static FlowId from(String flowId) {
|
static FlowId from(String flowId) {
|
||||||
String[] parts = flowId.split("\\|");
|
String[] parts = SkipExecutionService.splitIdParts(flowId);
|
||||||
if (parts.length == 3) {
|
if (parts.length == 3) {
|
||||||
return new FlowId(parts[0], parts[1], parts[2]);
|
return new FlowId(parts[0], parts[1], parts[2]);
|
||||||
}
|
}
|
||||||
return new FlowId(null, parts[0], parts[1]);
|
return new FlowId(null, parts[0], parts[1]);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
record NamespaceId(String tenant, String namespace) {
|
||||||
|
static NamespaceId from(String namespaceId) {
|
||||||
|
String[] parts = SkipExecutionService.splitIdParts(namespaceId);
|
||||||
|
return new NamespaceId(parts[0], parts[1]);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -152,7 +152,6 @@ abstract public class TestsUtils {
|
|||||||
.triggerId(trigger.getId())
|
.triggerId(trigger.getId())
|
||||||
.flowId(flow.getId())
|
.flowId(flow.getId())
|
||||||
.namespace(flow.getNamespace())
|
.namespace(flow.getNamespace())
|
||||||
.flowRevision(flow.getRevision())
|
|
||||||
.date(ZonedDateTime.now())
|
.date(ZonedDateTime.now())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|||||||
@@ -11,30 +11,15 @@ import io.kestra.core.models.executions.TaskRunAttempt;
|
|||||||
import io.kestra.core.models.flows.State;
|
import io.kestra.core.models.flows.State;
|
||||||
import io.kestra.core.models.tasks.ExecutableTask;
|
import io.kestra.core.models.tasks.ExecutableTask;
|
||||||
import io.kestra.core.models.tasks.Task;
|
import io.kestra.core.models.tasks.Task;
|
||||||
import io.kestra.core.runners.ExecutableUtils;
|
import io.kestra.core.runners.*;
|
||||||
import io.kestra.core.runners.FlowExecutorInterface;
|
|
||||||
import io.kestra.core.runners.FlowInputOutput;
|
|
||||||
import io.kestra.core.runners.RunContext;
|
|
||||||
import io.kestra.core.runners.SubflowExecution;
|
|
||||||
import io.kestra.core.runners.SubflowExecutionResult;
|
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import jakarta.validation.constraints.Min;
|
import jakarta.validation.constraints.Min;
|
||||||
import lombok.experimental.SuperBuilder;
|
|
||||||
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.ToString;
|
|
||||||
|
|
||||||
import jakarta.validation.constraints.NotEmpty;
|
import jakarta.validation.constraints.NotEmpty;
|
||||||
import jakarta.validation.constraints.NotNull;
|
import jakarta.validation.constraints.NotNull;
|
||||||
import java.util.ArrayList;
|
import lombok.*;
|
||||||
import java.util.Collections;
|
import lombok.experimental.SuperBuilder;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.*;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@@ -153,6 +138,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
|||||||
|
|
||||||
if (this.labels != null) {
|
if (this.labels != null) {
|
||||||
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
|
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
|
||||||
|
labels.removeIf(label -> label.key().equals(entry.getKey()));
|
||||||
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
|
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -160,7 +160,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Integer iterationCount = (Integer) parentTaskRun.getOutputs().get("iterationCount");
|
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
|
||||||
|
.map(outputs -> (Integer) outputs.get("iterationCount"))
|
||||||
|
.orElse(0);
|
||||||
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
|
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
|
||||||
if (printLog) {logger.warn("Max iterations reached");}
|
if (printLog) {logger.warn("Max iterations reached");}
|
||||||
return true;
|
return true;
|
||||||
@@ -236,7 +238,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
|
|||||||
|
|
||||||
public WaitFor.Output outputs(TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
public WaitFor.Output outputs(TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
|
||||||
String value = parentTaskRun != null ?
|
String value = parentTaskRun != null ?
|
||||||
parentTaskRun.getOutputs().get("iterationCount").toString() : "0";
|
String.valueOf(Optional.ofNullable(parentTaskRun.getOutputs())
|
||||||
|
.map(outputs -> outputs.get("iterationCount"))
|
||||||
|
.orElse("0")) : "0";
|
||||||
|
|
||||||
return Output.builder()
|
return Output.builder()
|
||||||
.iterationCount(Integer.parseInt(value) + 1)
|
.iterationCount(Integer.parseInt(value) + 1)
|
||||||
|
|||||||
@@ -270,7 +270,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.namespaceFiles != null ) {
|
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
|
||||||
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
||||||
namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.tempDir(), this.namespaceFiles);
|
namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.tempDir(), this.namespaceFiles);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import java.net.URI;
|
|||||||
import java.security.GeneralSecurityException;
|
import java.security.GeneralSecurityException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.OptionalInt;
|
||||||
|
|
||||||
@SuperBuilder
|
@SuperBuilder
|
||||||
@ToString
|
@ToString
|
||||||
@@ -122,6 +123,16 @@ public class Request extends AbstractHttp implements RunnableTask<Request.Output
|
|||||||
response = client
|
response = client
|
||||||
.toBlocking()
|
.toBlocking()
|
||||||
.exchange(request, Argument.STRING, Argument.STRING);
|
.exchange(request, Argument.STRING, Argument.STRING);
|
||||||
|
|
||||||
|
// check that the string is a valid Unicode string
|
||||||
|
if (response.getBody().isPresent()) {
|
||||||
|
OptionalInt illegalChar = response.body().chars().filter(c -> !Character.isDefined(c)).findFirst();
|
||||||
|
if (illegalChar.isPresent()) {
|
||||||
|
throw new IllegalArgumentException("Illegal unicode code point in request body: " + illegalChar.getAsInt() +
|
||||||
|
", the Request task only support valid Unicode strings as body.\n" +
|
||||||
|
"You can try using the Download task instead.");
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (HttpClientResponseException e) {
|
} catch (HttpClientResponseException e) {
|
||||||
if (!allowFailed) {
|
if (!allowFailed) {
|
||||||
throw e;
|
throw e;
|
||||||
|
|||||||
@@ -80,7 +80,6 @@ public class DeleteFiles extends Task implements RunnableTask<DeleteFiles.Output
|
|||||||
private String namespace;
|
private String namespace;
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
@NotEmpty
|
|
||||||
@Schema(
|
@Schema(
|
||||||
title = "A file or a list of files from the given namespace.",
|
title = "A file or a list of files from the given namespace.",
|
||||||
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
|
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import io.kestra.core.runners.RunContext;
|
|||||||
import io.kestra.core.services.FlowService;
|
import io.kestra.core.services.FlowService;
|
||||||
import io.kestra.core.utils.Rethrow;
|
import io.kestra.core.utils.Rethrow;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import jakarta.validation.constraints.NotEmpty;
|
|
||||||
import lombok.Builder;
|
import lombok.Builder;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
@@ -84,7 +83,6 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
|
|||||||
private String namespace;
|
private String namespace;
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
@NotEmpty
|
|
||||||
@Schema(
|
@Schema(
|
||||||
title = "A file or a list of files from the given namespace.",
|
title = "A file or a list of files from the given namespace.",
|
||||||
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
|
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
|
||||||
@@ -93,11 +91,19 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
|
|||||||
@PluginProperty(dynamic = true)
|
@PluginProperty(dynamic = true)
|
||||||
private Object files;
|
private Object files;
|
||||||
|
|
||||||
|
@Schema(
|
||||||
|
title = "The folder where the downloaded files will be stored"
|
||||||
|
)
|
||||||
|
@PluginProperty(dynamic = true)
|
||||||
|
@Builder.Default
|
||||||
|
private String destination = "";
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Output run(RunContext runContext) throws Exception {
|
public Output run(RunContext runContext) throws Exception {
|
||||||
Logger logger = runContext.logger();
|
Logger logger = runContext.logger();
|
||||||
String renderedNamespace = runContext.render(namespace);
|
String renderedNamespace = runContext.render(namespace);
|
||||||
|
String renderedDestination = runContext.render(destination);
|
||||||
// Check if namespace is allowed
|
// Check if namespace is allowed
|
||||||
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
RunContext.FlowInfo flowInfo = runContext.flowInfo();
|
||||||
FlowService flowService = runContext.getApplicationContext().getBean(FlowService.class);
|
FlowService flowService = runContext.getApplicationContext().getBean(FlowService.class);
|
||||||
@@ -120,7 +126,7 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
|
|||||||
namespaceFilesService.recursiveList(flowInfo.tenantId(), renderedNamespace, null).forEach(Rethrow.throwConsumer(uri -> {
|
namespaceFilesService.recursiveList(flowInfo.tenantId(), renderedNamespace, null).forEach(Rethrow.throwConsumer(uri -> {
|
||||||
if (patterns.stream().anyMatch(p -> p.matches(Path.of(uri.getPath())))) {
|
if (patterns.stream().anyMatch(p -> p.matches(Path.of(uri.getPath())))) {
|
||||||
try (InputStream inputStream = namespaceFilesService.content(flowInfo.tenantId(), renderedNamespace, uri)) {
|
try (InputStream inputStream = namespaceFilesService.content(flowInfo.tenantId(), renderedNamespace, uri)) {
|
||||||
downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, uri.getPath()));
|
downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, destination + uri.getPath()));
|
||||||
logger.debug(String.format("Downloaded %s", uri));
|
logger.debug(String.format("Downloaded %s", uri));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -138,10 +138,10 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
|
|||||||
});
|
});
|
||||||
|
|
||||||
// check for file in current tempDir that match regexs
|
// check for file in current tempDir that match regexs
|
||||||
List<PathMatcher> patterns = regexs.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + reg)).toList();
|
List<PathMatcher> patterns = regexs.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + runContext.tempDir().toString() + checkLeadingSlash(reg))).toList();
|
||||||
for (File file : Objects.requireNonNull(runContext.tempDir().toFile().listFiles())) {
|
for (File file : Objects.requireNonNull(listFilesRecursively(runContext.tempDir().toFile()))) {
|
||||||
if (patterns.stream().anyMatch(p -> p.matches(Path.of(file.toURI().getPath())))) {
|
if (patterns.stream().anyMatch(p -> p.matches(Path.of(file.toURI().getPath())))) {
|
||||||
String newFilePath = buildPath(renderedDestination, file.getName());
|
String newFilePath = buildPath(renderedDestination, file.getPath().replace(runContext.tempDir().toString(), ""));
|
||||||
storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), newFilePath, new FileInputStream(file));
|
storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), newFilePath, new FileInputStream(file));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -199,6 +199,24 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<File> listFilesRecursively(File directory) throws IOException {
|
||||||
|
List<File> files = new ArrayList<>();
|
||||||
|
if (directory == null || !directory.isDirectory()) {
|
||||||
|
return files; // Handle invalid directory or not a directory
|
||||||
|
}
|
||||||
|
|
||||||
|
for (File file : directory.listFiles()) {
|
||||||
|
if (file.isFile()) {
|
||||||
|
files.add(file);
|
||||||
|
} else {
|
||||||
|
// Recursively call for subdirectories
|
||||||
|
files.addAll(listFilesRecursively(file));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Builder
|
@Builder
|
||||||
@Getter
|
@Getter
|
||||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||||
|
|||||||
@@ -1,7 +1,11 @@
|
|||||||
package io.kestra.plugin.core.trigger;
|
package io.kestra.plugin.core.trigger;
|
||||||
|
|
||||||
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
|
import io.kestra.core.models.Label;
|
||||||
import io.kestra.core.models.annotations.PluginProperty;
|
import io.kestra.core.models.annotations.PluginProperty;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import lombok.*;
|
import lombok.*;
|
||||||
import lombok.experimental.SuperBuilder;
|
import lombok.experimental.SuperBuilder;
|
||||||
import io.kestra.core.models.annotations.Example;
|
import io.kestra.core.models.annotations.Example;
|
||||||
@@ -91,7 +95,7 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
|
|||||||
.namespace(flow.getNamespace())
|
.namespace(flow.getNamespace())
|
||||||
.flowId(flow.getId())
|
.flowId(flow.getId())
|
||||||
.flowRevision(flow.getRevision())
|
.flowRevision(flow.getRevision())
|
||||||
.labels(flow.getLabels())
|
.labels(generateLabels(runContext, flow))
|
||||||
.state(new State())
|
.state(new State())
|
||||||
.trigger(ExecutionTrigger.of(
|
.trigger(ExecutionTrigger.of(
|
||||||
this,
|
this,
|
||||||
@@ -128,6 +132,34 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
|
||||||
|
final List<Label> labels = new ArrayList<>();
|
||||||
|
|
||||||
|
if (flow.getLabels() != null) {
|
||||||
|
labels.addAll(flow.getLabels()); // no need for rendering
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.getLabels() != null) {
|
||||||
|
for (Label label : this.getLabels()) {
|
||||||
|
final var value = renderLabelValue(runContext, label);
|
||||||
|
if (value != null) {
|
||||||
|
labels.add(new Label(label.key(), value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return labels;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String renderLabelValue(RunContext runContext, Label label) {
|
||||||
|
try {
|
||||||
|
return runContext.render(label.value());
|
||||||
|
} catch (IllegalVariableEvaluationException e) {
|
||||||
|
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Builder
|
@Builder
|
||||||
@ToString
|
@ToString
|
||||||
@EqualsAndHashCode
|
@EqualsAndHashCode
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
|
|||||||
import com.cronutils.model.time.ExecutionTime;
|
import com.cronutils.model.time.ExecutionTime;
|
||||||
import com.cronutils.parser.CronParser;
|
import com.cronutils.parser.CronParser;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.kestra.core.exceptions.InternalException;
|
import io.kestra.core.exceptions.InternalException;
|
||||||
import io.kestra.core.models.Label;
|
import io.kestra.core.models.Label;
|
||||||
import io.kestra.core.models.annotations.Example;
|
import io.kestra.core.models.annotations.Example;
|
||||||
@@ -349,14 +350,13 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
|||||||
// validate schedule condition can fail to render variables
|
// validate schedule condition can fail to render variables
|
||||||
// in this case, we return a failed execution so the trigger is not evaluated each second
|
// in this case, we return a failed execution so the trigger is not evaluated each second
|
||||||
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
|
||||||
List<Label> labels = generateLabels(conditionContext, backfill);
|
|
||||||
Execution execution = Execution.builder()
|
Execution execution = Execution.builder()
|
||||||
.id(runContext.getTriggerExecutionId())
|
.id(runContext.getTriggerExecutionId())
|
||||||
.tenantId(triggerContext.getTenantId())
|
.tenantId(triggerContext.getTenantId())
|
||||||
.namespace(triggerContext.getNamespace())
|
.namespace(triggerContext.getNamespace())
|
||||||
.flowId(triggerContext.getFlowId())
|
.flowId(triggerContext.getFlowId())
|
||||||
.flowRevision(triggerContext.getFlowRevision())
|
.flowRevision(conditionContext.getFlow().getRevision())
|
||||||
.labels(labels)
|
.labels(generateLabels(runContext, conditionContext, backfill))
|
||||||
.state(new State().withState(State.Type.FAILED))
|
.state(new State().withState(State.Type.FAILED))
|
||||||
.build();
|
.build();
|
||||||
return Optional.of(execution);
|
return Optional.of(execution);
|
||||||
@@ -390,7 +390,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
|||||||
} else {
|
} else {
|
||||||
variables = scheduleDates.toMap();
|
variables = scheduleDates.toMap();
|
||||||
}
|
}
|
||||||
List<Label> labels = generateLabels(conditionContext, backfill);
|
List<Label> labels = generateLabels(runContext, conditionContext, backfill);
|
||||||
|
|
||||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
|
ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
|
||||||
|
|
||||||
@@ -399,7 +399,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
|||||||
.tenantId(triggerContext.getTenantId())
|
.tenantId(triggerContext.getTenantId())
|
||||||
.namespace(triggerContext.getNamespace())
|
.namespace(triggerContext.getNamespace())
|
||||||
.flowId(triggerContext.getFlowId())
|
.flowId(triggerContext.getFlowId())
|
||||||
.flowRevision(triggerContext.getFlowRevision())
|
.flowRevision(conditionContext.getFlow().getRevision())
|
||||||
.labels(labels)
|
.labels(labels)
|
||||||
.state(new State())
|
.state(new State())
|
||||||
.trigger(executionTrigger)
|
.trigger(executionTrigger)
|
||||||
@@ -425,19 +425,29 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
|
|||||||
.orElse(RecoverMissedSchedules.ALL);
|
.orElse(RecoverMissedSchedules.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Label> generateLabels(ConditionContext conditionContext, Backfill backfill) {
|
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
|
||||||
List<Label> labels = new ArrayList<>();
|
List<Label> labels = new ArrayList<>();
|
||||||
|
|
||||||
if (conditionContext.getFlow().getLabels() != null) {
|
if (conditionContext.getFlow().getLabels() != null) {
|
||||||
labels.addAll(conditionContext.getFlow().getLabels());
|
labels.addAll(conditionContext.getFlow().getLabels()); // no need for rendering
|
||||||
}
|
}
|
||||||
|
|
||||||
if (backfill != null && backfill.getLabels() != null) {
|
if (backfill != null && backfill.getLabels() != null) {
|
||||||
labels.addAll(backfill.getLabels());
|
for (Label label : backfill.getLabels()) {
|
||||||
|
final var value = runContext.render(label.value());
|
||||||
|
if (value != null) {
|
||||||
|
labels.add(new Label(label.key(), value));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.getLabels() != null) {
|
if (this.getLabels() != null) {
|
||||||
labels.addAll(this.getLabels());
|
for (Label label : this.getLabels()) {
|
||||||
|
final var value = runContext.render(label.value());
|
||||||
|
if (value != null) {
|
||||||
|
labels.add(new Label(label.key(), value));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return labels;
|
return labels;
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ public abstract class AbstractTriggerRepositoryTest {
|
|||||||
return Trigger.builder()
|
return Trigger.builder()
|
||||||
.flowId(IdUtils.create())
|
.flowId(IdUtils.create())
|
||||||
.namespace(TEST_NAMESPACE)
|
.namespace(TEST_NAMESPACE)
|
||||||
.flowRevision(1)
|
|
||||||
.triggerId(IdUtils.create())
|
.triggerId(IdUtils.create())
|
||||||
.executionId(IdUtils.create())
|
.executionId(IdUtils.create())
|
||||||
.date(ZonedDateTime.now());
|
.date(ZonedDateTime.now());
|
||||||
|
|||||||
@@ -88,8 +88,7 @@ public class DeserializationIssuesCaseTest {
|
|||||||
"date": "2023-11-24T15:48:57.632881597Z",
|
"date": "2023-11-24T15:48:57.632881597Z",
|
||||||
"flowId": "http-trigger",
|
"flowId": "http-trigger",
|
||||||
"namespace": "dev",
|
"namespace": "dev",
|
||||||
"triggerId": "http",
|
"triggerId": "http"
|
||||||
"flowRevision": 3
|
|
||||||
},
|
},
|
||||||
"conditionContext": {
|
"conditionContext": {
|
||||||
"flow": {
|
"flow": {
|
||||||
|
|||||||
@@ -34,6 +34,13 @@ class FilesServiceTest {
|
|||||||
assertThat(content.get("file.txt"), is("Hello World"));
|
assertThat(content.get("file.txt"), is("Hello World"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void renderRawFile() throws Exception {
|
||||||
|
RunContext runContext = runContextFactory.of(Map.of("filename", "file.txt", "content", "Hello World"));
|
||||||
|
Map<String, String> content = FilesService.inputFiles(runContext, Map.of("{{filename}}", "{% raw %}{{content}}{% endraw %}"));
|
||||||
|
assertThat(content.get("file.txt"), is("{{content}}"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void outputFiles() throws Exception {
|
void outputFiles() throws Exception {
|
||||||
RunContext runContext = runContextFactory.of();
|
RunContext runContext = runContextFactory.of();
|
||||||
|
|||||||
@@ -104,6 +104,13 @@ abstract public class FlowListenersTest {
|
|||||||
assertThat(count.get(), is(2));
|
assertThat(count.get(), is(2));
|
||||||
assertThat(flowListenersService.flows().size(), is(2));
|
assertThat(flowListenersService.flows().size(), is(2));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Flow withTenant = first.toBuilder().tenantId("some-tenant").build();
|
||||||
|
flowRepository.create(withTenant, withTenant.generateSource(), pluginDefaultService.injectDefaults(withTenant));
|
||||||
|
wait(ref, () -> {
|
||||||
|
assertThat(count.get(), is(3));
|
||||||
|
assertThat(flowListenersService.flows().size(), is(3));
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Ref {
|
public static class Ref {
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
package io.kestra.core.runners;
|
package io.kestra.core.runners;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.contains;
|
||||||
|
|
||||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@@ -18,6 +22,9 @@ class VariableRendererTest {
|
|||||||
@Inject
|
@Inject
|
||||||
VariableRenderer.VariableConfiguration variableConfiguration;
|
VariableRenderer.VariableConfiguration variableConfiguration;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
VariableRenderer variableRenderer;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
|
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
|
||||||
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
|
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
|
||||||
@@ -25,6 +32,25 @@ class VariableRendererTest {
|
|||||||
Assertions.assertEquals("result", render);
|
Assertions.assertEquals("result", render);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldKeepKeyOrderWhenRenderingMap() throws IllegalVariableEvaluationException {
|
||||||
|
final Map<String, Object> input = new LinkedHashMap<>();
|
||||||
|
input.put("foo-1", "A");
|
||||||
|
input.put("foo-2", "B");
|
||||||
|
|
||||||
|
final Map<String, Object> input_value3 = new LinkedHashMap<>();
|
||||||
|
input_value3.put("bar-1", "C");
|
||||||
|
input_value3.put("bar-2", "D");
|
||||||
|
input_value3.put("bar-3", "E");
|
||||||
|
//
|
||||||
|
input.put("foo-3", input_value3);
|
||||||
|
|
||||||
|
final Map<String, Object> result = variableRenderer.render(input, Map.of());
|
||||||
|
assertThat(result.keySet(), contains("foo-1", "foo-2", "foo-3"));
|
||||||
|
|
||||||
|
final Map<String, Object> result_value3 = (Map<String, Object>) result.get("foo-3");
|
||||||
|
assertThat(result_value3.keySet(), contains("bar-1", "bar-2", "bar-3"));
|
||||||
|
}
|
||||||
|
|
||||||
public static class TestVariableRenderer extends VariableRenderer {
|
public static class TestVariableRenderer extends VariableRenderer {
|
||||||
|
|
||||||
|
|||||||
@@ -104,7 +104,7 @@ abstract public class AbstractSchedulerTest {
|
|||||||
.id(IdUtils.create())
|
.id(IdUtils.create())
|
||||||
.namespace(context.getNamespace())
|
.namespace(context.getNamespace())
|
||||||
.flowId(context.getFlowId())
|
.flowId(context.getFlowId())
|
||||||
.flowRevision(context.getFlowRevision())
|
.flowRevision(conditionContext.getFlow().getRevision())
|
||||||
.state(new State())
|
.state(new State())
|
||||||
.trigger(ExecutionTrigger.builder()
|
.trigger(ExecutionTrigger.builder()
|
||||||
.id(this.getId())
|
.id(this.getId())
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import io.kestra.core.models.flows.State;
|
|||||||
import io.kestra.core.models.triggers.Trigger;
|
import io.kestra.core.models.triggers.Trigger;
|
||||||
import io.kestra.plugin.core.trigger.Schedule;
|
import io.kestra.plugin.core.trigger.Schedule;
|
||||||
import io.kestra.core.runners.FlowListeners;
|
import io.kestra.core.runners.FlowListeners;
|
||||||
import io.kestra.core.runners.TestMethodScopedWorker;
|
|
||||||
import io.kestra.core.runners.Worker;
|
import io.kestra.core.runners.Worker;
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
@@ -18,13 +17,13 @@ import java.time.ZonedDateTime;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.*;
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
|
|
||||||
class SchedulerConditionTest extends AbstractSchedulerTest {
|
class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||||
@Inject
|
@Inject
|
||||||
@@ -33,6 +32,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
private static Flow createScheduleFlow() {
|
private static Flow createScheduleFlow() {
|
||||||
Schedule schedule = Schedule.builder()
|
Schedule schedule = Schedule.builder()
|
||||||
.id("hourly")
|
.id("hourly")
|
||||||
@@ -58,6 +60,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
|||||||
void schedule() throws Exception {
|
void schedule() throws Exception {
|
||||||
// mock flow listeners
|
// mock flow listeners
|
||||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||||
|
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
|
||||||
CountDownLatch queueCount = new CountDownLatch(4);
|
CountDownLatch queueCount = new CountDownLatch(4);
|
||||||
|
|
||||||
Flow flow = createScheduleFlow();
|
Flow flow = createScheduleFlow();
|
||||||
@@ -65,7 +68,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
|||||||
triggerState.create(Trigger.builder()
|
triggerState.create(Trigger.builder()
|
||||||
.namespace(flow.getNamespace())
|
.namespace(flow.getNamespace())
|
||||||
.flowId(flow.getId())
|
.flowId(flow.getId())
|
||||||
.flowRevision(flow.getRevision())
|
|
||||||
.triggerId("hourly")
|
.triggerId("hourly")
|
||||||
.date(ZonedDateTime.parse("2021-09-06T02:00:00+01:00[Europe/Paris]"))
|
.date(ZonedDateTime.parse("2021-09-06T02:00:00+01:00[Europe/Paris]"))
|
||||||
.build()
|
.build()
|
||||||
@@ -75,12 +77,22 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
|||||||
.when(flowListenersServiceSpy)
|
.when(flowListenersServiceSpy)
|
||||||
.flows();
|
.flows();
|
||||||
|
|
||||||
|
// mock the backfill execution is ended
|
||||||
|
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||||
|
.when(executionRepositorySpy)
|
||||||
|
.findById(any(), any());
|
||||||
|
|
||||||
|
// start the worker as it execute polling triggers
|
||||||
|
Worker worker = new Worker(applicationContext, 8, null);
|
||||||
|
worker.run();
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = new DefaultScheduler(
|
try (AbstractScheduler scheduler = new DefaultScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy,
|
flowListenersServiceSpy,
|
||||||
triggerState);
|
executionRepositorySpy,
|
||||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
|
triggerState
|
||||||
|
)) {
|
||||||
// wait for execution
|
// wait for execution
|
||||||
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
|
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
|
||||||
Execution execution = either.getLeft();
|
Execution execution = either.getLeft();
|
||||||
@@ -97,8 +109,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
|||||||
|
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
queueCount.await(15, TimeUnit.SECONDS);
|
queueCount.await(15, TimeUnit.SECONDS);
|
||||||
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
|
|
||||||
assertionStop.run();
|
|
||||||
|
|
||||||
assertThat(queueCount.getCount(), is(0L));
|
assertThat(queueCount.getCount(), is(0L));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,6 +36,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
private SchedulerTriggerStateInterface triggerState;
|
private SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
private SchedulerExecutionState schedulerExecutionState;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private FlowListeners flowListenersService;
|
private FlowListeners flowListenersService;
|
||||||
|
|
||||||
@@ -188,6 +191,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
|||||||
return new DefaultScheduler(
|
return new DefaultScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy,
|
flowListenersServiceSpy,
|
||||||
|
schedulerExecutionState,
|
||||||
triggerState
|
triggerState
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||||
protected QueueInterface<LogEntry> logQueue;
|
protected QueueInterface<LogEntry> logQueue;
|
||||||
@@ -62,10 +65,11 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
.truncatedTo(ChronoUnit.HOURS);
|
.truncatedTo(ChronoUnit.HOURS);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||||
return new DefaultScheduler(
|
return new DefaultScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy,
|
flowListenersServiceSpy,
|
||||||
|
executionStateSpy,
|
||||||
triggerState
|
triggerState
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -75,6 +79,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
void schedule() throws Exception {
|
void schedule() throws Exception {
|
||||||
// mock flow listeners
|
// mock flow listeners
|
||||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||||
|
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
|
||||||
CountDownLatch queueCount = new CountDownLatch(6);
|
CountDownLatch queueCount = new CountDownLatch(6);
|
||||||
CountDownLatch invalidLogCount = new CountDownLatch(1);
|
CountDownLatch invalidLogCount = new CountDownLatch(1);
|
||||||
Set<String> date = new HashSet<>();
|
Set<String> date = new HashSet<>();
|
||||||
@@ -109,7 +114,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
|
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
|
||||||
// wait for execution
|
// wait for execution
|
||||||
Runnable assertionStop = executionQueue.receive(either -> {
|
Runnable assertionStop = executionQueue.receive(either -> {
|
||||||
Execution execution = either.getLeft();
|
Execution execution = either.getLeft();
|
||||||
@@ -169,7 +174,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
triggerState.create(trigger);
|
triggerState.create(trigger);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
|
|
||||||
Await.until(() -> {
|
Await.until(() -> {
|
||||||
@@ -203,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
CountDownLatch queueCount = new CountDownLatch(1);
|
CountDownLatch queueCount = new CountDownLatch(1);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
// wait for execution
|
// wait for execution
|
||||||
Runnable assertionStop = executionQueue.receive(either -> {
|
Runnable assertionStop = executionQueue.receive(either -> {
|
||||||
Execution execution = either.getLeft();
|
Execution execution = either.getLeft();
|
||||||
@@ -249,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
CountDownLatch queueCount = new CountDownLatch(1);
|
CountDownLatch queueCount = new CountDownLatch(1);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
// wait for execution
|
// wait for execution
|
||||||
Runnable assertionStop = executionQueue.receive(either -> {
|
Runnable assertionStop = executionQueue.receive(either -> {
|
||||||
Execution execution = either.getLeft();
|
Execution execution = either.getLeft();
|
||||||
@@ -293,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
triggerState.create(lastTrigger);
|
triggerState.create(lastTrigger);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
|
|
||||||
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
|
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
|
||||||
@@ -324,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
.build();
|
.build();
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
|
|
||||||
Await.until(() -> {
|
Await.until(() -> {
|
||||||
@@ -389,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
triggerState.create(trigger);
|
triggerState.create(trigger);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
scheduler.run();
|
scheduler.run();
|
||||||
|
|
||||||
// Wait 3s to see if things happen
|
// Wait 3s to see if things happen
|
||||||
@@ -427,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
CountDownLatch queueCount = new CountDownLatch(2);
|
CountDownLatch queueCount = new CountDownLatch(2);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
// wait for execution
|
// wait for execution
|
||||||
Runnable assertionStop = executionQueue.receive(either -> {
|
Runnable assertionStop = executionQueue.receive(either -> {
|
||||||
Execution execution = either.getLeft();
|
Execution execution = either.getLeft();
|
||||||
@@ -488,7 +493,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
|||||||
CountDownLatch queueCount = new CountDownLatch(1);
|
CountDownLatch queueCount = new CountDownLatch(1);
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
|
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||||
// wait for execution
|
// wait for execution
|
||||||
Runnable assertionStop = executionQueue.receive(either -> {
|
Runnable assertionStop = executionQueue.receive(either -> {
|
||||||
Execution execution = either.getLeft();
|
Execution execution = either.getLeft();
|
||||||
|
|||||||
@@ -42,6 +42,9 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
private static Flow createFlow(Boolean failed) {
|
private static Flow createFlow(Boolean failed) {
|
||||||
RealtimeUnitTest schedule = RealtimeUnitTest.builder()
|
RealtimeUnitTest schedule = RealtimeUnitTest.builder()
|
||||||
.id("stream")
|
.id("stream")
|
||||||
@@ -75,6 +78,7 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
|
|||||||
AbstractScheduler scheduler = new DefaultScheduler(
|
AbstractScheduler scheduler = new DefaultScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy,
|
flowListenersServiceSpy,
|
||||||
|
executionState,
|
||||||
triggerState
|
triggerState
|
||||||
);
|
);
|
||||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||||
|
|||||||
@@ -16,14 +16,14 @@ import org.junit.jupiter.api.Test;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.*;
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
|
|
||||||
public class SchedulerThreadTest extends AbstractSchedulerTest {
|
public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||||
@Inject
|
@Inject
|
||||||
@@ -32,6 +32,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
public static Flow createThreadFlow() {
|
public static Flow createThreadFlow() {
|
||||||
return createThreadFlow(null);
|
return createThreadFlow(null);
|
||||||
}
|
}
|
||||||
@@ -72,17 +75,23 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
|||||||
|
|
||||||
// mock flow listeners
|
// mock flow listeners
|
||||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||||
|
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
|
||||||
|
|
||||||
doReturn(Collections.singletonList(flow))
|
doReturn(Collections.singletonList(flow))
|
||||||
.when(flowListenersServiceSpy)
|
.when(flowListenersServiceSpy)
|
||||||
.flows();
|
.flows();
|
||||||
|
|
||||||
|
// mock the backfill execution is ended
|
||||||
|
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
|
||||||
|
.when(schedulerExecutionStateSpy)
|
||||||
|
.findById(any(), any());
|
||||||
|
|
||||||
// scheduler
|
// scheduler
|
||||||
try (
|
try (
|
||||||
AbstractScheduler scheduler = new DefaultScheduler(
|
AbstractScheduler scheduler = new DefaultScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy,
|
flowListenersServiceSpy,
|
||||||
|
schedulerExecutionStateSpy,
|
||||||
triggerState
|
triggerState
|
||||||
);
|
);
|
||||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||||
|
|||||||
@@ -52,6 +52,9 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
public static Flow createFlow(Duration sleep) {
|
public static Flow createFlow(Duration sleep) {
|
||||||
SleepTriggerTest schedule = SleepTriggerTest.builder()
|
SleepTriggerTest schedule = SleepTriggerTest.builder()
|
||||||
.id("sleep")
|
.id("sleep")
|
||||||
@@ -101,6 +104,7 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
|||||||
AbstractScheduler scheduler = new DefaultScheduler(
|
AbstractScheduler scheduler = new DefaultScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersService,
|
flowListenersService,
|
||||||
|
executionState,
|
||||||
triggerState
|
triggerState
|
||||||
);
|
);
|
||||||
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ public abstract class SchedulerTriggerStateInterfaceTest {
|
|||||||
return Trigger.builder()
|
return Trigger.builder()
|
||||||
.flowId(IdUtils.create())
|
.flowId(IdUtils.create())
|
||||||
.namespace("io.kestra.unittest")
|
.namespace("io.kestra.unittest")
|
||||||
.flowRevision(1)
|
|
||||||
.triggerId(IdUtils.create())
|
.triggerId(IdUtils.create())
|
||||||
.executionId(IdUtils.create())
|
.executionId(IdUtils.create())
|
||||||
.date(ZonedDateTime.now());
|
.date(ZonedDateTime.now());
|
||||||
|
|||||||
@@ -123,10 +123,10 @@ class YamlFlowParserTest {
|
|||||||
void inputs() {
|
void inputs() {
|
||||||
Flow flow = this.parse("flows/valids/inputs.yaml");
|
Flow flow = this.parse("flows/valids/inputs.yaml");
|
||||||
|
|
||||||
assertThat(flow.getInputs().size(), is(27));
|
assertThat(flow.getInputs().size(), is(28));
|
||||||
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(9L));
|
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(10L));
|
||||||
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count(), is(18L));
|
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count(), is(18L));
|
||||||
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(1L));
|
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(2L));
|
||||||
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput && ((StringInput)r).getValidator() != null).count(), is(1L));
|
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput && ((StringInput)r).getValidator() != null).count(), is(1L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
|
|||||||
import io.kestra.core.models.executions.TaskRun;
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -16,6 +17,14 @@ class SkipExecutionServiceTest {
|
|||||||
@Inject
|
@Inject
|
||||||
private SkipExecutionService skipExecutionService;
|
private SkipExecutionService skipExecutionService;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void resetAll() {
|
||||||
|
skipExecutionService.setSkipExecutions(null);
|
||||||
|
skipExecutionService.setSkipFlows(null);
|
||||||
|
skipExecutionService.setSkipNamespaces(null);
|
||||||
|
skipExecutionService.setSkipTenants(null);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void skipExecutionByExecutionId() {
|
void skipExecutionByExecutionId() {
|
||||||
var executionToSkip = "aaabbbccc";
|
var executionToSkip = "aaabbbccc";
|
||||||
@@ -65,4 +74,25 @@ class SkipExecutionServiceTest {
|
|||||||
assertThat(skipExecutionService.skipExecution(null, "namespace", "not_skipped", "random"), is(false));
|
assertThat(skipExecutionService.skipExecution(null, "namespace", "not_skipped", "random"), is(false));
|
||||||
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "not_skipped", "random"), is(false));
|
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "not_skipped", "random"), is(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void skipExecutionByNamespace() {
|
||||||
|
skipExecutionService.setSkipNamespaces(List.of("tenant|namespace"));
|
||||||
|
|
||||||
|
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
|
||||||
|
assertThat(skipExecutionService.skipExecution(null, "namespace", "someFlow", "someExecution"), is(false));
|
||||||
|
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
|
||||||
|
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "anotherFlow", "anotherExecution"), is(true));
|
||||||
|
assertThat(skipExecutionService.skipExecution("tenant", "other.namespace", "someFlow", "someExecution"), is(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void skipExecutionByTenantId() {
|
||||||
|
skipExecutionService.setSkipTenants(List.of("tenant"));
|
||||||
|
|
||||||
|
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
|
||||||
|
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
|
||||||
|
assertThat(skipExecutionService.skipExecution("tenant", "another.namespace", "someFlow", "someExecution"), is(true));
|
||||||
|
assertThat(skipExecutionService.skipExecution("anotherTenant", "another.namespace", "someFlow", "someExecution"), is(false));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -6,22 +6,23 @@ import io.kestra.core.models.executions.Execution;
|
|||||||
import io.kestra.core.models.executions.TaskRun;
|
import io.kestra.core.models.executions.TaskRun;
|
||||||
import io.kestra.core.models.flows.Flow;
|
import io.kestra.core.models.flows.Flow;
|
||||||
import io.kestra.core.models.flows.State;
|
import io.kestra.core.models.flows.State;
|
||||||
|
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||||
import io.kestra.core.runners.AbstractMemoryRunnerTest;
|
import io.kestra.core.runners.AbstractMemoryRunnerTest;
|
||||||
|
import io.kestra.core.runners.RunContextFactory;
|
||||||
import io.kestra.core.runners.RunnerUtils;
|
import io.kestra.core.runners.RunnerUtils;
|
||||||
import io.kestra.core.storages.InternalStorage;
|
import io.kestra.core.storages.InternalStorage;
|
||||||
import io.kestra.core.storages.StorageContext;
|
import io.kestra.core.storages.StorageContext;
|
||||||
import io.kestra.core.storages.StorageInterface;
|
import io.kestra.core.storages.StorageInterface;
|
||||||
|
import io.kestra.core.utils.IdUtils;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.inject.Singleton;
|
import jakarta.inject.Singleton;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junitpioneer.jupiter.RetryingTest;
|
import org.junitpioneer.jupiter.RetryingTest;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.file.Files;
|
import java.security.GeneralSecurityException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -30,7 +31,6 @@ import java.util.concurrent.TimeoutException;
|
|||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
import static org.hamcrest.io.FileMatchers.anExistingFile;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@@ -38,6 +38,9 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
Suite suite;
|
Suite suite;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
RunContextFactory runContextFactory;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void success() throws TimeoutException {
|
void success() throws TimeoutException {
|
||||||
suite.success(runnerUtils);
|
suite.success(runnerUtils);
|
||||||
@@ -83,6 +86,11 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
|||||||
suite.outputFiles(runnerUtils);
|
suite.outputFiles(runnerUtils);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void encryption() throws Exception {
|
||||||
|
suite.encryption(runnerUtils, runContextFactory);
|
||||||
|
}
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public static class Suite {
|
public static class Suite {
|
||||||
@Inject
|
@Inject
|
||||||
@@ -154,8 +162,15 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
|||||||
storageContext
|
storageContext
|
||||||
, storageInterface
|
, storageInterface
|
||||||
);
|
);
|
||||||
URI fileURI = URI.create("kestra:" + storageContext.getContextStorageURI() + "/input.txt");
|
|
||||||
assertThat(new String(storage.getFile(fileURI).readAllBytes()), is("Hello World"));
|
TaskRun taskRun = execution.getTaskRunList().get(1);
|
||||||
|
Map<String, Object> outputs = taskRun.getOutputs();
|
||||||
|
assertThat(outputs, hasKey("uris"));
|
||||||
|
|
||||||
|
URI uri = URI.create(((Map<String, String>) outputs.get("uris")).get("input.txt"));
|
||||||
|
|
||||||
|
assertTrue(uri.toString().endsWith("input.txt"));
|
||||||
|
assertThat(new String(storage.getFile(uri).readAllBytes()), is("Hello World"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@@ -236,6 +251,18 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
|
|||||||
assertThat(execution.findTaskRunsByTaskId("t3").get(0).getOutputs().get("value"), is("third"));
|
assertThat(execution.findTaskRunsByTaskId("t3").get(0).getOutputs().get("value"), is("third"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void encryption(RunnerUtils runnerUtils, RunContextFactory runContextFactory) throws TimeoutException, GeneralSecurityException {
|
||||||
|
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-taskrun-encrypted");
|
||||||
|
|
||||||
|
assertThat(execution.getTaskRunList(), hasSize(3));
|
||||||
|
Map<String, Object> encryptedString = (Map<String, Object>) execution.findTaskRunsByTaskId("encrypted").get(0).getOutputs().get("value");
|
||||||
|
assertThat(encryptedString.get("type"), is(EncryptedString.TYPE));
|
||||||
|
String encryptedValue = (String) encryptedString.get("value");
|
||||||
|
assertThat(encryptedValue, is(not("Hello World")));
|
||||||
|
assertThat(runContextFactory.of().decrypt(encryptedValue), is("Hello World"));
|
||||||
|
assertThat(execution.findTaskRunsByTaskId("decrypted").get(0).getOutputs().get("value"), is("Hello World"));
|
||||||
|
}
|
||||||
|
|
||||||
private void put(String path, String content) throws IOException {
|
private void put(String path, String content) throws IOException {
|
||||||
storageInterface.put(
|
storageInterface.put(
|
||||||
null,
|
null,
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||||||
|
|
||||||
@MicronautTest
|
@MicronautTest
|
||||||
class DownloadTest {
|
class DownloadTest {
|
||||||
public static final String FILE = "http://speedtest.ftp.otenet.gr/files/test1Mb.db";
|
public static final String FILE = "https://sampletestfile.com/wp-content/uploads/2023/07/500KB-CSV.csv";
|
||||||
@Inject
|
@Inject
|
||||||
private RunContextFactory runContextFactory;
|
private RunContextFactory runContextFactory;
|
||||||
|
|
||||||
@@ -56,11 +56,11 @@ class DownloadTest {
|
|||||||
IOUtils.toString(this.storageInterface.get(null, output.getUri()), StandardCharsets.UTF_8),
|
IOUtils.toString(this.storageInterface.get(null, output.getUri()), StandardCharsets.UTF_8),
|
||||||
is(IOUtils.toString(new URI(FILE).toURL().openStream(), StandardCharsets.UTF_8))
|
is(IOUtils.toString(new URI(FILE).toURL().openStream(), StandardCharsets.UTF_8))
|
||||||
);
|
);
|
||||||
assertThat(output.getUri().toString(), endsWith(".db"));
|
assertThat(output.getUri().toString(), endsWith(".csv"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void noResponse() throws Exception {
|
void noResponse() {
|
||||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||||
embeddedServer.start();
|
embeddedServer.start();
|
||||||
|
|
||||||
@@ -100,7 +100,7 @@ class DownloadTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void error() throws Exception {
|
void error() {
|
||||||
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
|
||||||
embeddedServer.start();
|
embeddedServer.start();
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import io.micronaut.context.ApplicationContext;
|
|||||||
import io.micronaut.http.*;
|
import io.micronaut.http.*;
|
||||||
import io.micronaut.http.annotation.Controller;
|
import io.micronaut.http.annotation.Controller;
|
||||||
import io.micronaut.http.annotation.Get;
|
import io.micronaut.http.annotation.Get;
|
||||||
|
import io.micronaut.http.annotation.Head;
|
||||||
import io.micronaut.http.annotation.Post;
|
import io.micronaut.http.annotation.Post;
|
||||||
import io.micronaut.http.multipart.StreamingFileUpload;
|
import io.micronaut.http.multipart.StreamingFileUpload;
|
||||||
import io.micronaut.runtime.server.EmbeddedServer;
|
import io.micronaut.runtime.server.EmbeddedServer;
|
||||||
@@ -67,7 +68,7 @@ class RequestTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void head() throws Exception {
|
void head() throws Exception {
|
||||||
final String url = "http://speedtest.ftp.otenet.gr/files/test100Mb.db";
|
final String url = "https://sampletestfile.com/wp-content/uploads/2023/07/500KB-CSV.csv";
|
||||||
|
|
||||||
Request task = Request.builder()
|
Request task = Request.builder()
|
||||||
.id(RequestTest.class.getSimpleName())
|
.id(RequestTest.class.getSimpleName())
|
||||||
@@ -81,7 +82,7 @@ class RequestTest {
|
|||||||
Request.Output output = task.run(runContext);
|
Request.Output output = task.run(runContext);
|
||||||
|
|
||||||
assertThat(output.getUri(), is(URI.create(url)));
|
assertThat(output.getUri(), is(URI.create(url)));
|
||||||
assertThat(output.getHeaders().get("Content-Length").get(0), is("104857600"));
|
assertThat(output.getHeaders().get("content-length").getFirst(), is("512789"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -260,6 +261,11 @@ class RequestTest {
|
|||||||
return HttpResponse.ok("{ \"hello\": \"world\" }");
|
return HttpResponse.ok("{ \"hello\": \"world\" }");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Head("/hello")
|
||||||
|
HttpResponse<String> head() {
|
||||||
|
return HttpResponse.ok();
|
||||||
|
}
|
||||||
|
|
||||||
@Get("/hello417")
|
@Get("/hello417")
|
||||||
HttpResponse<String> hello417() {
|
HttpResponse<String> hello417() {
|
||||||
return HttpResponse.status(HttpStatus.EXPECTATION_FAILED).body("{ \"hello\": \"world\" }");
|
return HttpResponse.status(HttpStatus.EXPECTATION_FAILED).body("{ \"hello\": \"world\" }");
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
|||||||
import io.kestra.core.runners.Worker;
|
import io.kestra.core.runners.Worker;
|
||||||
import io.kestra.core.schedulers.AbstractScheduler;
|
import io.kestra.core.schedulers.AbstractScheduler;
|
||||||
import io.kestra.core.schedulers.DefaultScheduler;
|
import io.kestra.core.schedulers.DefaultScheduler;
|
||||||
|
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||||
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
|
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
|
||||||
import io.kestra.core.services.FlowListenersInterface;
|
import io.kestra.core.services.FlowListenersInterface;
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
@@ -31,6 +32,9 @@ class TriggerTest {
|
|||||||
@Inject
|
@Inject
|
||||||
private SchedulerTriggerStateInterface triggerState;
|
private SchedulerTriggerStateInterface triggerState;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
protected SchedulerExecutionStateInterface executionState;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private FlowListenersInterface flowListenersService;
|
private FlowListenersInterface flowListenersService;
|
||||||
|
|
||||||
@@ -51,6 +55,7 @@ class TriggerTest {
|
|||||||
AbstractScheduler scheduler = new DefaultScheduler(
|
AbstractScheduler scheduler = new DefaultScheduler(
|
||||||
this.applicationContext,
|
this.applicationContext,
|
||||||
this.flowListenersService,
|
this.flowListenersService,
|
||||||
|
this.executionState,
|
||||||
this.triggerState
|
this.triggerState
|
||||||
);
|
);
|
||||||
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
|
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
|
||||||
@@ -89,6 +94,7 @@ class TriggerTest {
|
|||||||
AbstractScheduler scheduler = new DefaultScheduler(
|
AbstractScheduler scheduler = new DefaultScheduler(
|
||||||
this.applicationContext,
|
this.applicationContext,
|
||||||
this.flowListenersService,
|
this.flowListenersService,
|
||||||
|
this.executionState,
|
||||||
this.triggerState
|
this.triggerState
|
||||||
);
|
);
|
||||||
) {
|
) {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import java.util.Optional;
|
|||||||
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.hasItem;
|
import static org.hamcrest.Matchers.hasItem;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
@MicronautTest
|
@MicronautTest
|
||||||
@@ -109,4 +110,49 @@ class FlowTest {
|
|||||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
|
||||||
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void success_withLabels() {
|
||||||
|
var flow = io.kestra.core.models.flows.Flow.builder()
|
||||||
|
.id("flow-with-flow-trigger")
|
||||||
|
.namespace("io.kestra.unittest")
|
||||||
|
.revision(1)
|
||||||
|
.labels(List.of(
|
||||||
|
new Label("flow-label-1", "flow-label-1"),
|
||||||
|
new Label("flow-label-2", "flow-label-2")
|
||||||
|
))
|
||||||
|
.tasks(Collections.singletonList(Return.builder()
|
||||||
|
.id("test")
|
||||||
|
.type(Return.class.getName())
|
||||||
|
.format("test")
|
||||||
|
.build()))
|
||||||
|
.build();
|
||||||
|
var execution = Execution.builder()
|
||||||
|
.id(IdUtils.create())
|
||||||
|
.namespace("io.kestra.unittest")
|
||||||
|
.flowId("flow-with-flow-trigger")
|
||||||
|
.flowRevision(1)
|
||||||
|
.state(State.of(State.Type.RUNNING, Collections.emptyList()))
|
||||||
|
.build();
|
||||||
|
var flowTrigger = Flow.builder()
|
||||||
|
.id("flow")
|
||||||
|
.type(Flow.class.getName())
|
||||||
|
.labels(List.of(
|
||||||
|
new Label("trigger-label-1", "trigger-label-1"),
|
||||||
|
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
|
||||||
|
new Label("trigger-label-3", "{{ null }}"), // should return an empty string
|
||||||
|
new Label("trigger-label-4", "{{ foobar }}") // should fail
|
||||||
|
))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Optional<Execution> evaluate = flowTrigger.evaluate(runContextFactory.of(), flow, execution);
|
||||||
|
|
||||||
|
assertThat(evaluate.isPresent(), is(true));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasSize(5));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -65,7 +65,6 @@ class ScheduleTest {
|
|||||||
return TriggerContext.builder()
|
return TriggerContext.builder()
|
||||||
.namespace(flow.getNamespace())
|
.namespace(flow.getNamespace())
|
||||||
.flowId(flow.getNamespace())
|
.flowId(flow.getNamespace())
|
||||||
.flowRevision(flow.getRevision())
|
|
||||||
.triggerId(schedule.getId())
|
.triggerId(schedule.getId())
|
||||||
.date(date)
|
.date(date)
|
||||||
.build();
|
.build();
|
||||||
@@ -131,6 +130,35 @@ class ScheduleTest {
|
|||||||
assertThat(inputs.get("input2"), is("default"));
|
assertThat(inputs.get("input2"), is("default"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void success_withLabels() throws Exception {
|
||||||
|
var scheduleTrigger = Schedule.builder()
|
||||||
|
.id("schedule")
|
||||||
|
.cron("0 0 1 * *")
|
||||||
|
.labels(List.of(
|
||||||
|
new Label("trigger-label-1", "trigger-label-1"),
|
||||||
|
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
|
||||||
|
new Label("trigger-label-3", "{{ null }}")
|
||||||
|
))
|
||||||
|
.build();
|
||||||
|
var conditionContext = conditionContext(scheduleTrigger);
|
||||||
|
var date = ZonedDateTime.now()
|
||||||
|
.withDayOfMonth(1)
|
||||||
|
.withHour(0)
|
||||||
|
.withMinute(0)
|
||||||
|
.withSecond(0)
|
||||||
|
.truncatedTo(ChronoUnit.SECONDS)
|
||||||
|
.minusMonths(1);
|
||||||
|
var triggerContext = triggerContext(date, scheduleTrigger);
|
||||||
|
|
||||||
|
Optional<Execution> evaluate = scheduleTrigger.evaluate(conditionContext, triggerContext);
|
||||||
|
|
||||||
|
assertThat(evaluate.isPresent(), is(true));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
|
||||||
|
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
void everyMinute() throws Exception {
|
void everyMinute() throws Exception {
|
||||||
|
|||||||
@@ -95,6 +95,11 @@ inputs:
|
|||||||
- name: array
|
- name: array
|
||||||
type: ARRAY
|
type: ARRAY
|
||||||
itemType: INT
|
itemType: INT
|
||||||
|
# required true and an empty default value will only work if we correctly serialize default values which is what this input is about to test.
|
||||||
|
- name: empty
|
||||||
|
type: STRING
|
||||||
|
defaults: ''
|
||||||
|
required: true
|
||||||
|
|
||||||
tasks:
|
tasks:
|
||||||
- id: string
|
- id: string
|
||||||
|
|||||||
@@ -0,0 +1,13 @@
|
|||||||
|
id: working-directory-taskrun-encrypted
|
||||||
|
namespace: io.kestra.tests
|
||||||
|
|
||||||
|
tasks:
|
||||||
|
- id: workingDir
|
||||||
|
type: io.kestra.plugin.core.flow.WorkingDirectory
|
||||||
|
tasks:
|
||||||
|
- id: encrypted
|
||||||
|
type: io.kestra.core.tasks.test.Encrypted
|
||||||
|
format: "Hello World"
|
||||||
|
- id: decrypted
|
||||||
|
type: io.kestra.plugin.core.debug.Return
|
||||||
|
format: "{{outputs.encrypted.value}}"
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
version=0.17.0
|
version=0.17.26
|
||||||
|
|
||||||
jacksonVersion=2.16.2
|
jacksonVersion=2.16.2
|
||||||
micronautVersion=4.4.3
|
micronautVersion=4.4.3
|
||||||
@@ -7,4 +7,4 @@ slf4jVersion=2.0.13
|
|||||||
|
|
||||||
org.gradle.parallel=true
|
org.gradle.parallel=true
|
||||||
org.gradle.caching=true
|
org.gradle.caching=true
|
||||||
org.gradle.priority=low
|
org.gradle.priority=low
|
||||||
|
|||||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
|
|||||||
|
|
||||||
import io.kestra.core.runners.FlowListeners;
|
import io.kestra.core.runners.FlowListeners;
|
||||||
import io.kestra.core.schedulers.AbstractScheduler;
|
import io.kestra.core.schedulers.AbstractScheduler;
|
||||||
|
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||||
|
|
||||||
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
|
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
|
||||||
@Override
|
@Override
|
||||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||||
return new JdbcScheduler(
|
return new JdbcScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy
|
flowListenersServiceSpy
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
datasources:
|
datasources:
|
||||||
h2:
|
h2:
|
||||||
url: jdbc:h2:mem:public;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
|
url: jdbc:h2:mem:public;TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
|
||||||
username: sa
|
username: sa
|
||||||
password: ""
|
password: ""
|
||||||
driverClassName: org.h2.Driver
|
driverClassName: org.h2.Driver
|
||||||
|
|||||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
|
|||||||
|
|
||||||
import io.kestra.core.runners.FlowListeners;
|
import io.kestra.core.runners.FlowListeners;
|
||||||
import io.kestra.core.schedulers.AbstractScheduler;
|
import io.kestra.core.schedulers.AbstractScheduler;
|
||||||
|
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||||
|
|
||||||
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
|
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||||
@Override
|
@Override
|
||||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||||
return new JdbcScheduler(
|
return new JdbcScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy
|
flowListenersServiceSpy
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package io.kestra.repository.postgres;
|
package io.kestra.repository.postgres;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import io.kestra.core.queues.QueueService;
|
import io.kestra.core.queues.QueueService;
|
||||||
import io.kestra.core.repositories.ArrayListTotal;
|
import io.kestra.core.repositories.ArrayListTotal;
|
||||||
import io.kestra.jdbc.JdbcMapper;
|
import io.kestra.jdbc.JdbcMapper;
|
||||||
@@ -21,6 +22,7 @@ import org.jooq.Result;
|
|||||||
import org.jooq.SelectConditionStep;
|
import org.jooq.SelectConditionStep;
|
||||||
import org.jooq.impl.DSL;
|
import org.jooq.impl.DSL;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
@@ -52,12 +54,10 @@ public class PostgresRepository<T> extends io.kestra.jdbc.AbstractJdbcRepository
|
|||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@Override
|
@Override
|
||||||
public Map<Field<Object>, Object> persistFields(T entity) {
|
public Map<Field<Object>, Object> persistFields(T entity) {
|
||||||
Map<Field<Object>, Object> fields = super.persistFields(entity);
|
|
||||||
|
|
||||||
String json = JdbcMapper.of().writeValueAsString(entity);
|
String json = JdbcMapper.of().writeValueAsString(entity);
|
||||||
fields.replace(AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)));
|
return new HashMap<>(ImmutableMap
|
||||||
|
.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)))
|
||||||
return fields;
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
|
|||||||
@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
|
|||||||
|
|
||||||
import io.kestra.core.runners.FlowListeners;
|
import io.kestra.core.runners.FlowListeners;
|
||||||
import io.kestra.core.schedulers.AbstractScheduler;
|
import io.kestra.core.schedulers.AbstractScheduler;
|
||||||
|
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
|
||||||
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
import io.kestra.core.schedulers.SchedulerScheduleTest;
|
||||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||||
|
|
||||||
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
|
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
|
||||||
@Override
|
@Override
|
||||||
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
|
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
|
||||||
return new JdbcScheduler(
|
return new JdbcScheduler(
|
||||||
applicationContext,
|
applicationContext,
|
||||||
flowListenersServiceSpy
|
flowListenersServiceSpy
|
||||||
|
|||||||
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
|
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
|
||||||
|
private static final int FETCH_SIZE = 100;
|
||||||
|
|
||||||
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
|
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
|
||||||
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
|
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
|
||||||
private final ApplicationContext applicationContext;
|
private final ApplicationContext applicationContext;
|
||||||
@@ -110,10 +112,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
|||||||
.where(this.defaultFilter(tenantId))
|
.where(this.defaultFilter(tenantId))
|
||||||
.and(field("trigger_execution_id").eq(triggerExecutionId));
|
.and(field("trigger_execution_id").eq(triggerExecutionId));
|
||||||
|
|
||||||
select.fetch()
|
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
|
||||||
.map(this.jdbcRepository::map)
|
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
|
||||||
.forEach(emitter::next);
|
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
|
||||||
emitter.complete();
|
stream.map(this.jdbcRepository::map).forEach(emitter::next);
|
||||||
|
} finally {
|
||||||
|
emitter.complete();
|
||||||
|
}
|
||||||
}),
|
}),
|
||||||
FluxSink.OverflowStrategy.BUFFER
|
FluxSink.OverflowStrategy.BUFFER
|
||||||
);
|
);
|
||||||
@@ -172,7 +177,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
|||||||
state,
|
state,
|
||||||
labels,
|
labels,
|
||||||
triggerExecutionId,
|
triggerExecutionId,
|
||||||
childFilter
|
childFilter,
|
||||||
|
false
|
||||||
);
|
);
|
||||||
|
|
||||||
return this.jdbcRepository.fetchPage(context, select, pageable);
|
return this.jdbcRepository.fetchPage(context, select, pageable);
|
||||||
@@ -190,7 +196,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
|||||||
@Nullable List<State.Type> state,
|
@Nullable List<State.Type> state,
|
||||||
@Nullable Map<String, String> labels,
|
@Nullable Map<String, String> labels,
|
||||||
@Nullable String triggerExecutionId,
|
@Nullable String triggerExecutionId,
|
||||||
@Nullable ChildFilter childFilter
|
@Nullable ChildFilter childFilter,
|
||||||
|
boolean deleted
|
||||||
) {
|
) {
|
||||||
return Flux.create(
|
return Flux.create(
|
||||||
emitter -> this.jdbcRepository
|
emitter -> this.jdbcRepository
|
||||||
@@ -209,14 +216,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
|||||||
state,
|
state,
|
||||||
labels,
|
labels,
|
||||||
triggerExecutionId,
|
triggerExecutionId,
|
||||||
childFilter
|
childFilter,
|
||||||
|
deleted
|
||||||
);
|
);
|
||||||
|
|
||||||
select.fetch()
|
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
|
||||||
.map(this.jdbcRepository::map)
|
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
|
||||||
.forEach(emitter::next);
|
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
|
||||||
|
stream.map(this.jdbcRepository::map).forEach(emitter::next);
|
||||||
emitter.complete();
|
} finally {
|
||||||
|
emitter.complete();
|
||||||
|
}
|
||||||
}),
|
}),
|
||||||
FluxSink.OverflowStrategy.BUFFER
|
FluxSink.OverflowStrategy.BUFFER
|
||||||
);
|
);
|
||||||
@@ -233,7 +243,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
|||||||
@Nullable List<State.Type> state,
|
@Nullable List<State.Type> state,
|
||||||
@Nullable Map<String, String> labels,
|
@Nullable Map<String, String> labels,
|
||||||
@Nullable String triggerExecutionId,
|
@Nullable String triggerExecutionId,
|
||||||
@Nullable ChildFilter childFilter
|
@Nullable ChildFilter childFilter,
|
||||||
|
boolean deleted
|
||||||
) {
|
) {
|
||||||
SelectConditionStep<Record1<Object>> select = context
|
SelectConditionStep<Record1<Object>> select = context
|
||||||
.select(
|
.select(
|
||||||
@@ -241,7 +252,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
|
|||||||
)
|
)
|
||||||
.hint(context.configuration().dialect() == SQLDialect.MYSQL ? "SQL_CALC_FOUND_ROWS" : null)
|
.hint(context.configuration().dialect() == SQLDialect.MYSQL ? "SQL_CALC_FOUND_ROWS" : null)
|
||||||
.from(this.jdbcRepository.getTable())
|
.from(this.jdbcRepository.getTable())
|
||||||
.where(this.defaultFilter(tenantId));
|
.where(this.defaultFilter(tenantId, deleted));
|
||||||
|
|
||||||
select = filteringQuery(select, namespace, flowId, null, query, labels, triggerExecutionId, childFilter);
|
select = filteringQuery(select, namespace, flowId, null, query, labels, triggerExecutionId, childFilter);
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,11 @@ public abstract class AbstractJdbcRepository {
|
|||||||
return tenant.and(field("deleted", Boolean.class).eq(false));
|
return tenant.and(field("deleted", Boolean.class).eq(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Condition defaultFilter(String tenantId, Boolean allowDeleted) {
|
||||||
|
var tenant = buildTenantCondition(tenantId);
|
||||||
|
return allowDeleted ? tenant : tenant.and(field("deleted", Boolean.class).eq(false));
|
||||||
|
}
|
||||||
|
|
||||||
protected Condition buildTenantCondition(String tenantId) {
|
protected Condition buildTenantCondition(String tenantId) {
|
||||||
return tenantId == null ? field("tenant_id").isNull() : field("tenant_id").eq(tenantId);
|
return tenantId == null ? field("tenant_id").isNull() : field("tenant_id").eq(tenantId);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
|
|||||||
Trigger current = optionalTrigger.get();
|
Trigger current = optionalTrigger.get();
|
||||||
current = current.toBuilder()
|
current = current.toBuilder()
|
||||||
.executionId(trigger.getExecutionId())
|
.executionId(trigger.getExecutionId())
|
||||||
.executionCurrentState(trigger.getExecutionCurrentState())
|
|
||||||
.updatedDate(trigger.getUpdatedDate())
|
.updatedDate(trigger.getUpdatedDate())
|
||||||
.build();
|
.build();
|
||||||
this.save(context, current);
|
this.save(context, current);
|
||||||
|
|||||||
@@ -706,6 +706,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
|
|||||||
.executionId(killedExecution.getExecutionId())
|
.executionId(killedExecution.getExecutionId())
|
||||||
.isOnKillCascade(false)
|
.isOnKillCascade(false)
|
||||||
.state(ExecutionKilled.State.EXECUTED)
|
.state(ExecutionKilled.State.EXECUTED)
|
||||||
|
.tenantId(killedExecution.getTenantId())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -32,10 +32,10 @@ import java.util.function.BiConsumer;
|
|||||||
public class JdbcScheduler extends AbstractScheduler {
|
public class JdbcScheduler extends AbstractScheduler {
|
||||||
private final QueueInterface<Execution> executionQueue;
|
private final QueueInterface<Execution> executionQueue;
|
||||||
private final TriggerRepositoryInterface triggerRepository;
|
private final TriggerRepositoryInterface triggerRepository;
|
||||||
private final ConditionService conditionService;
|
|
||||||
|
|
||||||
private final FlowRepositoryInterface flowRepository;
|
private final FlowRepositoryInterface flowRepository;
|
||||||
private final JooqDSLContextWrapper dslContextWrapper;
|
private final JooqDSLContextWrapper dslContextWrapper;
|
||||||
|
private final ConditionService conditionService;
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@@ -49,6 +49,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
|||||||
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
|
||||||
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
|
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
|
||||||
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
|
||||||
|
executionState = applicationContext.getBean(SchedulerExecutionState.class);
|
||||||
conditionService = applicationContext.getBean(ConditionService.class);
|
conditionService = applicationContext.getBean(ConditionService.class);
|
||||||
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||||
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
|
||||||
@@ -58,6 +59,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
|||||||
public void run() {
|
public void run() {
|
||||||
super.run();
|
super.run();
|
||||||
|
|
||||||
|
// reset scheduler trigger at end
|
||||||
executionQueue.receive(
|
executionQueue.receive(
|
||||||
Scheduler.class,
|
Scheduler.class,
|
||||||
either -> {
|
either -> {
|
||||||
@@ -76,14 +78,6 @@ public class JdbcScheduler extends AbstractScheduler {
|
|||||||
.ifPresent(trigger -> {
|
.ifPresent(trigger -> {
|
||||||
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
|
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
|
||||||
});
|
});
|
||||||
} else {
|
|
||||||
// update execution state on each state change so the scheduler knows the execution is running
|
|
||||||
triggerRepository
|
|
||||||
.findByExecution(execution)
|
|
||||||
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
|
|
||||||
.ifPresent(trigger -> {
|
|
||||||
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -106,7 +100,7 @@ public class JdbcScheduler extends AbstractScheduler {
|
|||||||
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
|
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
|
||||||
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
|
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
|
||||||
|
|
||||||
schedulerContext.startTransaction(scheduleContextInterface -> {
|
schedulerContext.doInTransaction(scheduleContextInterface -> {
|
||||||
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
|
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
|
||||||
|
|
||||||
consumer.accept(triggers, scheduleContextInterface);
|
consumer.accept(triggers, scheduleContextInterface);
|
||||||
|
|||||||
@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
|
|||||||
this.dslContextWrapper = dslContextWrapper;
|
this.dslContextWrapper = dslContextWrapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
|
@Override
|
||||||
|
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
|
||||||
this.dslContextWrapper.transaction(configuration -> {
|
this.dslContextWrapper.transaction(configuration -> {
|
||||||
this.context = DSL.using(configuration);
|
this.context = DSL.using(configuration);
|
||||||
|
|
||||||
consumer.accept(this);
|
consumer.accept(this);
|
||||||
|
|
||||||
this.commit();
|
this.context.commit();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void commit() {
|
|
||||||
this.context.commit();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
|
|||||||
return trigger;
|
return trigger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Trigger create(Trigger trigger, String headerContent) {
|
||||||
|
return this.triggerRepository.create(trigger);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
|
||||||
|
this.triggerRepository.save(trigger, scheduleContextInterface);
|
||||||
|
|
||||||
|
return trigger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Trigger create(Trigger trigger) {
|
public Trigger create(Trigger trigger) {
|
||||||
|
|
||||||
@@ -84,7 +96,4 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
|
|||||||
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unlock(Trigger trigger) {}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,6 +38,11 @@ public class MemoryExecutionRepository implements ExecutionRepositoryInterface {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<Execution> find(@Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> state, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter, boolean allowDeleted) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ArrayListTotal<TaskRun> findTaskRun(Pageable pageable, @Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> states, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter) {
|
public ArrayListTotal<TaskRun> findTaskRun(Pageable pageable, @Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> states, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|||||||
@@ -56,6 +56,22 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
|
|||||||
return trigger;
|
return trigger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
|
||||||
|
triggers.put(trigger.uid(), trigger);
|
||||||
|
triggerQueue.emit(trigger);
|
||||||
|
|
||||||
|
return trigger;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Trigger create(Trigger trigger, String headerContent) {
|
||||||
|
triggers.put(trigger.uid(), trigger);
|
||||||
|
triggerQueue.emit(trigger);
|
||||||
|
|
||||||
|
return trigger;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Trigger update(Trigger trigger) {
|
public Trigger update(Trigger trigger) {
|
||||||
triggers.put(trigger.uid(), trigger);
|
triggers.put(trigger.uid(), trigger);
|
||||||
@@ -79,7 +95,4 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
|
|||||||
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unlock(Trigger trigger) {}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -131,10 +131,10 @@ public class CommandsWrapper implements TaskCommands {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public ScriptOutput run() throws Exception {
|
public ScriptOutput run() throws Exception {
|
||||||
List<String> filesToUpload = new ArrayList<>();
|
List<String> filesToUpload = new ArrayList<>();
|
||||||
if (this.namespaceFiles != null) {
|
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
|
||||||
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
|
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
|
||||||
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
|
|
||||||
|
|
||||||
|
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
|
||||||
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
|
||||||
List<URI> injectedFiles = namespaceFilesService.inject(
|
List<URI> injectedFiles = namespaceFilesService.inject(
|
||||||
runContext,
|
runContext,
|
||||||
|
|||||||
@@ -7,6 +7,6 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
|
|||||||
class DockerTest extends AbstractTaskRunnerTest {
|
class DockerTest extends AbstractTaskRunnerTest {
|
||||||
@Override
|
@Override
|
||||||
protected TaskRunner taskRunner() {
|
protected TaskRunner taskRunner() {
|
||||||
return Docker.builder().image("centos").build();
|
return Docker.builder().image("rockylinux:9.3-minimal").build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,7 +78,7 @@ public class LocalStorage implements StorageInterface {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
|
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
|
||||||
uris.add(URI.create(file.toString()));
|
uris.add(URI.create(file.toString().replace("\\", "/")));
|
||||||
return FileVisitResult.CONTINUE;
|
return FileVisitResult.CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,7 +90,7 @@ public class LocalStorage implements StorageInterface {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
URI fsPathUri = URI.create(fsPath.toString());
|
URI fsPathUri = URI.create(fsPath.toString().replace("\\", "/"));
|
||||||
return uris.stream().sorted(Comparator.reverseOrder())
|
return uris.stream().sorted(Comparator.reverseOrder())
|
||||||
.map(fsPathUri::relativize)
|
.map(fsPathUri::relativize)
|
||||||
.map(URI::getPath)
|
.map(URI::getPath)
|
||||||
@@ -115,7 +115,7 @@ public class LocalStorage implements StorageInterface {
|
|||||||
URI relative = URI.create(
|
URI relative = URI.create(
|
||||||
getPath(tenantId, null).relativize(
|
getPath(tenantId, null).relativize(
|
||||||
Path.of(file.toUri())
|
Path.of(file.toUri())
|
||||||
).toString()
|
).toString().replace("\\", "/")
|
||||||
);
|
);
|
||||||
return getAttributes(tenantId, relative);
|
return getAttributes(tenantId, relative);
|
||||||
}))
|
}))
|
||||||
|
|||||||
1884
ui/package-lock.json
generated
1884
ui/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -12,20 +12,20 @@
|
|||||||
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
|
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@kestra-io/ui-libs": "^0.0.47",
|
"@kestra-io/ui-libs": "^0.0.48",
|
||||||
"@vue-flow/background": "^1.3.0",
|
"@vue-flow/background": "^1.3.0",
|
||||||
"@vue-flow/controls": "^1.1.1",
|
"@vue-flow/controls": "^1.1.1",
|
||||||
"@vue-flow/core": "^1.33.6",
|
"@vue-flow/core": "^1.33.6",
|
||||||
"ansi-to-html": "^0.7.2",
|
"ansi-to-html": "^0.7.2",
|
||||||
"axios": "^1.6.8",
|
"axios": "^1.7.2",
|
||||||
"bootstrap": "^5.3.3",
|
"bootstrap": "^5.3.3",
|
||||||
"buffer": "^6.0.3",
|
"buffer": "^6.0.3",
|
||||||
"chart.js": "^4.4.2",
|
"chart.js": "^4.4.3",
|
||||||
"chartjs-chart-treemap": "^2.3.1",
|
"chartjs-chart-treemap": "^2.3.1",
|
||||||
"core-js": "^3.37.0",
|
"core-js": "^3.37.1",
|
||||||
"dagre": "^0.8.5",
|
"dagre": "^0.8.5",
|
||||||
"element-plus": "^2.7.2",
|
"element-plus": "^2.7.5",
|
||||||
"humanize-duration": "^3.32.0",
|
"humanize-duration": "^3.32.1",
|
||||||
"js-yaml": "^4.1.0",
|
"js-yaml": "^4.1.0",
|
||||||
"lodash": "^4.17.21",
|
"lodash": "^4.17.21",
|
||||||
"markdown-it": "^14.1.0",
|
"markdown-it": "^14.1.0",
|
||||||
@@ -40,45 +40,49 @@
|
|||||||
"moment-timezone": "^0.5.45",
|
"moment-timezone": "^0.5.45",
|
||||||
"node-modules-polyfill": "^0.1.4",
|
"node-modules-polyfill": "^0.1.4",
|
||||||
"nprogress": "^0.2.0",
|
"nprogress": "^0.2.0",
|
||||||
"posthog-js": "^1.130.2",
|
"posthog-js": "^1.138.2",
|
||||||
|
"cronstrue": "^2.50.0",
|
||||||
"throttle-debounce": "^5.0.0",
|
"throttle-debounce": "^5.0.0",
|
||||||
"vite-plugin-eslint": "^1.8.1",
|
"vite-plugin-eslint": "^1.8.1",
|
||||||
"vue": "^3.4.26",
|
"vue": "^3.4.27",
|
||||||
"vue-axios": "3.5.2",
|
"vue-axios": "3.5.2",
|
||||||
"vue-chartjs": "^5.3.1",
|
"vue-chartjs": "^5.3.1",
|
||||||
"vue-gtag": "^2.0.1",
|
"vue-gtag": "^2.0.1",
|
||||||
"vue-i18n": "^9.13.1",
|
"vue-i18n": "^9.13.1",
|
||||||
"vue-material-design-icons": "^5.3.0",
|
"vue-material-design-icons": "^5.3.0",
|
||||||
"vue-router": "^4.3.2",
|
"vue-router": "^4.3.2",
|
||||||
"vue-sidebar-menu": "^5.3.1",
|
"vue-sidebar-menu": "^5.4.0",
|
||||||
"vue-virtual-scroller": "^2.0.0-beta.8",
|
"vue-virtual-scroller": "^2.0.0-beta.8",
|
||||||
"vue3-popper": "^1.5.0",
|
"vue3-popper": "^1.5.0",
|
||||||
"vue3-tour": "github:kestra-io/vue3-tour",
|
"vue3-tour": "github:kestra-io/vue3-tour",
|
||||||
"vuex": "^4.1.0",
|
"vuex": "^4.1.0",
|
||||||
"xss": "^1.0.15",
|
"xss": "^1.0.15",
|
||||||
"yaml": "^2.4.2"
|
"yaml": "^2.4.5"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@rushstack/eslint-patch": "^1.10.2",
|
"@rushstack/eslint-patch": "^1.10.3",
|
||||||
"@shikijs/markdown-it": "^1.4.0",
|
"@shikijs/markdown-it": "^1.6.3",
|
||||||
"@typescript-eslint/parser": "^7.8.0",
|
"@typescript-eslint/parser": "^7.12.0",
|
||||||
"@vitejs/plugin-vue": "^5.0.4",
|
"@vitejs/plugin-vue": "^5.0.5",
|
||||||
"@vue/eslint-config-prettier": "^9.0.0",
|
"@vue/eslint-config-prettier": "^9.0.0",
|
||||||
"@vue/test-utils": "^2.4.5",
|
"@vue/test-utils": "^2.4.6",
|
||||||
"decompress": "^4.2.1",
|
"decompress": "^4.2.1",
|
||||||
"eslint": "^8.57.0",
|
"eslint": "^8.57.0",
|
||||||
"eslint-plugin-vue": "^9.25.0",
|
"eslint-plugin-vue": "^9.26.0",
|
||||||
"jsdom": "^24.0.0",
|
"jsdom": "^24.1.0",
|
||||||
"monaco-editor": "^0.48.0",
|
"monaco-editor": "^0.49.0",
|
||||||
"monaco-yaml": "^5.1.1",
|
"monaco-yaml": "^5.2.0",
|
||||||
"prettier": "^3.2.5",
|
"prettier": "^3.3.1",
|
||||||
"rollup-plugin-copy": "^3.5.0",
|
"rollup-plugin-copy": "^3.5.0",
|
||||||
"rollup-plugin-visualizer": "^5.12.0",
|
"rollup-plugin-visualizer": "^5.12.0",
|
||||||
"sass": "^1.76.0",
|
"sass": "^1.77.4",
|
||||||
"typescript": "^5.4.5",
|
"typescript": "^5.4.5",
|
||||||
"vite": "^5.2.11",
|
|
||||||
"vite-plugin-rewrite-all": "1.0.1",
|
"vite-plugin-rewrite-all": "1.0.1",
|
||||||
"vitest": "^1.5.3"
|
"vite": "^5.2.13",
|
||||||
|
"vitest": "^1.6.0"
|
||||||
|
},
|
||||||
|
"optionalDependencies": {
|
||||||
|
"@rollup/rollup-linux-x64-gnu": "4.18.0"
|
||||||
},
|
},
|
||||||
"overrides": {
|
"overrides": {
|
||||||
"bootstrap": {
|
"bootstrap": {
|
||||||
|
|||||||
@@ -209,7 +209,7 @@
|
|||||||
},
|
},
|
||||||
watch: {
|
watch: {
|
||||||
$route(to) {
|
$route(to) {
|
||||||
if (this.user && to.name === "home" && this.overallTotal === 0) {
|
if (to.name === "home" && this.overallTotal === 0) {
|
||||||
this.$router.push({
|
this.$router.push({
|
||||||
name: "welcome",
|
name: "welcome",
|
||||||
params: {
|
params: {
|
||||||
|
|||||||
BIN
ui/src/assets/errors/kestra-error.png
Normal file
BIN
ui/src/assets/errors/kestra-error.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 73 KiB |
File diff suppressed because one or more lines are too long
|
Before Width: | Height: | Size: 4.9 KiB |
@@ -1,6 +1,10 @@
|
|||||||
<template>
|
<template>
|
||||||
<el-tooltip :persistent="false" :focus-on-show="true" popper-class="ee-tooltip" :disabled="!disabled" :placement="placement">
|
<el-tooltip :visible="visible" :persistent="false" :focus-on-show="true" popper-class="ee-tooltip" :disabled="!disabled" :placement="placement">
|
||||||
<template #content v-if="link">
|
<template #content v-if="link">
|
||||||
|
<el-button circle class="ee-tooltip-close" @click="changeVisibility(false)">
|
||||||
|
<Close />
|
||||||
|
</el-button>
|
||||||
|
|
||||||
<p>{{ $t("ee-tooltip.features-blocked") }}</p>
|
<p>{{ $t("ee-tooltip.features-blocked") }}</p>
|
||||||
|
|
||||||
<a
|
<a
|
||||||
@@ -13,7 +17,7 @@
|
|||||||
</a>
|
</a>
|
||||||
</template>
|
</template>
|
||||||
<template #default>
|
<template #default>
|
||||||
<span ref="slot-container">
|
<span ref="slot-container" class="cursor-pointer" @click="changeVisibility()">
|
||||||
<slot />
|
<slot />
|
||||||
<lock v-if="disabled" />
|
<lock v-if="disabled" />
|
||||||
</span>
|
</span>
|
||||||
@@ -22,10 +26,11 @@
|
|||||||
</template>
|
</template>
|
||||||
|
|
||||||
<script>
|
<script>
|
||||||
|
import Close from "vue-material-design-icons/Close.vue";
|
||||||
import Lock from "vue-material-design-icons/Lock.vue";
|
import Lock from "vue-material-design-icons/Lock.vue";
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
components: {Lock},
|
components: {Close, Lock},
|
||||||
props: {
|
props: {
|
||||||
top: {
|
top: {
|
||||||
type: Boolean,
|
type: Boolean,
|
||||||
@@ -48,6 +53,16 @@
|
|||||||
default: undefined
|
default: undefined
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
data() {
|
||||||
|
return {
|
||||||
|
visible: false,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
methods: {
|
||||||
|
changeVisibility(visible = true) {
|
||||||
|
this.visible = visible
|
||||||
|
}
|
||||||
|
},
|
||||||
computed: {
|
computed: {
|
||||||
link() {
|
link() {
|
||||||
|
|
||||||
@@ -83,5 +98,13 @@
|
|||||||
:deep(.material-design-icon) > .material-design-icon__svg {
|
:deep(.material-design-icon) > .material-design-icon__svg {
|
||||||
bottom: -0.125em;
|
bottom: -0.125em;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.ee-tooltip-close {
|
||||||
|
position: absolute;
|
||||||
|
top: 0;
|
||||||
|
right: 0;
|
||||||
|
border: none;
|
||||||
|
margin: 0.5rem;
|
||||||
|
}
|
||||||
</style>
|
</style>
|
||||||
|
|
||||||
|
|||||||
@@ -77,12 +77,6 @@
|
|||||||
</router-link>
|
</router-link>
|
||||||
</template>
|
</template>
|
||||||
</el-table-column>
|
</el-table-column>
|
||||||
|
|
||||||
<el-table-column :label="$t('state')">
|
|
||||||
<template #default="scope">
|
|
||||||
<status v-if="scope.row.executionCurrentState" :status="scope.row.executionCurrentState" size="small" />
|
|
||||||
</template>
|
|
||||||
</el-table-column>
|
|
||||||
<el-table-column :label="$t('date')">
|
<el-table-column :label="$t('date')">
|
||||||
<template #default="scope">
|
<template #default="scope">
|
||||||
<date-ago :inverted="true" :date="scope.row.date" />
|
<date-ago :inverted="true" :date="scope.row.date" />
|
||||||
@@ -171,7 +165,6 @@
|
|||||||
import RefreshButton from "../layout/RefreshButton.vue";
|
import RefreshButton from "../layout/RefreshButton.vue";
|
||||||
import DateAgo from "../layout/DateAgo.vue";
|
import DateAgo from "../layout/DateAgo.vue";
|
||||||
import Id from "../Id.vue";
|
import Id from "../Id.vue";
|
||||||
import Status from "../Status.vue";
|
|
||||||
import {mapState} from "vuex";
|
import {mapState} from "vuex";
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
@@ -183,7 +176,6 @@
|
|||||||
SearchField,
|
SearchField,
|
||||||
NamespaceSelect,
|
NamespaceSelect,
|
||||||
DateAgo,
|
DateAgo,
|
||||||
Status,
|
|
||||||
Id,
|
Id,
|
||||||
},
|
},
|
||||||
data() {
|
data() {
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
<el-form-item
|
<el-form-item
|
||||||
:label="$t('password')"
|
:label="$t('password')"
|
||||||
required
|
required
|
||||||
|
prop="password"
|
||||||
>
|
>
|
||||||
<el-input v-model="form.password" type="password" show-password />
|
<el-input v-model="form.password" type="password" show-password />
|
||||||
</el-form-item>
|
</el-form-item>
|
||||||
@@ -62,6 +63,28 @@
|
|||||||
trigger: ["blur"],
|
trigger: ["blur"],
|
||||||
pattern: "^$|^[a-zA-Z0-9_!#$%&’*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+$"
|
pattern: "^$|^[a-zA-Z0-9_!#$%&’*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+$"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
validator: (rule, value, callback) => {
|
||||||
|
if (value && value.length > 256) {
|
||||||
|
callback(new Error(this.$t("email length constraint")));
|
||||||
|
} else {
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
trigger: ["blur", "change"]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
password: [
|
||||||
|
{
|
||||||
|
validator: (rule, value, callback) => {
|
||||||
|
if (value && value.length > 256) {
|
||||||
|
callback(new Error(this.$t("password length constraint")));
|
||||||
|
} else {
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
trigger: ["blur", "change"]
|
||||||
|
}
|
||||||
],
|
],
|
||||||
confirmPassword: [
|
confirmPassword: [
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -7,6 +7,10 @@
|
|||||||
<p>
|
<p>
|
||||||
<span v-html="$t('errors.' + code + '.content')" />
|
<span v-html="$t('errors.' + code + '.content')" />
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
|
<el-button tag="router-link" :to="{name: 'home'}" type="primary">
|
||||||
|
{{ $t("back_to_dashboard") }}
|
||||||
|
</el-button>
|
||||||
</section>
|
</section>
|
||||||
</template>
|
</template>
|
||||||
|
|
||||||
@@ -42,19 +46,23 @@
|
|||||||
|
|
||||||
<style lang="scss" scoped>
|
<style lang="scss" scoped>
|
||||||
.errors {
|
.errors {
|
||||||
h2 {
|
margin-top: 10em;
|
||||||
margin-bottom: calc(var(--spacer) * 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
width: 100%;
|
|
||||||
text-align: center;
|
text-align: center;
|
||||||
|
|
||||||
.img {
|
.img {
|
||||||
display: inline-block;
|
background: url("../../assets/errors/kestra-error.png") no-repeat center;
|
||||||
background: url("../../assets/errors/sorry.svg") no-repeat;
|
|
||||||
background-size: contain;
|
background-size: contain;
|
||||||
height: 300px;
|
|
||||||
width: 300px;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h2 {
|
||||||
|
line-height: 30px;
|
||||||
|
font-size: 20px;
|
||||||
|
font-weight: 600;
|
||||||
|
}
|
||||||
|
|
||||||
|
p {
|
||||||
|
line-height: 22px;
|
||||||
|
font-size: 14px;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
</style>
|
</style>
|
||||||
|
|||||||
@@ -69,6 +69,14 @@
|
|||||||
if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
|
if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
|
||||||
this.follow()
|
this.follow()
|
||||||
}
|
}
|
||||||
|
// if we change the execution id, we need to close the sse
|
||||||
|
if (this.$route.params.id != this.execution.id) {
|
||||||
|
this.closeSSE();
|
||||||
|
window.removeEventListener("popstate", this.follow)
|
||||||
|
this.$store.commit("execution/setExecution", undefined);
|
||||||
|
this.$store.commit("flow/setFlow", undefined);
|
||||||
|
this.$store.commit("flow/setFlowGraph", undefined);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
methods: {
|
methods: {
|
||||||
@@ -91,13 +99,16 @@
|
|||||||
}
|
}
|
||||||
// sse.onerror doesnt return the details of the error
|
// sse.onerror doesnt return the details of the error
|
||||||
// but as our emitter can only throw an error on 404
|
// but as our emitter can only throw an error on 404
|
||||||
// we can safely assume that the error
|
// we can safely assume that the error is a 404
|
||||||
|
// if execution is not defined
|
||||||
this.sse.onerror = () => {
|
this.sse.onerror = () => {
|
||||||
this.$store.dispatch("core/showMessage", {
|
if (!this.execution) {
|
||||||
variant: "error",
|
this.$store.dispatch("core/showMessage", {
|
||||||
title: this.$t("error"),
|
variant: "error",
|
||||||
message: this.$t("errors.404.flow or execution"),
|
title: this.$t("error"),
|
||||||
});
|
message: this.$t("errors.404.flow or execution"),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -425,6 +425,8 @@
|
|||||||
import {ElMessageBox, ElSwitch, ElFormItem, ElAlert} from "element-plus";
|
import {ElMessageBox, ElSwitch, ElFormItem, ElAlert} from "element-plus";
|
||||||
import {h, ref} from "vue";
|
import {h, ref} from "vue";
|
||||||
|
|
||||||
|
import {filterLabels} from "./utils"
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
mixins: [RouteContext, RestoreUrl, DataTableActions, SelectTableActions],
|
mixins: [RouteContext, RestoreUrl, DataTableActions, SelectTableActions],
|
||||||
components: {
|
components: {
|
||||||
@@ -809,6 +811,13 @@
|
|||||||
);
|
);
|
||||||
},
|
},
|
||||||
setLabels() {
|
setLabels() {
|
||||||
|
const filtered = filterLabels(this.executionLabels)
|
||||||
|
|
||||||
|
if(filtered.error) {
|
||||||
|
this.$toast().error(this.$t("wrong labels"))
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.$toast().confirm(
|
this.$toast().confirm(
|
||||||
this.$t("bulk set labels", {"executionCount": this.queryBulkAction ? this.total : this.selection.length}),
|
this.$t("bulk set labels", {"executionCount": this.queryBulkAction ? this.total : this.selection.length}),
|
||||||
() => {
|
() => {
|
||||||
@@ -819,7 +828,7 @@
|
|||||||
sort: this.$route.query.sort || "state.startDate:desc",
|
sort: this.$route.query.sort || "state.startDate:desc",
|
||||||
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
|
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
|
||||||
}, false),
|
}, false),
|
||||||
data: this.executionLabels
|
data: filtered.labels
|
||||||
})
|
})
|
||||||
.then(r => {
|
.then(r => {
|
||||||
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
|
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
|
||||||
@@ -829,7 +838,7 @@
|
|||||||
return this.$store
|
return this.$store
|
||||||
.dispatch("execution/bulkSetLabels", {
|
.dispatch("execution/bulkSetLabels", {
|
||||||
executionsId: this.selection,
|
executionsId: this.selection,
|
||||||
executionLabels: this.executionLabels
|
executionLabels: filtered.labels
|
||||||
})
|
})
|
||||||
.then(r => {
|
.then(r => {
|
||||||
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
|
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
|
||||||
|
|||||||
@@ -53,6 +53,8 @@
|
|||||||
import LabelInput from "../../components/labels/LabelInput.vue";
|
import LabelInput from "../../components/labels/LabelInput.vue";
|
||||||
import State from "../../utils/state";
|
import State from "../../utils/state";
|
||||||
|
|
||||||
|
import {filterLabels} from "./utils"
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
components: {LabelInput,},
|
components: {LabelInput,},
|
||||||
props: {
|
props: {
|
||||||
@@ -71,9 +73,16 @@
|
|||||||
},
|
},
|
||||||
methods: {
|
methods: {
|
||||||
setLabels() {
|
setLabels() {
|
||||||
|
const filtered = filterLabels(this.executionLabels)
|
||||||
|
|
||||||
|
if(filtered.error) {
|
||||||
|
this.$toast().error(this.$t("wrong labels"))
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.isOpen = false;
|
this.isOpen = false;
|
||||||
this.$store.dispatch("execution/setLabels", {
|
this.$store.dispatch("execution/setLabels", {
|
||||||
labels: this.executionLabels,
|
labels: filtered.labels,
|
||||||
executionId: this.execution.id
|
executionId: this.execution.id
|
||||||
}).then(response => {
|
}).then(response => {
|
||||||
this.$store.commit("execution/setExecution", response.data)
|
this.$store.commit("execution/setExecution", response.data)
|
||||||
|
|||||||
@@ -263,6 +263,8 @@
|
|||||||
return this.attempts(taskRun)[this.selectedAttemptNumberByTaskRunId[taskRun.id] ?? 0];
|
return this.attempts(taskRun)[this.selectedAttemptNumberByTaskRunId[taskRun.id] ?? 0];
|
||||||
},
|
},
|
||||||
taskType(taskRun) {
|
taskType(taskRun) {
|
||||||
|
if(!taskRun) return undefined;
|
||||||
|
|
||||||
const task = FlowUtils.findTaskById(this.flow, taskRun.taskId);
|
const task = FlowUtils.findTaskById(this.flow, taskRun.taskId);
|
||||||
const parentTaskRunId = taskRun.parentTaskRunId;
|
const parentTaskRunId = taskRun.parentTaskRunId;
|
||||||
if (task === undefined && parentTaskRunId) {
|
if (task === undefined && parentTaskRunId) {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
:start-date="startDate"
|
:start-date="startDate"
|
||||||
:end-date="endDate"
|
:end-date="endDate"
|
||||||
@update:model-value="onAbsFilterChange"
|
@update:model-value="onAbsFilterChange"
|
||||||
|
class="w-auto"
|
||||||
/>
|
/>
|
||||||
<relative-date-select
|
<relative-date-select
|
||||||
v-if="selectedFilterType === filterType.RELATIVE"
|
v-if="selectedFilterType === filterType.RELATIVE"
|
||||||
|
|||||||
14
ui/src/components/executions/utils.ts
Normal file
14
ui/src/components/executions/utils.ts
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
interface Label {
|
||||||
|
key: string | null;
|
||||||
|
value: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface FilterResult {
|
||||||
|
labels: Label[];
|
||||||
|
error?: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const filterLabels = (labels: Label[]): FilterResult => {
|
||||||
|
const invalid = labels.some(label => label.key === null || label.value === null);
|
||||||
|
return invalid ? {labels, error: true} : {labels};
|
||||||
|
};
|
||||||
@@ -41,7 +41,7 @@
|
|||||||
if (this.$route.query.reset) {
|
if (this.$route.query.reset) {
|
||||||
localStorage.setItem("tourDoneOrSkip", undefined);
|
localStorage.setItem("tourDoneOrSkip", undefined);
|
||||||
this.$store.commit("core/setGuidedProperties", {tourStarted: false});
|
this.$store.commit("core/setGuidedProperties", {tourStarted: false});
|
||||||
this.$tours["guidedTour"].start();
|
this.$tours["guidedTour"]?.start();
|
||||||
}
|
}
|
||||||
this.setupFlow()
|
this.setupFlow()
|
||||||
},
|
},
|
||||||
|
|||||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user