Mirror of https://github.com/kestra-io/kestra.git, synced 2025-12-26 14:00:23 -05:00

Compare commits: fix/filter ... fix/failin (111 commits)
Commit SHA1s:

709ac37773, f35a0b6d60, 0c9ed17f1c, 7ca20371f8, 8ff3454cbd, 09593d9fd2, d3cccf36f0, eeb91cd9ed, 2679b0f067, 54281864c8,
e4f9b11d0c, 12cef0593c, c6cf8f307f, 3b4eb55f84, d32949985d, c051ca2e66, 93a456963b, 9a45f17680, 5fb6806d74, f3cff72edd,
0abc660e7d, f09ca3d92e, 9fd778fca1, 667af25e1b, 1b1aed5ff1, da1bb58199, d3e661f9f8, 2126c8815e, 6cfc5b8799, 16d44034f0,
f76e62a4af, f6645da94c, 93b2bbf0d0, 9d46e2aece, 133315a2a5, b96b9bb414, 9865d8a7dc, 29f22c2f81, 3e69469381, 38c24ccf7f,
12cf41a309, 7b8ea0d885, cf88bbcb12, 6abe7f96e7, e73ac78d8b, b0687eb702, 85f9070f56, 0a42ab40ec, 856d2d1d51, a7d6dbc8a3,
cf82109da6, d4168ba424, 46a294f25a, a229036d8d, a518fefecd, 1d3210fd7d, 597f84ecb7, 5f3c7ac9f0, 77c4691b04, 6d34416529,
40a67d5dcd, 2c68c704f6, e59d9f622c, c951ba39a7, a0200cfacb, c6310f0697, 21ba59a525, 4f9e3cd06c, e74010d1a4, 465e6467e9,
c68c1b16d9, 468c32156e, 6e0a1c61ef, 552d55ef6b, 08b0b682bf, cff90c93bb, ea465056d0, 02f150f0b0, 95d95d3d3c, 6b8d3d6928,
1e347073ca, ac09dcecd9, 40b337cd22, 5377d16036, f717bc413f, d6bed2d235, 07fd74b238, 60eef29de2, 20ca7b6380, 9d82df61c6,
e78210b5eb, 83143fae83, 25f5ccc6b5, cf3e49a284, 9a72d378df, 752a927fac, 4053392921, 8b0483643a, 5feeb41c7a, d7f5e5c05d,
4840f723fc, 8cf159b281, 4c79576113, f87f2ed753, 298a6c7ca8, ab464fff6e, 6dcba16314, 80a328e87e, f2034f4975, edca56d168,
c55dedcc56
.github/workflows/docker.yml (vendored, 37 changes)

```diff
@@ -20,6 +20,15 @@ on:
         required: false
         type: string
         default: "LATEST"
+      force-download-artifact:
+        description: 'Force download artifact'
+        required: false
+        type: string
+        default: "true"
+        options:
+          - "true"
+          - "false"
 
 env:
+  PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
 jobs:
@@ -38,9 +47,18 @@ jobs:
       id: plugins
       with:
         plugin-version: ${{ env.PLUGIN_VERSION }}
 
+  # ********************************************************************************************************************
+  # Build
+  # ********************************************************************************************************************
+  build-artifacts:
+    name: Build Artifacts
+    if: ${{ github.event.inputs.force-download-artifact == 'true' }}
+    uses: ./.github/workflows/workflow-build-artifacts.yml
+
   docker:
     name: Publish Docker
-    needs: [ plugins ]
+    needs: [ plugins, build-artifacts ]
     runs-on: ubuntu-latest
     strategy:
       matrix:
@@ -73,14 +91,27 @@ jobs:
           else
            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
          fi
-      # Download release
-      - name: Download release
+
+      # [workflow_dispatch]
+      # Download executable from GitHub Release
+      - name: Artifacts - Download release (workflow_dispatch)
+        id: download-github-release
+        if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
         uses: robinraju/release-downloader@v1.12
         with:
           tag: ${{steps.vars.outputs.tag}}
           fileName: 'kestra-*'
           out-file-path: build/executable
 
+      # [workflow_call]
+      # Download executable from artifact
+      - name: Artifacts - Download executable
+        if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
+        uses: actions/download-artifact@v4
+        with:
+          name: exe
+          path: build/executable
+
       - name: Copy exe to image
         run: |
           cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
```
.github/workflows/main.yml (vendored, 3 changes)

```diff
@@ -43,7 +43,8 @@ jobs:
       SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
       SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
       SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
       GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
   end:
     runs-on: ubuntu-latest
     needs:
```
.github/workflows/workflow-build-artifacts.yml (vendored, 74 changes)

```diff
@@ -1,23 +1,7 @@
 name: Build Artifacts
 
 on:
-  workflow_call:
-    inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
-        type: string
-    outputs:
-      docker-tag:
-        value: ${{ jobs.build.outputs.docker-tag }}
-        description: "The Docker image Tag for Kestra"
-      docker-artifact-name:
-        value: ${{ jobs.build.outputs.docker-artifact-name }}
-        description: "The GitHub artifact containing the Kestra docker image name."
-      plugins:
-        value: ${{ jobs.build.outputs.plugins }}
-        description: "The Kestra plugins list used for the build."
+  workflow_call: {}
 
 jobs:
   build:
@@ -82,55 +66,6 @@ jobs:
       run: |
         cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
 
-      # Docker Tag
-      - name: Setup - Docker vars
-        id: vars
-        shell: bash
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          if [[ $TAG = "master" ]]
-          then
-            TAG="latest";
-          elif [[ $TAG = "develop" ]]
-          then
-            TAG="develop";
-          elif [[ $TAG = v* ]]
-          then
-            TAG="${TAG}";
-          else
-            TAG="build-${{ github.run_id }}";
-          fi
-          echo "tag=${TAG}" >> $GITHUB_OUTPUT
-          echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
-
-      # Docker setup
-      - name: Docker - Setup QEMU
-        uses: docker/setup-qemu-action@v3
-
-      - name: Docker - Fix Qemu
-        shell: bash
-        run: |
-          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
-
-      - name: Docker - Setup Buildx
-        uses: docker/setup-buildx-action@v3
-
-      # Docker Build
-      - name: Docker - Build & export image
-        uses: docker/build-push-action@v6
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          context: .
-          push: false
-          file: Dockerfile
-          tags: |
-            kestra/kestra:${{ steps.vars.outputs.tag }}
-          build-args: |
-            KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
-            APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
-            PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
-          outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
-
       # Upload artifacts
       - name: Artifacts - Upload JAR
         uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
       with:
         name: exe
         path: build/executable/
-
-      - name: Artifacts - Upload Docker
-        uses: actions/upload-artifact@v4
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          name: ${{ steps.vars.outputs.artifact }}
-          path: /tmp/${{ steps.vars.outputs.artifact }}.tar
```
```diff
@@ -1,14 +1,17 @@
 name: Github - Release
 
 on:
   workflow_dispatch:
   workflow_call:
     secrets:
       GH_PERSONAL_TOKEN:
         description: "The Github personal token."
         required: true
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "The Slack webhook URL."
+        required: true
   push:
     tags:
       - '*'
 
 jobs:
   publish:
@@ -41,8 +41,6 @@ jobs:
     name: Build Artifacts
     if: ${{ github.event.inputs.force-download-artifact == 'true' }}
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
   # ********************************************************************************************************************
   # Docker
   # ********************************************************************************************************************
```
.github/workflows/workflow-release.yml (vendored, 11 changes)

```diff
@@ -42,12 +42,16 @@ on:
       SONATYPE_GPG_FILE:
         description: "The Sonatype GPG file."
         required: true
+      GH_PERSONAL_TOKEN:
+        description: "GH personnal Token."
+        required: true
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "Slack webhook for releases channel."
+        required: true
 jobs:
   build-artifacts:
     name: Build - Artifacts
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
 
   Docker:
     name: Publish Docker
@@ -77,4 +81,5 @@ jobs:
     if: startsWith(github.ref, 'refs/tags/v')
     uses: ./.github/workflows/workflow-github-release.yml
     secrets:
       GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
```
.plugins (6 changes)

```diff
@@ -26,6 +26,7 @@
 #plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
 #plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
 #plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
+#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
 #plugin-docker:io.kestra.plugin:plugin-docker:LATEST
 #plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
 #plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
@@ -86,13 +87,18 @@
 #plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
 #plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
 #plugin-redis:io.kestra.plugin:plugin-redis:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST
```
build.gradle (26 changes)

```diff
@@ -225,14 +225,14 @@ subprojects {
     }
 
     testlogger {
-        theme 'mocha-parallel'
-        showExceptions true
-        showFullStackTraces true
-        showCauses true
-        slowThreshold 2000
-        showStandardStreams true
-        showPassedStandardStreams false
-        showSkippedStandardStreams true
+        theme = 'mocha-parallel'
+        showExceptions = true
+        showFullStackTraces = true
+        showCauses = true
+        slowThreshold = 2000
+        showStandardStreams = true
+        showPassedStandardStreams = false
+        showSkippedStandardStreams = true
     }
 }
@@ -410,7 +410,7 @@ jar {
 shadowJar {
     archiveClassifier.set(null)
     mergeServiceFiles()
-    zip64 true
+    zip64 = true
 }
 
 distZip.dependsOn shadowJar
@@ -427,8 +427,8 @@ def executableDir = layout.buildDirectory.dir("executable")
 def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
 
 tasks.register('writeExecutableJar') {
-    group "build"
-    description "Write an executable jar from shadow jar"
+    group = "build"
+    description = "Write an executable jar from shadow jar"
     dependsOn = [shadowJar]
 
     final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
@@ -454,8 +454,8 @@ tasks.register('writeExecutableJar') {
 }
 
 tasks.register('executableJar', Zip) {
-    group "build"
-    description "Zip the executable jar"
+    group = "build"
+    description = "Zip the executable jar"
     dependsOn = [writeExecutableJar]
 
     archiveFileName = "${project.name}-${project.version}.zip"
```
```diff
@@ -162,7 +162,15 @@ public class FileChangedEventListener {
             }
 
         } catch (NoSuchFileException e) {
-            log.error("File not found: {}", entry, e);
+            log.warn("File not found: {}, deleting it", entry, e);
+            // the file might have been deleted while reading so if not found we try to delete the flow
+            flows.stream()
+                .filter(flow -> flow.getPath().equals(filePath.toString()))
+                .findFirst()
+                .ifPresent(flowWithPath -> {
+                    flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
+                    this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
+                });
         } catch (IOException e) {
             log.error("Error reading file: {}", entry, e);
         }
```
```diff
@@ -1,11 +1,10 @@
 package io.kestra.core.models;
 
 import io.kestra.core.utils.MapUtils;
+import jakarta.annotation.Nullable;
 import jakarta.validation.constraints.NotNull;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public record Label(@NotNull String key, @NotNull String value) {
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
      * @return the nested {@link Map}.
      */
     public static Map<String, Object> toNestedMap(List<Label> labels) {
-        Map<String, Object> asMap = labels.stream()
-            .filter(label -> label.value() != null && label.key() != null)
-            // using an accumulator in case labels with the same key exists: the first is kept
-            .collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
-        return MapUtils.flattenToNestedMap(asMap);
+        return MapUtils.flattenToNestedMap(toMap(labels));
+    }
+
+    /**
+     * Static helper method for converting a list of labels to a flat map.
+     * Key order is kept.
+     *
+     * @param labels The list of {@link Label} to be converted.
+     * @return the flat {@link Map}.
+     */
+    public static Map<String, String> toMap(@Nullable List<Label> labels) {
+        if (labels == null || labels.isEmpty()) return Collections.emptyMap();
+        return labels.stream()
+            .filter(label -> label.value() != null && label.key() != null)
+            // using an accumulator in case labels with the same key exists: the second is kept
+            .collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
+    }
+
+    /**
+     * Static helper method for deduplicating a list of labels by their key.
+     * Value of the last key occurrence is kept.
+     *
+     * @param labels The list of {@link Label} to be deduplicated.
+     * @return the deduplicated {@link List}.
+     */
+    public static List<Label> deduplicate(@Nullable List<Label> labels) {
+        if (labels == null || labels.isEmpty()) return Collections.emptyList();
+        return toMap(labels).entrySet().stream()
+            .map(entry -> new Label(entry.getKey(), entry.getValue()))
+            .collect(Collectors.toCollection(ArrayList::new));
     }
 
     /**
```
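The new `Label.toMap` collector combines a merge function with a `LinkedHashMap` supplier, so duplicate keys resolve to the last occurrence while first-seen key order is preserved. A minimal standalone sketch of that semantic, using hypothetical label values rather than the repo's `Label` class:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LabelDedupDemo {
    record Label(String key, String value) {}

    public static void main(String[] args) {
        List<Label> labels = List.of(
            new Label("username", "test1"),
            new Label("username", "test2"),   // same key: last occurrence wins
            new Label("correlationId", "id")
        );
        // merge function keeps the second value; LinkedHashMap keeps insertion order
        Map<String, String> map = labels.stream()
            .collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
        System.out.println(map); // {username=test2, correlationId=id}
    }
}
```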
```diff
@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
 import io.kestra.core.exceptions.InvalidQueryFiltersException;
 import io.kestra.core.models.dashboards.filters.*;
 import io.kestra.core.utils.Enums;
-import java.util.ArrayList;
 import lombok.Builder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -49,42 +49,27 @@ public record QueryFilter(
         PREFIX
     }
 
     @SuppressWarnings("unchecked")
     private List<Object> asValues(Object value) {
         return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
     }
 
     @SuppressWarnings("unchecked")
     public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
-        switch (this.operation) {
-            case EQUALS:
-                return EqualTo.<T>builder().field(field).value(value).build();
-            case NOT_EQUALS:
-                return NotEqualTo.<T>builder().field(field).value(value).build();
-            case GREATER_THAN:
-                return GreaterThan.<T>builder().field(field).value(value).build();
-            case LESS_THAN:
-                return LessThan.<T>builder().field(field).value(value).build();
-            case GREATER_THAN_OR_EQUAL_TO:
-                return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
-            case LESS_THAN_OR_EQUAL_TO:
-                return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
-            case IN:
-                return In.<T>builder().field(field).values(asValues(value)).build();
-            case NOT_IN:
-                return NotIn.<T>builder().field(field).values(asValues(value)).build();
-            case STARTS_WITH:
-                return StartsWith.<T>builder().field(field).value(value.toString()).build();
-            case ENDS_WITH:
-                return EndsWith.<T>builder().field(field).value(value.toString()).build();
-            case CONTAINS:
-                return Contains.<T>builder().field(field).value(value.toString()).build();
-            case REGEX:
-                return Regex.<T>builder().field(field).value(value.toString()).build();
-            case PREFIX:
-                return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
-            default:
-                throw new IllegalArgumentException("Unsupported operation: " + this.operation);
-        }
+        return switch (this.operation) {
+            case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
+            case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
+            case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
+            case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
+            case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
+            case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
+            case IN -> In.<T>builder().field(field).values(asValues(value)).build();
+            case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
+            case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
+            case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
+            case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
+            case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
+            case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
+        };
     }
 
     public enum Field {
```
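The `PREFIX` case rewrites a dotted prefix such as `company.team` into the anchored regex `^company\.team(?:\..+)?$`, matching the value itself or any dot-separated descendant. A quick standalone check of that construction (this is illustrative sample input, not the repo's test code):

```java
import java.util.regex.Pattern;

public class PrefixRegexDemo {
    public static void main(String[] args) {
        String value = "company.team";
        // same construction as the PREFIX case: escape dots, then allow ".something" suffixes
        Pattern p = Pattern.compile("^" + value.replace(".", "\\.") + "(?:\\..+)?$");

        System.out.println(p.matcher("company.team").matches());     // true: exact match
        System.out.println(p.matcher("company.team.sub").matches()); // true: descendant
        System.out.println(p.matcher("company.teammate").matches()); // false: not a dot boundary
    }
}
```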
```diff
@@ -25,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
 import io.kestra.core.services.LabelService;
 import io.kestra.core.test.flow.TaskFixture;
 import io.kestra.core.utils.IdUtils;
+import io.kestra.core.utils.ListUtils;
 import io.kestra.core.utils.MapUtils;
 import io.swagger.v3.oas.annotations.Hidden;
 import jakarta.annotation.Nullable;
@@ -131,12 +132,12 @@ public class Execution implements DeletedInterface, TenantInterface {
      * @param labels The Flow labels.
      * @return a new {@link Execution}.
      */
-    public static Execution newExecution(final Flow flow, final List<Label> labels) {
+    public static Execution newExecution(final FlowInterface flow, final List<Label> labels) {
         return newExecution(flow, null, labels, Optional.empty());
     }
 
     public List<Label> getLabels() {
-        return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
+        return ListUtils.emptyOnNull(this.labels);
     }
 
     /**
@@ -181,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
     }
 
 
+    /**
+     * Customization of Lombok-generated builder.
+     */
     public static class ExecutionBuilder {
 
+        /**
+         * Enforce unique values of {@link Label} when using the builder.
+         *
+         * @param labels The labels.
+         * @return Deduplicated labels.
+         */
+        public ExecutionBuilder labels(List<Label> labels) {
+            this.labels = Label.deduplicate(labels);
+            return this;
+        }
+
         void prebuild() {
             this.originalId = this.id;
             this.metadata = ExecutionMetadata.builder()
@@ -231,7 +246,6 @@ public class Execution implements DeletedInterface, TenantInterface {
     }
 
     public Execution withLabels(List<Label> labels) {
-
         return new Execution(
             this.tenantId,
             this.id,
@@ -241,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
             this.taskRunList,
             this.inputs,
             this.outputs,
-            labels,
+            Label.deduplicate(labels),
             this.variables,
             this.state,
             this.parentId,
@@ -400,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
      *
      * @param resolvedTasks normal tasks
      * @param resolvedErrors errors tasks
-     * @param resolvedErrors finally tasks
+     * @param resolvedFinally finally tasks
      * @return the flow we need to follow
      */
     public List<ResolvedTask> findTaskDependingFlowState(
```
```diff
@@ -38,6 +38,8 @@ public abstract class AbstractFlow implements FlowInterface {
     @Min(value = 1)
     Integer revision;
 
+    String description;
+
     @Valid
     List<Input<?>> inputs;
```
```diff
@@ -61,13 +61,10 @@ public class Flow extends AbstractFlow implements HasUID {
         }
     });
 
-    String description;
-
     Map<String, Object> variables;
 
     @Valid
     @NotEmpty
-
     @Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
     List<Task> tasks;
```
```diff
@@ -31,6 +31,8 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
 
     Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
 
+    String getDescription();
+
     boolean isDisabled();
 
     boolean isDeleted();
```
```diff
@@ -20,9 +20,8 @@ public class FileInput extends Input<URI> {
 
     private static final String DEFAULT_EXTENSION = ".upl";
 
-    @Builder.Default
-    public String extension = DEFAULT_EXTENSION;
+    @Deprecated(since = "0.24", forRemoval = true)
+    public String extension;
 
     @Override
     public void validate(URI input) throws ConstraintViolationException {
@@ -33,6 +32,7 @@ public class FileInput extends Input<URI> {
         String res = inputs.stream()
             .filter(in -> in instanceof FileInput)
             .filter(in -> in.getId().equals(fileName))
+            .filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
             .map(flowInput -> ((FileInput) flowInput).getExtension())
             .findFirst()
             .orElse(FileInput.DEFAULT_EXTENSION);
```
```diff
@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
     String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
     String CLUSTER_EVENT_NAMED = "clusterEventQueue";
     String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
+    String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
 
     QueueInterface<Execution> execution();
 
@@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
     QueueInterface<SubflowExecutionResult> subflowExecutionResult();
 
     QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
+
+    QueueInterface<ExecutionRunning> executionRunning();
 }
```
```diff
@@ -27,7 +27,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
     void delete(String consumerGroup, T message) throws QueueException;
 
     default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
-        return receive((String) null, consumer);
+        return receive(null, consumer, false);
     }
 
     default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
```
```diff
@@ -27,8 +27,6 @@ public class QueueService {
             return ((Executor) object).getExecution().getId();
         } else if (object.getClass() == MetricEntry.class) {
             return null;
+        } else if (object.getClass() == ExecutionRunning.class) {
+            return ((ExecutionRunning) object).getExecution().getId();
         } else if (object.getClass() == SubflowExecutionEnd.class) {
             return ((SubflowExecutionEnd) object).getParentExecutionId();
         } else {
```
```diff
@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
     }
 
     List<Execution> lastExecutions(
-        @Nullable String tenantId,
+        String tenantId,
         @Nullable List<FlowFilter> flows
     );
 }
```
```diff
@@ -82,6 +82,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
     Flux<LogEntry> findAsync(
         @Nullable String tenantId,
         @Nullable String namespace,
+        @Nullable String flowId,
+        @Nullable String executionId,
         @Nullable Level minLevel,
         ZonedDateTime startDate
     );
```
```diff
@@ -1,5 +1,6 @@
 package io.kestra.core.runners;
 
+import io.kestra.core.models.HasUID;
 import io.kestra.core.models.executions.Execution;
+import io.kestra.core.utils.IdUtils;
 import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
 @Value
 @AllArgsConstructor
 @Builder
-public class ExecutionRunning {
+public class ExecutionRunning implements HasUID {
     String tenantId;
 
     @NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
     @With
     ConcurrencyState concurrencyState;
 
+    @Override
+    public String uid() {
+        return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
+    }
```
```diff
@@ -102,49 +102,39 @@ public class ExecutorService {
         return this.flowExecutorInterface;
     }
 
-    public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
-        // if above the limit, handle concurrency limit based on its behavior
-        if (count >= flow.getConcurrency().getLimit()) {
+    public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
+        // if concurrency was removed, it can be null as we always get the latest flow definition
+        if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
             return switch (flow.getConcurrency().getBehavior()) {
                 case QUEUE -> {
-                    var newExecution = execution.withState(State.Type.QUEUED);
-
-                    ExecutionRunning executionRunning = ExecutionRunning.builder()
-                        .tenantId(flow.getTenantId())
-                        .namespace(flow.getNamespace())
-                        .flowId(flow.getId())
-                        .execution(newExecution)
-                        .concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
-                        .build();
-
                     // when max concurrency is reached, we throttle the execution and stop processing
                     logService.logExecution(
-                        newExecution,
+                        executionRunning.getExecution(),
                         Level.INFO,
-                        "Flow is queued due to concurrency limit exceeded, {} running(s)",
-                        count
+                        "Execution is queued due to concurrency limit exceeded, {} running(s)",
+                        runningCount
                     );
-                    // return the execution queued
-                    yield executor
-                        .withExecutionRunning(executionRunning)
-                        .withExecution(newExecution, "checkConcurrencyLimit");
+                    var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
+                    metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
+                    yield executionRunning
+                        .withExecution(newExecution)
+                        .withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
                 }
                 case CANCEL ->
-                    executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
+                    executionRunning
+                        .withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
+                        .withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
                 case FAIL ->
-                    executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
+                    executionRunning
+                        .withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
+                        .withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
 
             };
         }
 
-        // if under the limit, update the executor with a RUNNING ExecutionRunning to track them
-        var executionRunning = new ExecutionRunning(
-            flow.getTenantId(),
-            flow.getNamespace(),
-            flow.getId(),
-            executor.getExecution(),
-            ExecutionRunning.ConcurrencyState.RUNNING
-        );
-        return executor.withExecutionRunning(executionRunning);
+        // if under the limit, run it!
+        return executionRunning
+            .withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
+            .withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
     }
 
     public Executor process(Executor executor) {
```
```diff
@@ -286,18 +286,10 @@ public class FlowableUtils {
 
         // start as many tasks as we have concurrency slots
         return collect.values().stream()
-            .map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
+            .map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
             .filter(resolvedTasks -> !resolvedTasks.isEmpty())
             .limit(concurrencySlots)
             .map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
             .toList();
     }
 
-    private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
-        return tasks.stream()
-            .filter(resolvedTask -> taskRuns.stream()
-                .noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
-            )
-            .toList();
-    }
```
```diff
@@ -102,6 +102,19 @@ public abstract class AbstractDate {
     }
 
     if (value instanceof Long longValue) {
+        if (value.toString().length() == 13) {
+            return Instant.ofEpochMilli(longValue).atZone(zoneId);
+        } else if (value.toString().length() == 19) {
+            if (value.toString().endsWith("000")) {
+                long seconds = longValue / 1_000_000_000;
+                int nanos = (int) (longValue % 1_000_000_000);
+                return Instant.ofEpochSecond(seconds, nanos).atZone(zoneId);
+            } else {
+                long milliseconds = longValue / 1_000_000;
+                int micros = (int) (longValue % 1_000_000);
+                return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros * 1000);
+            }
+        }
         return Instant.ofEpochSecond(longValue).atZone(zoneId);
     }
```
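The added branch infers the epoch unit from the decimal digit count: 13 digits are treated as milliseconds, 19 digits as nanoseconds (further split by alignment), and anything else as seconds. A simplified standalone sketch of the same heuristic with sample timestamps (it omits the `endsWith("000")` sub-case of the real patch):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class EpochHeuristicDemo {
    // mirrors the digit-length dispatch added in AbstractDate (simplified)
    static ZonedDateTime convert(long epoch, ZoneId zone) {
        int digits = Long.toString(epoch).length();
        if (digits == 13) {
            return Instant.ofEpochMilli(epoch).atZone(zone);           // epoch millis
        }
        if (digits == 19) {
            long seconds = epoch / 1_000_000_000;
            int nanos = (int) (epoch % 1_000_000_000);
            return Instant.ofEpochSecond(seconds, nanos).atZone(zone); // epoch nanos
        }
        return Instant.ofEpochSecond(epoch).atZone(zone);              // epoch seconds
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        System.out.println(convert(1_700_000_000L, utc));              // 10 digits: seconds
        System.out.println(convert(1_700_000_000_000L, utc));          // 13 digits: millis
        System.out.println(convert(1_700_000_000_000_000_000L, utc));  // 19 digits: nanos
    }
}
```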
```diff
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
 import io.kestra.core.exceptions.DeserializationException;
 import io.kestra.core.exceptions.InternalException;
 import io.kestra.core.metrics.MetricRegistry;
+import io.kestra.core.models.HasUID;
 import io.kestra.core.models.conditions.Condition;
 import io.kestra.core.models.conditions.ConditionContext;
 import io.kestra.core.models.executions.Execution;
@@ -318,7 +319,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
     }
 
     synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
-        List<Trigger> triggers = triggerState.findAllForAllTenants();
+        Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
 
         flows
             .stream()
@@ -328,7 +329,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
             .flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
             .distinct()
             .forEach(flowAndTrigger -> {
-                Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
+                String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
+                Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
                 if (trigger.isEmpty()) {
                     RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
                     ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -467,9 +469,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
 
     private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
         List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
+        List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
+        Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
 
         // delete trigger which flow has been deleted
         triggerContextsToEvaluate.stream()
-            .filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
+            .filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
             .forEach(trigger -> {
                 try {
                     this.triggerState.delete(trigger);
@@ -491,12 +496,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
             .map(abstractTrigger -> {
                 RunContext runContext = runContextFactory.of(flow, abstractTrigger);
                 ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
-                Trigger triggerContext = null;
-                Trigger lastTrigger = triggerContextsToEvaluate
-                    .stream()
-                    .filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
-                    .findFirst()
-                    .orElse(null);
+                Trigger triggerContext;
+                Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
                 // If a trigger is not found in triggers to evaluate, then we ignore it
                 if (lastTrigger == null) {
                     return null;
```
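Several of these hunks replace an O(n) linear scan per lookup (`stream().filter(...).findFirst()`) with a `HashMap` built once and keyed by `uid`, turning an O(n·m) matching loop into O(n + m). A minimal sketch of the pattern, with a hypothetical `Trigger` record standing in for the real class:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

public class UidLookupDemo {
    record Trigger(String uid) {}

    public static void main(String[] args) {
        List<Trigger> triggers = List.of(new Trigger("t1"), new Trigger("t2"), new Trigger("t3"));

        // build the index once: O(n)
        Map<String, Trigger> byUid = triggers.stream()
            .collect(Collectors.toMap(Trigger::uid, Function.identity()));

        // each lookup is O(1) instead of re-scanning the whole list
        Optional<Trigger> found = Optional.ofNullable(byUid.get("t2"));
        System.out.println(found); // Optional[Trigger[uid=t2]]
    }
}
```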
```diff
@@ -250,9 +250,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
     stateLock.lock();
     // Optional callback to be executed at the end.
     Runnable returnCallback = null;
-    localServiceState = localServiceState(service);
     try {
+        localServiceState = localServiceState(service);
+
         if (localServiceState == null) {
             return null; // service has been unregistered.
         }
@@ -301,7 +302,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
         // Update the local instance
         this.serviceRegistry.register(localServiceState.with(remoteInstance));
     } catch (Exception e) {
-        final ServiceInstance localInstance = localServiceState(service).instance();
+        final ServiceInstance localInstance = localServiceState.instance();
         log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
             localInstance.uid(),
             localInstance.type(),
@@ -317,7 +318,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
             returnCallback.run();
         }
     }
-    return localServiceState(service).instance();
+    return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
 }
 
 private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
@@ -371,9 +372,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
     final Service service,
     final ServiceInstance instance,
     final boolean isLivenessEnabled) {
-    // Never shutdown STANDALONE server or WEB_SERVER service.
-    if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
-        instance.is(ServiceType.WEBSERVER)) {
+    // Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
+    if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
+        instance.is(ServiceType.INDEXER) ||
+        instance.is(ServiceType.WEBSERVER)
+    ) {
         // Force the RUNNING state.
         return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
     }
```
```diff
@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.FlowWithException;
 import io.kestra.core.models.flows.FlowWithSource;
 import io.kestra.core.models.flows.GenericFlow;
+import io.kestra.core.models.tasks.RunnableTask;
 import io.kestra.core.models.topologies.FlowTopology;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
 @Singleton
 @Slf4j
 public class FlowService {
-
     @Inject
     Optional<FlowRepositoryInterface> flowRepository;
 
@@ -236,6 +236,7 @@ public class FlowService {
     }
 
     List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
+
     List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
         .filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
         .map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
         }
     });
 
+    // add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
+    flow.allTasksWithChilds().forEach(task -> {
+        if (!(task instanceof RunnableTask<?>)) {
+            if (task.getTimeout() != null) {
+                warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
+            }
+            if (task.getTaskCache() != null) {
+                warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
+            }
+            if (task.getWorkerGroup() != null) {
+                warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
+            }
+        }
+    });
+
     return warnings;
 }
```
```diff
@@ -0,0 +1,77 @@
+package io.kestra.core.validations;
+
+import io.micronaut.context.annotation.Context;
+import io.micronaut.context.env.Environment;
+import jakarta.annotation.PostConstruct;
+import jakarta.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serial;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Enforces validation rules upon the application configuration.
+ */
+@Slf4j
+@Context
+public class AppConfigValidator {
+    private static final String KESTRA_URL_KEY = "kestra.url";
+
+    private final Environment environment;
+
+    @Inject
+    public AppConfigValidator(Environment environment) {
+        this.environment = environment;
+    }
+
+    @PostConstruct
+    void validate() {
+        final List<Boolean> validationResults = List.of(
+            isKestraUrlValid()
+        );
+
+        if (validationResults.contains(false)) {
+            throw new AppConfigException("Invalid configuration");
+        }
+    }
+
+    private boolean isKestraUrlValid() {
+        if (!environment.containsProperty(KESTRA_URL_KEY)) {
+            return true;
+        }
+        final String rawUrl = environment.getProperty(KESTRA_URL_KEY, String.class).orElseThrow();
+        final URL url;
+
+        try {
+            url = URI.create(rawUrl).toURL();
+        } catch (IllegalArgumentException | MalformedURLException e) {
+            log.error(
+                "Value of the '{}' configuration property must be a valid URL - e.g. https://your.company.com",
+                KESTRA_URL_KEY
+            );
+            return false;
+        }
+
+        if (!List.of("http", "https").contains(url.getProtocol())) {
+            log.error(
+                "Value of the '{}' configuration property must contain either HTTP or HTTPS scheme - e.g. https://your.company.com",
+                KESTRA_URL_KEY
+            );
+            return false;
+        }
+
+        return true;
+    }
+
+    public static class AppConfigException extends RuntimeException {
+        @Serial
+        private static final long serialVersionUID = 1L;
+
+        public AppConfigException(String errorMessage) {
+            super(errorMessage);
+        }
+    }
+}
```
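The new validator accepts only absolute `http`/`https` URLs for `kestra.url` and fails startup otherwise. A standalone sketch of the same two-step check with sample inputs (not the Micronaut-wired class above):

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.List;

public class KestraUrlCheckDemo {
    // same two-step check: parse as a URL, then restrict the scheme
    static boolean isValid(String rawUrl) {
        final URL url;
        try {
            url = URI.create(rawUrl).toURL();
        } catch (IllegalArgumentException | MalformedURLException e) {
            return false; // unparsable value
        }
        return List.of("http", "https").contains(url.getProtocol());
    }

    public static void main(String[] args) {
        System.out.println(isValid("https://your.company.com")); // true
        System.out.println(isValid("ftp://your.company.com"));   // false: wrong scheme
        System.out.println(isValid("not a url"));                // false: unparsable
    }
}
```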
```diff
@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
     violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
 }
 
+List<Task> allTasks = value.allTasksWithChilds();
+
 // tasks unique id
-List<String> taskIds = value.allTasksWithChilds()
-    .stream()
+List<String> taskIds = allTasks.stream()
     .map(Task::getId)
     .toList();
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
     violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
 }
 
-value.allTasksWithChilds()
-    .stream().filter(task -> task instanceof ExecutableTask<?> executableTask
+allTasks.stream()
+    .filter(task -> task instanceof ExecutableTask<?> executableTask
         && value.getId().equals(executableTask.subflowId().flowId())
         && value.getNamespace().equals(executableTask.subflowId().namespace()))
     .forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
     .map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
     .collect(Collectors.toList());
 
-List<String> invalidTasks = value.allTasks()
+List<String> invalidTasks = allTasks.stream()
     .filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
     .map(task -> task.getId())
     .collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
         " [" + String.join(", ", invalidTasks) + "]");
 }
 
-List<Pattern> outputsWithMinusPattern = value.allTasks()
+List<Pattern> outputsWithMinusPattern = allTasks.stream()
     .filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
    .map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
    .collect(Collectors.toList());
 
-invalidTasks = value.allTasks()
+invalidTasks = allTasks.stream()
    .filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
    .map(task -> task.getId())
    .collect(Collectors.toList());
```
```diff
@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
 @NoArgsConstructor
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @EqualsAndHashCode
-//@TriggersDataFilterValidation
 @Schema(
     title = "Display Execution data in a dashboard chart.",
     description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."
```
```diff
@@ -111,8 +111,9 @@ public class Labels extends Task implements ExecutionUpdatableTask {
         })
     ).collect(Collectors.toMap(
         Map.Entry::getKey,
-        Map.Entry::getValue
-    ));
+        Map.Entry::getValue,
+        (first, second) -> second)
+    );
 } else if (labels instanceof Map<?, ?> map) {
     labelsAsMap = map.entrySet()
         .stream()
```
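Without a merge function, `Collectors.toMap` throws `IllegalStateException` on the first duplicate key; passing `(first, second) -> second` makes duplicates resolve to the latest value instead. A small demonstration of the difference, using hypothetical label entries:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapMergeDemo {
    public static void main(String[] args) {
        List<Map.Entry<String, String>> entries = List.of(
            Map.entry("env", "dev"),
            Map.entry("env", "prod") // duplicate key
        );

        // entries.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
        // would throw: java.lang.IllegalStateException: Duplicate key env

        Map<String, String> merged = entries.stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> second));
        System.out.println(merged); // {env=prod}
    }
}
```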
```diff
@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
             return Optional.empty();
         }
 
-        boolean isOutputsAllowed = runContext
-            .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
-            .orElse(true);
-
         final Output.OutputBuilder builder = Output.builder()
             .executionId(execution.getId())
             .state(execution.getState().getCurrent());
 
-        final Map<String, Object> subflowOutputs = Optional
-            .ofNullable(flow.getOutputs())
-            .map(outputs -> outputs
-                .stream()
-                .collect(Collectors.toMap(
-                    io.kestra.core.models.flows.Output::getId,
-                    io.kestra.core.models.flows.Output::getValue)
-                )
-            )
-            .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
-
         VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
-        if (subflowOutputs != null) {
-            try {
-                Map<String, Object> outputs = runContext.render(subflowOutputs);
-                FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
-                if (flow.getOutputs() != null && flowInputOutput != null) {
-                    outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
-                }
-                builder.outputs(outputs);
-            } catch (Exception e) {
-                runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
-                var state = State.Type.fail(this);
-                Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
-                taskRun = taskRun
-                    .withState(state)
-                    .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
-                    .withOutputs(variables);
-
-                return Optional.of(SubflowExecutionResult.builder()
-                    .executionId(execution.getId())
-                    .state(State.Type.FAILED)
-                    .parentTaskRun(taskRun)
-                    .build());
-            }
-        }
+        if (this.wait) { // we only compute outputs if we wait for the subflow
+            boolean isOutputsAllowed = runContext
+                .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
+                .orElse(true);
+
+            final Map<String, Object> subflowOutputs = Optional
+                .ofNullable(flow.getOutputs())
+                .map(outputs -> outputs
+                    .stream()
+                    .collect(Collectors.toMap(
+                        io.kestra.core.models.flows.Output::getId,
+                        io.kestra.core.models.flows.Output::getValue)
+                    )
+                )
+                .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
+
+            if (subflowOutputs != null) {
+                try {
+                    Map<String, Object> outputs = runContext.render(subflowOutputs);
+                    FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
+                    if (flow.getOutputs() != null && flowInputOutput != null) {
+                        outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
+                    }
+                    builder.outputs(outputs);
+                } catch (Exception e) {
+                    runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
+                    var state = State.Type.fail(this);
+                    Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
+                    taskRun = taskRun
+                        .withState(state)
+                        .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
+                        .withOutputs(variables);
+
+                    return Optional.of(SubflowExecutionResult.builder()
+                        .executionId(execution.getId())
+                        .state(State.Type.FAILED)
+                        .parentTaskRun(taskRun)
+                        .build());
+                }
+            }
+        }
```
New SVG icon (2.1 KiB):

```diff
@@ -0,0 +1,11 @@
+<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
+  <g clip-path="url(#clip0_1765_9330)">
+    <path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
+    <path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
+    <path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
+    <path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
+    <path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
+    <path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
+    <path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
+  </g>
+</svg>
```

A second new SVG icon with identical content (2.1 KiB):

```diff
@@ -0,0 +1,11 @@
+<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
+  <g clip-path="url(#clip0_1765_9330)">
+    <path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
+    <path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
+    <path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
+    <path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
+    <path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
+    <path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
+    <path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
+  </g>
+</svg>
```
```diff
@@ -1,11 +1,12 @@
 package io.kestra.core.models;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
 
 class LabelTest {
 
     @Test
@@ -15,9 +16,8 @@ class LabelTest {
             new Label(Label.CORRELATION_ID, "id"))
         );
 
-        Assertions.assertEquals(
-            Map.of("system", Map.of("username", "test", "correlationId", "id")),
-            result
+        assertThat(result).isEqualTo(
+            Map.of("system", Map.of("username", "test", "correlationId", "id"))
         );
     }
 
@@ -29,9 +29,48 @@ class LabelTest {
             new Label(Label.CORRELATION_ID, "id"))
         );
 
-        Assertions.assertEquals(
-            Map.of("system", Map.of("username", "test1", "correlationId", "id")),
-            result
+        assertThat(result).isEqualTo(
+            Map.of("system", Map.of("username", "test2", "correlationId", "id"))
         );
     }
 
+    @Test
+    void shouldGetMapGivenDistinctLabels() {
+        Map<String, String> result = Label.toMap(List.of(
+            new Label(Label.USERNAME, "test"),
+            new Label(Label.CORRELATION_ID, "id"))
+        );
+
+        assertThat(result).isEqualTo(
+            Map.of(Label.USERNAME, "test", Label.CORRELATION_ID, "id")
+        );
+    }
+
+    @Test
+    void shouldGetMapGivenDuplicateLabels() {
+        Map<String, String> result = Label.toMap(List.of(
+            new Label(Label.USERNAME, "test1"),
+            new Label(Label.USERNAME, "test2"),
+            new Label(Label.CORRELATION_ID, "id"))
+        );
+
+        assertThat(result).isEqualTo(
+            Map.of(Label.USERNAME, "test2", Label.CORRELATION_ID, "id")
+        );
+    }
+
+    @Test
+    void shouldDuplicateLabelsWithKeyOrderKept() {
+        List<Label> result = Label.deduplicate(List.of(
+            new Label(Label.USERNAME, "test1"),
+            new Label(Label.USERNAME, "test2"),
+            new Label(Label.CORRELATION_ID, "id"),
+            new Label(Label.USERNAME, "test3"))
+        );
+
+        assertThat(result).containsExactly(
+            new Label(Label.USERNAME, "test3"),
+            new Label(Label.CORRELATION_ID, "id")
+        );
+    }
 }
```
@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;

import io.kestra.core.models.Label;
import io.kestra.core.utils.IdUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.flows.State;

@@ -157,7 +158,58 @@ class ExecutionTest {
            .labels(List.of(new Label("test", "test-value")))
            .build();

        assertThat(execution.getLabels().size()).isEqualTo(1);
        assertThat(execution.getLabels().getFirst()).isEqualTo(new Label("test", "test-value"));
        assertThat(execution.getLabels()).containsExactly(new Label("test", "test-value"));
    }

    @Test
    void labelsGetDeduplicated() {
        final List<Label> duplicatedLabels = List.of(
            new Label("test", "value1"),
            new Label("test", "value2")
        );

        final Execution executionWithLabels = Execution.builder()
            .build()
            .withLabels(duplicatedLabels);
        assertThat(executionWithLabels.getLabels()).containsExactly(new Label("test", "value2"));

        final Execution executionBuilder = Execution.builder()
            .labels(duplicatedLabels)
            .build();
        assertThat(executionBuilder.getLabels()).containsExactly(new Label("test", "value2"));
    }

    @Test
    @Disabled("Solve label deduplication on instantiation")
    void labelsGetDeduplicatedOnNewInstance() {
        final List<Label> duplicatedLabels = List.of(
            new Label("test", "value1"),
            new Label("test", "value2")
        );

        final Execution executionNew = new Execution(
            "foo",
            "id",
            "namespace",
            "flowId",
            1,
            Collections.emptyList(),
            Map.of(),
            Map.of(),
            duplicatedLabels,
            Map.of(),
            State.of(State.Type.SUCCESS, Collections.emptyList()),
            "parentId",
            "originalId",
            null,
            false,
            null,
            null,
            null,
            null,
            null,
            null
        );
        assertThat(executionNew.getLabels()).containsExactly(new Label("test", "value2"));
    }
}
@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.kestra.core.models.flows.FlowScope.USER;
@@ -740,4 +741,16 @@ public abstract class AbstractExecutionRepositoryTest {
        executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
        assertThat(executions.size()).isEqualTo(0L);
    }

    @Test
    protected void shouldReturnLastExecutionsWhenInputsAreNull() {
        inject();

        List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);

        assertThat(lastExecutions).isNotEmpty();
        Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
        assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
    }

}
@@ -46,7 +46,7 @@ public abstract class AbstractLogRepositoryTest {
            .flowId("flowId")
            .namespace("io.kestra.unittest")
            .taskId("taskId")
            .executionId(IdUtils.create())
            .executionId("executionId")
            .taskRunId(IdUtils.create())
            .attemptNumber(0)
            .timestamp(Instant.now())
@@ -293,19 +293,23 @@ public abstract class AbstractLogRepositoryTest {

        ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);

        Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", Level.INFO, startDate);
        Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", null, null, Level.INFO, startDate);
        List<LogEntry> logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(3);

        find = logRepository.findAsync(MAIN_TENANT, null, Level.ERROR, startDate);
        find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.ERROR, startDate);
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(1);

        find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", Level.INFO, startDate);
        find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", "flowId", null, Level.ERROR, startDate);
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(1);

        find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", "flowId", "executionId", Level.INFO, startDate);
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(0);

        find = logRepository.findAsync(MAIN_TENANT, null, Level.INFO, startDate.plusSeconds(2));
        find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.INFO, startDate.plusSeconds(2));
        logEntries = find.collectList().block();
        assertThat(logEntries).hasSize(0);
    }
@@ -417,6 +417,12 @@ public abstract class AbstractRunnerTest {
        flowConcurrencyCaseTest.flowConcurrencyCancelPause();
    }

    @Test
    @LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
    protected void flowConcurrencyWithForEachItem() throws Exception {
        flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
    }

    @Test
    @ExecuteFlow("flows/valids/executable-fail.yml")
    void badExecutable(Execution execution) {

@@ -8,18 +8,28 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -28,7 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowConcurrencyCaseTest {
    @Inject
    private RunnerUtils runnerUtils;
    private StorageInterface storageInterface;

    @Inject
    protected RunnerUtils runnerUtils;

    @Inject
    private FlowInputOutput flowIO;

    @Inject
    private FlowRepositoryInterface flowRepository;
@@ -237,4 +253,49 @@ public class FlowConcurrencyCaseTest {
        assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
        assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
    }

    public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
        URI file = storageUpload();
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
            (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
        assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);

        Set<String> executionIds = new HashSet<>();
        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
            if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
                executionIds.add(e.getLeft().getId());
            }
        });

        // wait a little to make sure no extra executions are started
        Thread.sleep(500);

        assertThat(executionIds).hasSize(1);
        receive.blockLast();

        Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
        assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
    }

    private URI storageUpload() throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");

        Files.write(tempFile.toPath(), content());

        return storageInterface.put(
            MAIN_TENANT,
            null,
            new URI("/file/storage/file.txt"),
            new FileInputStream(tempFile)
        );
    }

    private List<String> content() {
        return IntStream
            .range(0, 7)
            .mapToObj(value -> StringUtils.leftPad(value + "", 20))
            .toList();
    }

}
@@ -273,7 +273,7 @@ public class RestartCaseTest {

        // wait
        Execution finishedRestartedExecution = runnerUtils.awaitExecution(
            execution -> execution.getState().getCurrent() == State.Type.SUCCESS && execution.getId().equals(firstExecution.getId()),
            execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess() && execution.getId().equals(firstExecution.getId()),
            throwRunnable(() -> {
                Execution restartedExec = executionService.restart(firstExecution, null);
                assertThat(restartedExec).isNotNull();

@@ -9,6 +9,7 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@@ -109,7 +110,8 @@ class RunContextLoggerTest {
        logger.info("test myawesomepassmyawesomepass myawesomepass myawesomepassmyawesomepass");
        logger.warn("test {}", URI.create("http://it-s.secret"));

        matchingLog = TestsUtils.awaitLogs(logs, 3);
        // the 3 logs will create 4 log entries, as exception stacktraces are logged separately at the TRACE level
        matchingLog = TestsUtils.awaitLogs(logs, 4);
        receive.blockLast();
        assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.DEBUG)).findFirst().orElseThrow().getMessage()).isEqualTo("test john@****** test");
        assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.TRACE)).findFirst().orElseThrow().getMessage()).contains("exception from doe.com");

@@ -71,6 +71,8 @@ class DateFilterTest {
                {{ "2013-09-08T17:19:12+02:00" | date(timeZone="Europe/Paris") }}
                {{ "2013-09-08T17:19:12" | date(timeZone="Europe/Paris") }}
                {{ "2013-09-08" | date(timeZone="Europe/Paris") }}
                {{ "08.09.2023" | date("yyyy-MM-dd", existingFormat="dd.MM.yyyy") }}
                {{ "08092023" | date("yyyy-MM-dd", existingFormat="ddMMyyyy") }}
                """,
            Map.of()
        );
@@ -80,6 +82,8 @@ class DateFilterTest {
            2013-09-08T17:19:12.000000+02:00
            2013-09-08T17:19:12.000000+02:00
            2013-09-08T00:00:00.000000+02:00
            2023-09-08
            2023-09-08
            """);
    }

@@ -171,7 +175,9 @@ class DateFilterTest {

        render = variableRenderer.render("{{ now(format=\"sql_milli\") }}", ImmutableMap.of());

        assertThat(render).isEqualTo(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
        // a millisecond can pass between the render and now, so we can't assert on a date precise to the millisecond
        assertThat(render).startsWith(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        assertThat(render).hasSize(23);
    }

    @Test
@@ -185,4 +191,41 @@ class DateFilterTest {

        assertThat(render).isEqualTo("2013-09-07T17:19:12.123456+02:00");
    }

    @Test
    void timestampDateFormat() throws IllegalVariableEvaluationException {
        String render =
            variableRenderer.render(
                """
                {{ 1378653552 | date(format="iso_sec", timeZone="Europe/Paris") }}
                {{ 1378653552123 | date(format="iso_milli", timeZone="Europe/Paris") }}
                {{ 1378653552123 | date(timeZone="Europe/Paris") }}
                {{ 1378653552123 | date(format="iso_zoned_date_time", timeZone="Europe/Paris") }}
                {{ 1378653552123456000 | date(format="iso", timeZone="Europe/Paris") }}
                {{ 1378653552000123456 | date(format="iso", timeZone="Europe/Paris") }}
                {{ 1378653552 | date(format="sql_sec", timeZone="Europe/Paris") }}
                {{ 1378653552123 | date(format="sql_milli", timeZone="Europe/Paris") }}
                {{ 1378653552123456000 | date(format="sql", timeZone="Europe/Paris") }}
                {{ 1378653552000123456 | date(format="sql", timeZone="Europe/Paris") }}
                {{ 1378653552123 | date(format="sql_milli", timeZone="UTC") }}
                {{ "1378653552123" | number | date(format="sql_milli", timeZone="UTC") }}
                """,
                Map.of());

        assertThat(render).isEqualTo("""
            2013-09-08T17:19:12+02:00
            2013-09-08T17:19:12.123+02:00
            2013-09-08T17:19:12.123000+02:00
            2013-09-08T17:19:12.123+02:00[Europe/Paris]
            2013-09-08T17:19:12.123456+02:00
            2013-09-08T17:19:12.123456+02:00
            2013-09-08 17:19:12
            2013-09-08 17:19:12.123
            2013-09-08 17:19:12.123456
            2013-09-08 17:19:12.123456
            2013-09-08 15:19:12.123
            2013-09-08 15:19:12.123
            """);
    }

}
@@ -372,4 +372,44 @@ class FlowServiceTest {

        assertThat(exceptions.size()).isZero();
    }

    @Test
    void shouldReturnValidationForRunnablePropsOnFlowable() {
        // Given
        String source = """
            id: dolphin_164914
            namespace: company.team

            tasks:
              - id: for
                type: io.kestra.plugin.core.flow.ForEach
                values: [1, 2, 3]
                workerGroup:
                  key: toto
                timeout: PT10S
                taskCache:
                  enabled: true
                tasks:
                  - id: hello
                    type: io.kestra.plugin.core.log.Log
                    message: Hello World! 🚀
                    workerGroup:
                      key: toto
                    timeout: PT10S
                    taskCache:
                      enabled: true
            """;

        // When
        List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);

        // Then
        assertThat(results).hasSize(1);
        assertThat(results.getFirst().getWarnings()).hasSize(3);
        assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
            "The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
            "The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
            "The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
        );
    }
}
@@ -0,0 +1,77 @@
package io.kestra.core.validations;

import io.micronaut.context.ApplicationContext;
import io.micronaut.context.exceptions.BeanInstantiationException;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class AppConfigValidatorTest {

    @Test
    void validateNoKestraUrl() {
        assertThatCode(() -> {
            try (ApplicationContext context = ApplicationContext.run()) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean got initialized properly including the PostConstruct validation")
            .doesNotThrowAnyException();
    }

    @Test
    void validateValidKestraUrl() {
        assertThatCode(() -> {
            try (ApplicationContext context = ApplicationContext.builder()
                .deduceEnvironment(false)
                .properties(
                    Map.of("kestra.url", "https://postgres-oss.preview.dev.kestra.io")
                )
                .start()
            ) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean got initialized properly including the PostConstruct validation")
            .doesNotThrowAnyException();
    }

    @Test
    void validateInvalidKestraUrl() {
        assertThatThrownBy(() -> {
            try (ApplicationContext context = ApplicationContext.builder()
                .deduceEnvironment(false)
                .properties(
                    Map.of("kestra.url", "postgres-oss.preview.dev.kestra.io")
                )
                .start()
            ) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean initialization failed at PostConstruct")
            .isInstanceOf(BeanInstantiationException.class)
            .hasMessageContaining("Invalid configuration");
    }

    @Test
    void validateNonHttpKestraUrl() {
        assertThatThrownBy(() -> {
            try (ApplicationContext context = ApplicationContext.builder()
                .deduceEnvironment(false)
                .properties(
                    Map.of("kestra.url", "ftp://postgres-oss.preview.dev.kestra.io")
                )
                .start()
            ) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean initialization failed at PostConstruct")
            .isInstanceOf(BeanInstantiationException.class)
            .hasMessageContaining("Invalid configuration");
    }
}
@@ -54,4 +54,10 @@ class ForEachTest {
        assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
    }

    @Test
    @ExecuteFlow("flows/valids/foreach-nested.yaml")
    void nested(Execution execution) {
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
    }
}
@@ -2,7 +2,6 @@ package io.kestra.plugin.core.flow;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
@@ -104,4 +103,61 @@ class RuntimeLabelsTest {
            new Label("taskRunId", labelsTaskRunId),
            new Label("existingLabel", "someValue"));
    }

    @Test
    @LoadFlows({"flows/valids/primitive-labels-flow.yml"})
    void primitiveTypeLabelsOverrideExistingLabels() throws TimeoutException, QueueException {
        Execution execution = runnerUtils.runOne(
            MAIN_TENANT,
            "io.kestra.tests",
            "primitive-labels-flow",
            null,
            (flow, createdExecution) -> Map.of(
                "intLabel", 42,
                "boolLabel", true,
                "floatLabel", 3.14f
            ),
            null,
            List.of(
                new Label("intValue", "1"),
                new Label("boolValue", "false"),
                new Label("floatValue", "4.2f")
            )
        );

        assertThat(execution.getTaskRunList()).hasSize(1);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        String labelsTaskRunId = execution.findTaskRunsByTaskId("update-labels").getFirst().getId();

        assertThat(execution.getLabels()).containsExactlyInAnyOrder(
            new Label(Label.CORRELATION_ID, execution.getId()),
            new Label("intValue", "42"),
            new Label("boolValue", "true"),
            new Label("floatValue", "3.14"),
            new Label("taskRunId", labelsTaskRunId));
    }

    @Test
    @LoadFlows({"flows/valids/labels-update-task-deduplicate.yml"})
    void updateGetsDeduplicated() throws TimeoutException, QueueException {
        Execution execution = runnerUtils.runOne(
            MAIN_TENANT,
            "io.kestra.tests",
            "labels-update-task-deduplicate",
            null,
            (flow, createdExecution) -> Map.of(),
            null,
            List.of()
        );

        assertThat(execution.getTaskRunList()).hasSize(2);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

        assertThat(execution.getLabels()).containsExactlyInAnyOrder(
            new Label(Label.CORRELATION_ID, execution.getId()),
            new Label("fromStringKey", "value2"),
            new Label("fromListKey", "value2")
        );
    }
}
@@ -0,0 +1,90 @@
package io.kestra.plugin.core.flow;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

@KestraTest(startRunner = true)
class SubflowRunnerTest {

    @Inject
    private RunnerUtils runnerUtils;

    @Inject
    private ExecutionRepositoryInterface executionRepository;

    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    protected QueueInterface<Execution> executionQueue;

    @Test
    @LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
    void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
        Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-inherited-labels-parent");

        assertThat(parentExecution.getLabels()).containsExactlyInAnyOrder(
            new Label(Label.CORRELATION_ID, parentExecution.getId()),
            new Label("parentFlowLabel1", "value1"),
            new Label("parentFlowLabel2", "value2")
        );

        String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("launch").getFirst().getOutputs().get("executionId");

        assertThat(childExecutionId).isNotBlank();

        Execution childExecution = executionRepository.findById(MAIN_TENANT, childExecutionId).orElseThrow();

        assertThat(childExecution.getLabels()).containsExactlyInAnyOrder(
            new Label(Label.CORRELATION_ID, parentExecution.getId()), // parent's correlation ID
            new Label("childFlowLabel1", "value1"), // defined by the child flow
            new Label("childFlowLabel2", "value2"), // defined by the child flow
            new Label("launchTaskLabel", "launchFoo"), // added by the Subflow task
            new Label("parentFlowLabel1", "launchBar"), // overridden by the Subflow task
            new Label("parentFlowLabel2", "value2") // inherited from the parent flow
        );
    }

    @Test
    @LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
    void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
        AtomicReference<Execution> childExecution = new AtomicReference<>();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runnable closing = executionQueue.receive(either -> {
            if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
                childExecution.set(either.getLeft());
                countDownLatch.countDown();
            }
        });

        Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");
        String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
        assertThat(childExecutionId).isNotBlank();
        assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(parentExecution.getTaskRunList()).hasSize(1);

        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
        assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
        assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(childExecution.get().getTaskRunList()).hasSize(1);
        closing.run();
    }
}
@@ -0,0 +1,21 @@
id: flow-concurrency-for-each-item
namespace: io.kestra.tests

inputs:
  - id: file
    type: FILE
  - id: batch
    type: INT

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}"
    batch:
      rows: "{{inputs.batch}}"
    namespace: io.kestra.tests
    flowId: flow-concurrency-queue
    wait: true
    transmitFailed: true
    inputs:
      items: "{{ taskrun.items }}"
core/src/test/resources/flows/valids/foreach-nested.yaml (new file)
@@ -0,0 +1,21 @@
id: foreach-nested
namespace: io.kestra.tests
tasks:
  - id: each0
    type: io.kestra.plugin.core.flow.ForEach
    values: ["l1", "l2"]
    tasks:
      - id: each1
        type: io.kestra.plugin.core.flow.ForEach
        concurrencyLimit: 0
        values: ["d1", "d2", "d3"]
        tasks:
          - id: p1
            type: io.kestra.plugin.core.debug.Return
            format: "{{ parent.taskrun.value }}-{{ taskrun.value }}"
          - id: p2
            type: io.kestra.plugin.core.debug.Return
            format: "{{ outputs.p1[parent.taskrun.value][taskrun.value].value }}"
  - id: test
    type: io.kestra.plugin.core.debug.Return
    format: "{{ outputs.p1 }}"
@@ -0,0 +1,14 @@
id: labels-update-task-deduplicate
namespace: io.kestra.tests

tasks:
  - id: from-string
    type: io.kestra.plugin.core.execution.Labels
    labels: "{ \"fromStringKey\": \"value1\", \"fromStringKey\": \"value2\" }"
  - id: from-list
    type: io.kestra.plugin.core.execution.Labels
    labels:
      - key: "fromListKey"
        value: "value1"
      - key: "fromListKey"
        value: "value2"
@@ -0,0 +1,12 @@
id: subflow-child-with-output
namespace: io.kestra.tests

tasks:
  - id: return
    type: io.kestra.plugin.core.debug.Return
    format: "Some value"

outputs:
  - id: flow_a_output
    type: STRING
    value: "{{ outputs.return.value }}"
@@ -0,0 +1,11 @@
id: subflow-inherited-labels-child
namespace: io.kestra.tests

labels:
  childFlowLabel1: value1
  childFlowLabel2: value2

tasks:
  - id: return
    type: io.kestra.plugin.core.log.Log
    message: "{{ execution.id }}"
@@ -0,0 +1,18 @@
id: subflow-inherited-labels-parent
namespace: io.kestra.tests

labels:
  parentFlowLabel1: value1
  parentFlowLabel2: value2

tasks:
  - id: launch
    type: io.kestra.plugin.core.flow.Subflow
    namespace: io.kestra.tests
    flowId: subflow-inherited-labels-child
    wait: true
    transmitFailed: true
    inheritLabels: true
    labels:
      launchTaskLabel: launchFoo
      parentFlowLabel1: launchBar
@@ -0,0 +1,9 @@
id: subflow-parent-no-wait
namespace: io.kestra.tests

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: io.kestra.tests
    flowId: subflow-child-with-output
    wait: false
@@ -1,4 +1,4 @@
version=0.24.0-SNAPSHOT
version=1.0.0-SNAPSHOT

org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true

@@ -0,0 +1,15 @@
package io.kestra.runner.h2;

import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
@H2QueueEnabled
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
    public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
        super(repository);
    }
}
@@ -144,4 +144,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
    public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
        return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
    @Bean(preDestroy = "close")
    public QueueInterface<ExecutionRunning> executionRunning() {
        return new H2Queue<>(ExecutionRunning.class, applicationContext);
    }
}

@@ -0,0 +1,29 @@
CREATE TABLE IF NOT EXISTS execution_running (
    "key" VARCHAR(250) NOT NULL PRIMARY KEY,
    "value" TEXT NOT NULL,
    "tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
    "namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
    "flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
);

CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");

ALTER TABLE queues ALTER COLUMN "type" ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.ExecutionRunning'
) NOT NULL
@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
@MysqlQueueEnabled
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
    public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
        super(repository);
    }
}
@@ -144,4 +144,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
    public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
        return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
    @Bean(preDestroy = "close")
    public QueueInterface<ExecutionRunning> executionRunning() {
        return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
    }
}

@@ -0,0 +1,28 @@
CREATE TABLE IF NOT EXISTS execution_running (
    `key` VARCHAR(250) NOT NULL PRIMARY KEY,
    `value` JSON NOT NULL,
    `tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
    `namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
    `flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
    INDEX ix_flow (tenant_id, namespace, flow_id)
);

ALTER TABLE queues MODIFY COLUMN `type` ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.ExecutionRunning'
) NOT NULL;
@@ -0,0 +1 @@
ALTER TABLE queues MODIFY COLUMN `offset` BIGINT NOT NULL AUTO_INCREMENT;
@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

@Singleton
@PostgresQueueEnabled
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
    public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
        super(repository);
    }
}
@@ -144,4 +144,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
    public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
        return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
    }

    @Override
    @Singleton
    @Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
    @Bean(preDestroy = "close")
    public QueueInterface<ExecutionRunning> executionRunning() {
        return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
    }
}

@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS execution_running (
    key VARCHAR(250) NOT NULL PRIMARY KEY,
    value JSONB NOT NULL,
    tenant_id VARCHAR(250) GENERATED ALWAYS AS (value ->> 'tenantId') STORED,
    namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
    flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED
);

CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running (tenant_id, namespace, flow_id);

ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionRunning';
@@ -0,0 +1,3 @@
ALTER SEQUENCE queues_offset_seq AS BIGINT;
ALTER SEQUENCE queues_offset_seq MAXVALUE 9223372036854775807;
ALTER TABLE queues ALTER COLUMN "offset" TYPE BIGINT;
@@ -125,6 +125,12 @@ public class JdbcTableConfigsFactory {
        return new InstantiableJdbcTableConfig("dashboards", Dashboard.class, "dashboards");
    }

    @Bean
    @Named("executionrunning")
    public InstantiableJdbcTableConfig executionRunning() {
        return new InstantiableJdbcTableConfig("executionrunning", ExecutionRunning.class, "execution_running");
    }

    public static class InstantiableJdbcTableConfig extends JdbcTableConfig {
        public InstantiableJdbcTableConfig(String name, @Nullable Class<?> cls, String table) {
            super(name, cls, table);

@@ -869,8 +869,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi

    @Override
    public List<Execution> lastExecutions(
        @Nullable String tenantId,
        List<FlowFilter> flows
        String tenantId,
        @Nullable List<FlowFilter> flows
    ) {
        return this.jdbcRepository
            .getDslContextWrapper()
@@ -892,14 +892,19 @@
                .and(NORMAL_KIND_CONDITION)
                .and(field("end_date").isNotNull())
                .and(DSL.or(
                    flows
                        .stream()
                        .map(flow -> DSL.and(
                            field("namespace").eq(flow.getNamespace()),
                            field("flow_id").eq(flow.getId())
                        ))
                        .toList()
                ));
                    ListUtils.emptyOnNull(flows).isEmpty() ?
                        DSL.trueCondition()
                        :
                        DSL.or(
                            flows.stream()
                                .map(flow -> DSL.and(
                                    field("namespace").eq(flow.getNamespace()),
                                    field("flow_id").eq(flow.getId())
                                ))
                                .toList()
                        )
                )
            );

        Table<Record2<Object, Integer>> cte = subquery.asTable("cte");

@@ -26,13 +26,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.Comparator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -162,12 +160,28 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
        return select;
    }

    private static <T extends Record> SelectConditionStep<T> addFlowId(SelectConditionStep<T> select, String flowId) {
        if (flowId != null) {
            select = select.and(field("flow_id").eq(flowId));
        }
        return select;
    }

    private static <T extends Record> SelectConditionStep<T> addExecutionId(SelectConditionStep<T> select, String executionId) {
        if (executionId != null) {
            select = select.and(field("execution_id").eq(executionId));
        }
        return select;
    }

    @Override
    public Flux<LogEntry> findAsync(
        @Nullable String tenantId,
        @Nullable String namespace,
        @Nullable Level minLevel,
        ZonedDateTime startDate
        @Nullable String tenantId,
        @Nullable String namespace,
        @Nullable String flowId,
        @Nullable String executionId,
        @Nullable Level minLevel,
        ZonedDateTime startDate
    ){
        return Flux.create(emitter -> this.jdbcRepository
            .getDslContextWrapper()
@@ -181,6 +195,8 @@
                .where(this.defaultFilter(tenantId))
                .and(NORMAL_KIND_CONDITION);
            addNamespace(select, namespace);
            addFlowId(select, flowId);
            addExecutionId(select, executionId);
            addMinLevel(select, minLevel);
            select = select.and(field("timestamp").greaterThan(startDate.toOffsetDateTime()));

@@ -217,24 +233,6 @@
            }), FluxSink.OverflowStrategy.BUFFER);
    }

    private List<LogStatistics> fillDate(List<LogStatistics> result, ZonedDateTime startDate, ZonedDateTime endDate) {
        DateUtils.GroupType groupByType = DateUtils.groupByType(Duration.between(startDate, endDate));

        if (groupByType.equals(DateUtils.GroupType.MONTH)) {
            return fillDate(result, startDate, endDate, ChronoUnit.MONTHS, "YYYY-MM");
        } else if (groupByType.equals(DateUtils.GroupType.WEEK)) {
            return fillDate(result, startDate, endDate, ChronoUnit.WEEKS, "YYYY-ww");
        } else if (groupByType.equals(DateUtils.GroupType.DAY)) {
            return fillDate(result, startDate, endDate, ChronoUnit.DAYS, "YYYY-MM-DD");
        } else if (groupByType.equals(DateUtils.GroupType.HOUR)) {
            return fillDate(result, startDate, endDate, ChronoUnit.HOURS, "YYYY-MM-DD HH");
        } else {
            return fillDate(result, startDate, endDate, ChronoUnit.MINUTES, "YYYY-MM-DD HH:mm");
        }
    }

    private List<LogStatistics> fillDate(
        List<LogStatistics> result,
        ZonedDateTime startDate,

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;

@@ -19,9 +20,9 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
        this.jdbcRepository = jdbcRepository;
    }

    public void save(ExecutionQueued executionQueued) {
    public void save(DSLContext dslContext, ExecutionQueued executionQueued) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionQueued);
        this.jdbcRepository.persist(executionQueued, fields);
        this.jdbcRepository.persist(executionQueued, dslContext, fields);
    }

    public void pop(String tenantId, String namespace, String flowId, Consumer<Execution> consumer) {

@@ -0,0 +1,71 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;

import java.util.Map;
import java.util.Optional;
import java.util.function.*;

public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository {
    protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository;

    public AbstractJdbcExecutionRunningStorage(io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository) {
        this.jdbcRepository = jdbcRepository;
    }

    public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
        this.jdbcRepository.persist(executionRunning, dslContext, fields);
    }

    /**
     * Counts the running executions, then processes the count using the consumer function.
     * It locks the rows and is wrapped in a transaction, so the consumer should use the provided dslContext for any database access.
     * <p>
     * Note: when there is no execution running, there will be no database locks, so multiple calls will return 0.
     * This is only potentially an issue with multiple executor instances when the concurrency limit is set to 1.
     */
    public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, Integer, ExecutionRunning> consumer) {
        return this.jdbcRepository
            .getDslContextWrapper()
            .transactionResult(configuration -> {
                var dslContext = DSL.using(configuration);
                var select = dslContext
                    .select(AbstractJdbcRepository.field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(this.buildTenantCondition(flow.getTenantId()))
                    .and(field("namespace").eq(flow.getNamespace()))
                    .and(field("flow_id").eq(flow.getId()));

                Integer count = select.forUpdate().fetch().size();
                return consumer.apply(dslContext, count);
            });
    }
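
    // Usage sketch (illustration only, not part of this class): how a caller could
    // enforce a flow's concurrency limit on top of countThenProcess. The `storage`,
    // `running`, `limit` and `withConcurrencyState(...)` names are hypothetical
    // placeholders assumed for this example.
    //
    //     ExecutionRunning decided = storage.countThenProcess(flow, (dslContext, count) -> {
    //         // the row locks are held until the surrounding transaction commits,
    //         // so this decision cannot race with another executor instance
    //         if (count >= limit) {
    //             return running.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
    //         }
    //         storage.save(dslContext, running.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING));
    //         return running;
    //     });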

    /**
     * Delete the execution running corresponding to the given execution.
     */
    public void remove(Execution execution) {
        this.jdbcRepository
            .getDslContextWrapper()
            .transaction(configuration -> {
                var select = DSL
                    .using(configuration)
                    .select(AbstractJdbcRepository.field("value"))
                    .from(this.jdbcRepository.getTable())
                    .where(buildTenantCondition(execution.getTenantId()))
                    .and(field("key").eq(IdUtils.fromPartsAndSeparator('|', execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
                    .forUpdate();

                Optional<ExecutionRunning> maybeExecution = this.jdbcRepository.fetchOne(select);
                maybeExecution.ifPresent(executionRunning -> this.jdbcRepository.delete(executionRunning));
            });
    }
}
@@ -6,7 +6,6 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.sla.*;
import io.kestra.core.models.tasks.ExecutableTask;
@@ -113,6 +112,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
    @Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED)
    private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;

    @Inject
    @Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
    private QueueInterface<ExecutionRunning> executionRunningQueue;

    @Inject
    private RunContextFactory runContextFactory;

@@ -146,6 +149,9 @@
    @Inject
    private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;

    @Inject
    private AbstractJdbcExecutionRunningStorage executionRunningStorage;

    @Inject
    private AbstractJdbcExecutorStateStorage executorStateStorage;

@@ -303,6 +309,7 @@
        this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
        this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
        this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
        this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
        this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));

        ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
@@ -553,37 +560,22 @@
            monitors.forEach(monitor -> slaMonitorStorage.save(monitor));
        }

        // queue execution if needed (limit concurrency)
        // handle the concurrency limit: we use a dedicated queue so that running
        // executions are processed sequentially, i.e. inside a queue with no parallelism
        if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
            ExecutionCount count = executionRepository.executionCounts(
                flow.getTenantId(),
                List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
                List.of(State.Type.RUNNING, State.Type.PAUSED),
                null,
                null,
                null
            ).getFirst();
            ExecutionRunning executionRunning = ExecutionRunning.builder()
                .tenantId(executor.getFlow().getTenantId())
                .namespace(executor.getFlow().getNamespace())
                .flowId(executor.getFlow().getId())
                .execution(executor.getExecution())
                .concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
                .build();

            executor = executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount());

            // the execution has been queued, we save the queued execution and stop here
            if (executor.getExecutionRunning() != null && executor.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning(executor.getExecutionRunning()));
                metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment();

                return Pair.of(
                    executor,
                    executorState
                );
            }

            // the execution has been moved to FAILED or CANCELLED, we stop here
            if (executor.getExecution().getState().isTerminated()) {
                return Pair.of(
                    executor,
                    executorState
                );
            }
            executionRunningQueue.emit(executionRunning);
            return Pair.of(
                executor,
                executorState
            );
        }

        // handle execution changed SLA
@@ -790,7 +782,10 @@
            // move it to the state of the child flow, and merge the outputs.
            // This is important to avoid races such as RUNNING that arrives after the first SUCCESS/FAILED.
            RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
            taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId()).withState(message.getState());
            taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
            if (taskRun.getState().getCurrent() != message.getState()) {
                taskRun = taskRun.withState(message.getState());
            }
            Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
            Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
            taskRun = taskRun.withOutputs(variables);
@@ -981,6 +976,37 @@
        }
    }

    private void executionRunningQueue(Either<ExecutionRunning, DeserializationException> either) {
        if (either.isRight()) {
            log.error("Unable to deserialize a running execution: {}", either.getRight().getMessage());
            return;
        }

        ExecutionRunning executionRunning = either.getLeft();
        FlowInterface flow = flowMetaStore.findByExecution(executionRunning.getExecution()).orElseThrow();
        ExecutionRunning processed = executionRunningStorage.countThenProcess(flow, (dslContext, count) -> {
            ExecutionRunning computed = executorService.processExecutionRunning(flow, count, executionRunning);
            if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
                executionRunningStorage.save(dslContext, computed);
            } else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
                executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
            }
            return computed;
        });

        try {
            executionQueue.emit(processed.getExecution());
        } catch (QueueException e) {
            try {
                this.executionQueue.emit(
                    processed.getExecution().failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
                );
            } catch (QueueException ex) {
                log.error("Unable to emit the execution {}", processed.getExecution().getId(), ex);
            }
        }
    }
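
    // Reading aid for the handler above (descriptive only, adds no behavior):
    // 1. handleExecution() sees a CREATED execution on a flow with `concurrency`
    //    set, builds an ExecutionRunning in the CREATED concurrency state and
    //    emits it on executionRunningQueue.
    // 2. executionRunningQueue() counts the persisted running executions under a
    //    row lock (countThenProcess) and, in the same transaction, saves the
    //    execution as RUNNING or stores it as QUEUED via executionQueuedStorage.
    // 3. When the execution terminates, its ExecutionRunning row is removed and,
    //    for Concurrency.Behavior.QUEUE, one queued execution is popped back onto
    //    the execution queue.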

    private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
        return executionRepository.lock(executionId, pair -> {
            Execution currentExecution = pair.getLeft();
@@ -1083,6 +1109,11 @@
            slaMonitorStorage.purge(executor.getExecution().getId());
        }

        // purge the execution running entry
        if (executor.getFlow().getConcurrency() != null) {
            executionRunningStorage.remove(execution);
        }

        // check if there exists a queued execution and submit it to the execution queue
        if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
            executionQueuedStorage.pop(executor.getFlow().getTenantId(),

@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

@KestraTest
abstract public class JdbcQueueTest {
@@ -54,7 +55,7 @@ abstract public class JdbcQueueTest {

    flowQueue.emit(builder("io.kestra.f1"));

    countDownLatch.await(5, TimeUnit.SECONDS);
    assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
    receive.blockLast();

    assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -75,7 +76,7 @@ abstract public class JdbcQueueTest {

    flowQueue.emit("consumer_group", builder("io.kestra.f1"));

    countDownLatch.await(5, TimeUnit.SECONDS);
    assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
    receive.blockLast();

    assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -83,52 +84,48 @@ abstract public class JdbcQueueTest {

@Test
void withType() throws InterruptedException, QueueException {
    // first one
    flowQueue.emit(builder("io.kestra.f1"));

    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux<FlowInterface> receive = TestsUtils.receive(flowQueue, Indexer.class, either -> {
        countDownLatch.countDown();
    });

    countDownLatch.await(5, TimeUnit.SECONDS);

    // first one
    flowQueue.emit(builder("io.kestra.f1"));
    assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
    assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f1");

    // second one only
    flowQueue.emit(builder("io.kestra.f2"));

    CountDownLatch countDownLatch2 = new CountDownLatch(1);
    receive = TestsUtils.receive(flowQueue, Indexer.class, either -> {
        countDownLatch2.countDown();
    });
    countDownLatch2.await(5, TimeUnit.SECONDS);

    // second one only
    flowQueue.emit(builder("io.kestra.f2"));
    assertTrue(countDownLatch2.await(5, TimeUnit.SECONDS));
    assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f2");
}

// FIXME
@Test
void withGroupAndType() throws InterruptedException, QueueException {
    // first one
    flowQueue.emit("consumer_group", builder("io.kestra.f1"));

    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux<FlowInterface> receive = TestsUtils.receive(flowQueue, "consumer_group", Indexer.class, either -> {
        countDownLatch.countDown();
    });

    countDownLatch.await(5, TimeUnit.SECONDS);

    // first one
    flowQueue.emit("consumer_group", builder("io.kestra.f1"));
    assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
    assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f1");

    // second one only
    flowQueue.emit("consumer_group", builder("io.kestra.f2"));

    CountDownLatch countDownLatch2 = new CountDownLatch(1);
    receive = TestsUtils.receive(flowQueue, "consumer_group", Indexer.class, either -> {
        countDownLatch2.countDown();
    });
    countDownLatch2.await(5, TimeUnit.SECONDS);

    // second one only
    flowQueue.emit("consumer_group", builder("io.kestra.f2"));
    assertTrue(countDownLatch2.await(5, TimeUnit.SECONDS));

    assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f2");
}
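The recurring fix in these tests is the same two-step pattern: register the consumer before emitting, and assert on the boolean returned by `CountDownLatch#await` so a timeout fails the test instead of silently passing. Distilled from the hunks above, using the names the diff itself uses:

```java
CountDownLatch latch = new CountDownLatch(1);

// 1. Subscribe first, so no emission can be lost before the consumer exists.
Flux<FlowInterface> receive = TestsUtils.receive(flowQueue, Indexer.class, either -> latch.countDown());

// 2. Emit only once the subscription is in place.
flowQueue.emit(builder("io.kestra.f1"));

// 3. await() returns false on timeout; without assertTrue the test would
//    keep going (and could even pass) when nothing was ever received.
assertTrue(latch.await(5, TimeUnit.SECONDS));
receive.blockLast();
```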
package-lock.json (generated, new file, +6)
@@ -0,0 +1,6 @@
{
    "name": "kestra",
    "lockfileVersion": 3,
    "requires": true,
    "packages": {}
}
@@ -32,7 +32,7 @@ dependencies {

    // we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
    api platform('com.google.cloud:libraries-bom:26.64.0')
    api platform("com.azure:azure-sdk-bom:1.2.36")
    api platform('software.amazon.awssdk:bom:2.32.6')
    api platform('software.amazon.awssdk:bom:2.32.11')

    constraints {
@@ -69,6 +69,8 @@ dependencies {

    // we need at least 0.14, it could be removed when Micronaut contains a recent enough version in their BOM
    api "io.micrometer:micrometer-core:1.15.2"
    // We need at least 6.17, it could be removed when Micronaut contains a recent enough version in their BOM
    api "io.micronaut.openapi:micronaut-openapi-bom:6.17.3"

    // Other libs
    api("org.projectlombok:lombok:1.18.38")
@@ -78,7 +80,7 @@ dependencies {
    api group: 'org.slf4j', name: 'jcl-over-slf4j', version: slf4jVersion
    api group: 'org.fusesource.jansi', name: 'jansi', version: '2.4.2'
    api group: 'com.devskiller.friendly-id', name: 'friendly-id', version: '1.1.0'
    api group: 'net.thisptr', name: 'jackson-jq', version: '1.3.0'
    api group: 'net.thisptr', name: 'jackson-jq', version: '1.4.0'
    api group: 'com.google.guava', name: 'guava', version: '33.4.8-jre'
    api group: 'commons-io', name: 'commons-io', version: '2.20.0'
    api group: 'org.apache.commons', name: 'commons-lang3', version: '3.18.0'
@@ -103,7 +105,7 @@ dependencies {
    api group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: jsonschemaVersion
    api 'com.h2database:h2:2.3.232'
    api 'com.mysql:mysql-connector-j:9.3.0'
    api 'org.postgresql:postgresql:42.7.6'
    api 'org.postgresql:postgresql:42.7.7'
    api 'com.github.docker-java:docker-java:3.5.3'
    api 'com.github.docker-java:docker-java-transport-httpclient5:3.5.3'
    api (group: 'org.opensearch.client', name: 'opensearch-java', version: "$opensearchVersion")
@@ -114,7 +116,7 @@ dependencies {
    api "org.xhtmlrenderer:flying-saucer-pdf:$flyingSaucerVersion"
    api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.3'
    api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.3'
    api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.1'
    api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
    api group: 'de.siegmar', name: 'fastcsv', version: '4.0.0'
    // Json Diff
    api group: 'com.github.java-json-tools', name: 'json-patch', version: '1.13'
@@ -14,5 +14,5 @@ public class Cpu {
    title = "The maximum amount of CPU resources a container can use.",
    description = "Make sure to set this to a numeric value, e.g. `cpus: \"1.5\"` or `cpus: \"4\"`. For instance, if the host machine has two CPUs and you set `cpus: \"1.5\"`, the container is guaranteed **at most** one and a half of the CPUs."
)
private Property<Long> cpus;
private Property<Double> cpus;
}
@@ -765,7 +765,8 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
}

if (this.getCpu() != null && this.getCpu().getCpus() != null) {
    hostConfig.withCpuQuota(runContext.render(this.getCpu().getCpus()).as(Long.class).orElseThrow() * 10000L);
    Double cpuValue = runContext.render(this.getCpu().getCpus()).as(Double.class).orElseThrow();
    hostConfig.withNanoCPUs((long)(cpuValue * 1_000_000_000L));
}

if (this.getMemory() != null) {
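The old code rendered the CPU count as a `Long` and multiplied it by 10,000 for `withCpuQuota`, which both truncated fractional values like `1.5` and used the wrong scale: the CFS quota is expressed against a default period of 100,000µs, so 10,000 per CPU would cap the container at a tenth of what was requested. The new code uses `withNanoCPUs`, where one CPU equals 10⁹ units. A self-contained sketch of the two conversions; the helper names are mine, not Kestra's:

```java
public final class CpuUnits {
    // Docker's default CFS period, in microseconds.
    private static final long CFS_PERIOD_US = 100_000L;

    // Unit expected by HostConfig#withNanoCPUs: 1 CPU = 1_000_000_000.
    static long toNanoCpus(double cpus) {
        return (long) (cpus * 1_000_000_000L);
    }

    // Equivalent CFS quota for the default period.
    static long toCfsQuota(double cpus) {
        return (long) (cpus * CFS_PERIOD_US);
    }

    public static void main(String[] args) {
        System.out.println(toNanoCpus(1.5)); // 1500000000
        System.out.println(toCfsQuota(1.5)); // 150000 -- the value asserted in DockerTest below
    }
}
```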
@@ -2,15 +2,18 @@ package io.kestra.plugin.scripts.runner.docker;

import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
import io.kestra.core.models.tasks.runners.TaskCommands;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import org.assertj.core.api.Assertions;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hamcrest.Matchers.containsString;

class DockerTest extends AbstractTaskRunnerTest {
    @Override
@@ -37,4 +40,30 @@ class DockerTest extends AbstractTaskRunnerTest {
        assertThat(result.getExitCode()).isZero();
        Assertions.assertThat(result.getLogConsumer().getStdOutCount()).isEqualTo(1);
    }

    @Test
    void shouldSetCorrectCPULimitsInContainer() throws Exception {
        var runContext = runContext(this.runContextFactory);

        var cpuConfig = Cpu.builder()
            .cpus(Property.ofValue(1.5))
            .build();

        var docker = Docker.builder()
            .image("rockylinux:9.3-minimal")
            .cpu(cpuConfig)
            .build();

        var taskCommands = new CommandsWrapper(runContext).withCommands(Property.ofValue(List.of(
            "/bin/sh", "-c",
            "CPU_LIMIT=$(cat /sys/fs/cgroup/cpu.max || cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us) && " +
            "echo \"::{\\\"outputs\\\":{\\\"cpuLimit\\\":\\\"$CPU_LIMIT\\\"}}::\""
        )));
        var result = docker.run(runContext, taskCommands, Collections.emptyList());

        assertThat(result).isNotNull();
        assertThat(result.getExitCode()).isZero();
        MatcherAssert.assertThat((String) result.getLogConsumer().getOutputs().get("cpuLimit"), containsString("150000"));
        assertThat(result.getLogConsumer().getStdOutCount()).isEqualTo(1);
    }
}
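The asserted `150000` is the cgroup translation of `cpus: 1.5`: against the default 100,000µs CFS period, 1.5 CPUs yields a quota of 150,000, which cgroup v2 exposes in `/sys/fs/cgroup/cpu.max` (as `150000 100000`) and cgroup v1 in `cpu.cfs_quota_us`, hence the fallback in the shell command.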
@@ -116,7 +116,7 @@ class LogConsumerTest {
    Collections.emptyList()
);

Await.until(() -> logs.size() >= 10, null, Duration.ofSeconds(5));
Await.until(() -> logs.size() >= 10, null, Duration.ofSeconds(10));
receive.blockLast();

assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.INFO)).count()).isEqualTo(1L);
ui/package-lock.json (generated, +14)
@@ -10,7 +10,7 @@
"hasInstallScript": true,
"dependencies": {
    "@js-joda/core": "^5.6.5",
    "@kestra-io/ui-libs": "^0.0.224",
    "@kestra-io/ui-libs": "^0.0.232",
    "@vue-flow/background": "^1.3.2",
    "@vue-flow/controls": "^1.1.2",
    "@vue-flow/core": "^1.45.0",
@@ -1792,9 +1792,9 @@
    }
},
"node_modules/@eslint/plugin-kit": {
    "version": "0.3.3",
    "resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.3.tgz",
    "integrity": "sha512-1+WqvgNMhmlAambTvT3KPtCl/Ibr68VldY2XY40SL1CE0ZXiakFR/cbTspaF5HsnpDMvcYYoJHfl4980NBjGag==",
    "version": "0.3.4",
    "resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.4.tgz",
    "integrity": "sha512-Ul5l+lHEcw3L5+k8POx6r74mxEYKG5kOb6Xpy2gCRW6zweT6TEhAf8vhxGgjhqrd/VO/Dirhsb+1hNpD1ue9hw==",
    "dev": true,
    "license": "Apache-2.0",
    "dependencies": {
@@ -3133,9 +3133,9 @@
    "license": "BSD-3-Clause"
},
"node_modules/@kestra-io/ui-libs": {
    "version": "0.0.224",
    "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.224.tgz",
    "integrity": "sha512-upEsKh8rfonNGW+EvA+ql2DaDc6umBX96xWb49lufmQGpm8xMEO+hhWbBRRXU+7egTYJW/6yaFPShNXWddHB4Q==",
    "version": "0.0.232",
    "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.232.tgz",
    "integrity": "sha512-4Z1DNxWEZSEEy2Tv63uNf2remxb/IqVUY01/qCaeYjLcp5axrS7Dn43N8DspA4EPdlhe4JFq2RhG13Pom+JDQA==",
    "dependencies": {
        "@nuxtjs/mdc": "^0.16.1",
        "@popperjs/core": "^2.11.8",
@@ -24,7 +24,7 @@
},
"dependencies": {
    "@js-joda/core": "^5.6.5",
    "@kestra-io/ui-libs": "^0.0.224",
    "@kestra-io/ui-libs": "^0.0.232",
    "@vue-flow/background": "^1.3.2",
    "@vue-flow/controls": "^1.1.2",
    "@vue-flow/core": "^1.45.0",
@@ -149,7 +149,7 @@
    "@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7"
},
"el-table-infinite-scroll": {
    "vue": "$vue"
    "vue": "^3.5.18"
},
"storybook": "$storybook"
},
@@ -254,6 +254,7 @@ The table below lists common Pebble expressions and functions.
| `{{ "apple" \| upper \| abbreviate(3) }}` | Chains multiple filters together. |
| `{{ now(timeZone='Europe/Paris') }}` | Returns the current datetime in a specific timezone. |
| `{{ now(format='sql_milli') }}` | Returns the current datetime in a specific format. |
| `{{ 1378653552123 \| date(format="iso_milli", timeZone="Europe/Paris") }}` | Formats a given timestamp as a datetime in a specific format and timezone. |
| `{% macro input(type='text', name, value) %} ... {% endmacro %}` | Macro with default argument values. |
| `{# THIS IS A COMMENT #}` | Adds a comment that won't appear in the output. |
| `{{ foo.bar }}` | Accesses a child attribute of a variable. |
@@ -163,16 +163,11 @@
},
getTabClasses(tab) {
    const isEnterpriseTab = tab.locked;
    const isGanttTab = tab.name === "gantt";
    const ROUTES = ["/flows/edit/", "/namespaces/edit/"];
    const EDIT_ROUTES = ROUTES.some(route => this.$route.path.startsWith(route));
    const isOverviewTab = EDIT_ROUTES && tab.title === "Overview";

    return {
        "container": !isEnterpriseTab && !isOverviewTab,
        "mt-4": !isEnterpriseTab && !isOverviewTab,
        "px-0": isEnterpriseTab && isOverviewTab,
        "gantt-container": isGanttTab
        "container": !isEnterpriseTab,
        "mt-4": !isEnterpriseTab,
        "px-0": isEnterpriseTab,
    };
},
},
@@ -339,6 +339,12 @@
        }
    });
},
triggerLoadDataAfterBulkEditAction() {
    this.loadData();
    setTimeout(() => this.loadData(), 200);
    setTimeout(() => this.loadData(), 1000);
    setTimeout(() => this.loadData(), 5000);
},
async unlock() {
    const namespace = this.triggerToUnlock.namespace;
    const flowId = this.triggerToUnlock.flowId;
@@ -391,10 +397,10 @@
    this.triggerStore.update({...trigger, disabled: !value})
        .then(updatedTrigger => {
            this.triggers = this.triggers.map(t => {
                const triggerContextMatches = t.triggerContext &&
                    t.triggerContext.flowId === updatedTrigger.flowId &&
                    t.triggerContext.triggerId === updatedTrigger.triggerId;

                if (triggerContextMatches) {
                    return {triggerContext: updatedTrigger, abstractTrigger: t.abstractTrigger};
                }
@@ -404,7 +410,7 @@
},
genericConfirmAction(toast, queryAction, byIdAction, success, data) {
    this.$toast().confirm(
        this.$t(toast, {"count": this.queryBulkAction ? this.total : this.selection.length}),
        this.$t(toast, {"count": this.queryBulkAction ? this.total : this.selection.length}) + ". " + this.$t("bulk action async warning"),
        () => this.genericConfirmCallback(queryAction, byIdAction, success, data),
        () => {
        }
@@ -432,7 +438,7 @@
    .then(data => {
        this.$toast().success(this.$t(success, {count: data.count}));
        this.toggleAllUnselected();
        this.loadData();
        this.triggerLoadDataAfterBulkEditAction();
    })
} else {
    const selection = this.selection;
@@ -442,7 +448,7 @@
    .then(data => {
        this.$toast().success(this.$t(success, {count: data.count}));
        this.toggleAllUnselected();
        this.loadData();
        this.triggerLoadDataAfterBulkEditAction();
    }).catch(e => {
        this.$toast().error(e?.invalids.map(exec => {
            return {message: this.$t(exec.message, {triggers: exec.invalidValue})}
ui/src/components/ai/AITriggerButton.vue (new file, +89)
@@ -0,0 +1,89 @@
<template>
    <div class="ai-trigger-box" v-if="show">
        <el-button
            v-if="enabled && !opened"
            class="ai-trigger-button"
            :icon="AiIcon"
            @click="handleClick"
        >
            {{ t("ai.flow.title") }}
        </el-button>
    </div>
</template>

<script lang="ts" setup>
import {useI18n} from "vue-i18n";
import AiIcon from "./AiIcon.vue";

interface AITriggerButtonProps {
    show: boolean;
    enabled: boolean;
    opened: boolean;
}

interface AITriggerButtonEmits {
    (event: "click"): void;
}

const {t} = useI18n();

withDefaults(defineProps<AITriggerButtonProps>(), {
    show: false,
    enabled: false,
    opened: false,
});

const emit = defineEmits<AITriggerButtonEmits>();

function handleClick(): void {
    emit("click");
}
</script>

<style scoped lang="scss">
.ai-trigger-box {
    --border-angle: 0turn;
    --main-bg: conic-gradient(from calc(var(--border-angle) + 50.37deg) at 50% 50%, #3991FF 0deg, #8C4BFF 124.62deg, #A396FF 205.96deg, #3991FF 299.42deg, #E0E0FF 342.69deg, #3991FF 360deg);
    --gradient-border: conic-gradient(from calc(var(--border-angle) + 50.37deg) at 50% 50%, #3991FF 0deg, #8C4BFF 124.62deg, #A396FF 205.96deg, #3991FF 299.42deg, #E0E0FF 342.69deg, #3991FF 360deg);

    display: flex;
    flex-direction: column;
    align-items: end;
    gap: 0.5rem;
    margin-top: 0.5rem;
    border: solid 1px transparent;
    border-radius: 3rem;
    background:
        var(--main-bg) padding-box,
        var(--gradient-border) border-box,
        var(--main-bg) border-box;

    background-position: center center;
    animation: bg-spin 3s linear infinite;

    @keyframes bg-spin {
        to {
            --border-angle: 1turn;
        }
    }

    .ai-trigger-button {
        display: inline-flex;
        align-items: center;
        gap: 6px;
        background-color: var(--ks-button-background-secondary);
        color: var(--ks-content-primary);
        box-shadow: 0px 4px 4px 0px #00000040;
        font-size: 12px;
        font-weight: 700;
        border: none;
        border-radius: 3rem;
    }
}

@property --border-angle {
    syntax: "<angle>";
    inherits: true;
    initial-value: 0turn;
}
</style>
@@ -5,10 +5,7 @@
<style scoped lang="scss">
.icon {
    height: 20px;
    width: 20px;
    display: inline-flex;
    align-self: center;
    justify-self: center;
    min-width: 20px;
    background: center url("../../assets/icons/ai-agent.svg#file");

    html.light & {
@@ -1,6 +1,6 @@
<template>
    <div @click="handleClick" class="d-flex my-2 p-2 rounded element" :class="{'moved': moved}">
        <div class="me-2 icon">
        <div v-if="props.parentPathComplete !== 'inputs'" class="me-2 icon">
            <TaskIcon :cls="element.type" :icons only-icon />
        </div>
@@ -85,6 +85,7 @@

<style scoped lang="scss">
@import "../../styles/code.scss";
@import "@kestra-io/ui-libs/src/scss/_color-palette";

.element {
    cursor: pointer;
@@ -107,7 +108,8 @@
}

.playground-run-task {
    background-color: blue;
    color: $base-white;
    background-color: $base-blue-400;
    height: 16px;
    width: 16px;
    font-size: 4px;
@@ -30,7 +30,7 @@
</template>

<script setup lang="ts">
import {onMounted, computed, inject, ref, provide} from "vue";
import {onMounted, computed, inject, ref, provide, onActivated} from "vue";
import {useI18n} from "vue-i18n";
import {useStore} from "vuex";
import {usePluginsStore} from "../../../stores/plugins";
@@ -73,6 +73,10 @@
    return !complexObject
}

onActivated(() => {
    pluginsStore.updateDocumentation();
});

function onTaskUpdateField(key: string, val: any) {
    const realValue = val === null || val === undefined ? undefined :
    // allow array to be created with null values (specifically for metadata)
@@ -160,11 +164,8 @@
    task: parsedFlow.value,
})

const fieldsFromSchemaTop = computed(() => MAIN_KEYS.map(key => getFieldFromKey(key, "main")))

const fieldsFromSchemaRest = computed(() => {
    return Object.keys(pluginsStore.flowRootProperties ?? {})
        .filter((key) => !MAIN_KEYS.includes(key) && !HIDDEN_FIELDS.includes(key))
@@ -1,7 +1,7 @@
<template>
    <Header v-if="header" :dashboard />

    <section id="filter">
    <section id="filter" :class="{filterPadding: padding}">
        <KestraFilter
            :prefix="`dashboard__${dashboard.id}`"
            :language
@@ -14,11 +14,11 @@
        />
    </section>

    <Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" padding />
    <Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
</template>

<script setup lang="ts">
import {computed, onBeforeMount, ref} from "vue";
import {computed, onBeforeMount, ref, watch} from "vue";

import type {Dashboard, Chart} from "./composables/useDashboards";
import {ALLOWED_CREATION_ROUTES, getDashboard, processFlowYaml} from "./composables/useDashboards";
@@ -60,6 +60,8 @@
    isNamespace: {type: Boolean, default: false},
});

const padding = computed(() => !props.isFlow && !props.isNamespace);

const dashboard = ref<Dashboard>({id: "", charts: []});
const charts = ref<Chart[]>([]);

@@ -102,12 +104,14 @@
    if (props.isFlow && ID === "default") load("default", processFlowYaml(YAML_FLOW, route.params.namespace as string, route.params.id as string));
    else if (props.isNamespace && ID === "default") load("default", YAML_NAMESPACE);
});

watch(route, async (_) => refreshCharts());
</script>

<style scoped lang="scss">
@import "@kestra-io/ui-libs/src/scss/variables";

section#filter {
    .filterPadding {
        margin: 2rem 0.25rem 0;
        padding: 0 2rem;
    }
@@ -131,7 +131,7 @@ export function useChartGenerator(props: {chart: Chart; filters: string[]; showD
const data = ref();
const generate = async (id: string, pagination?: { pageNumber: number; pageSize: number }) => {
    const filters = props.filters.concat(decodeSearchParams(route.query, undefined, []) ?? []);
    const parameters: Parameters = {...(pagination ?? {}), ...(filters ?? {})};
    const parameters: Parameters = {...(pagination ?? {}), filters: (filters ?? {})};

    if (!props.showDefault) {
        data.value = await dashboardStore.generate(id, props.chart.id, parameters);
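The small-looking change hides the actual bug: `filters` is an array, so spreading it with `...(filters ?? {})` copies its elements onto the parameters object under numeric keys (`{0: ..., 1: ...}`) instead of producing a `filters` field; assigning it explicitly as `filters: (filters ?? {})` sends the shape the dashboard endpoint presumably expects.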
@@ -13,6 +13,7 @@
    v-bind="props"
    :title="props.data.flowId"
    :state="props.data.state"
    :icon-component="iconVNode"
    @expand-dependencies="expand"
    @mouseover="onMouseOver"
    @mouseleave="onMouseLeave"
@@ -35,7 +36,7 @@
</template>

<script setup>
import {ref, onMounted, inject, nextTick, onBeforeUnmount, watch} from "vue";
import {ref, onMounted, inject, nextTick, onBeforeUnmount, watch, h, computed} from "vue";
import {useRoute, useRouter} from "vue-router";
import {
    VueFlow,
@@ -51,6 +52,20 @@
import {cssVariable} from "@kestra-io/ui-libs";
import BasicNode from "@kestra-io/ui-libs/src/components/nodes/BasicNode.vue";

import TaskIcon from "@kestra-io/ui-libs/src/components/misc/TaskIcon.vue";
const icon = computed(() => {
    const GRAY = "#2f3342";

    return window.btoa(`
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="25" viewBox="0 0 24 25" fill="none">
<path fill-rule="evenodd" clip-rule="evenodd"
d="M4.34546 9.63757C4.74074 10.5277 5.31782 11.3221 6.03835 11.9681L7.03434 10.8209C6.4739 10.3185 6.02504 9.70059 5.71758 9.00824C5.41012 8.3159 5.25111 7.56496 5.25111 6.80532C5.25111 6.04568 5.41012 5.29475 5.71758 4.6024C6.02504 3.91006 6.4739 3.29216 7.03434 2.78977L6.03835 1.64258C5.31782 2.28851 4.74074 3.08293 4.34546 3.97307C3.95019 4.86321 3.74575 5.82867 3.74575 6.80532C3.74575 7.78197 3.95019 8.74744 4.34546 9.63757ZM16.955 4.38931C17.4802 3.97411 18.1261 3.74777 18.7913 3.74576C19.5894 3.74576 20.3547 4.06807 20.919 4.64177C21.4833 5.21548 21.8004 5.9936 21.8004 6.80494C21.8004 7.61628 21.4833 8.3944 20.919 8.96811C20.3547 9.54181 19.5894 9.86412 18.7913 9.86412C18.2559 9.86126 17.7312 9.71144 17.2725 9.43048L12.3325 14.4529L11.2688 13.3715L16.2088 8.34906C16.0668 8.10583 15.9592 7.84348 15.8891 7.56973H11.2688V6.04014H15.8891C16.055 5.38511 16.4298 4.80451 16.955 4.38931ZM17.9555 8.07674C18.2029 8.24482 18.4938 8.33453 18.7913 8.33453C19.1902 8.33412 19.5727 8.17284 19.8548 7.88607C20.1368 7.59931 20.2955 7.21049 20.2959 6.80494C20.2959 6.50241 20.2076 6.20668 20.0423 5.95514C19.877 5.70361 19.642 5.50756 19.3671 5.39178C19.0922 5.27601 18.7897 5.24572 18.4978 5.30474C18.206 5.36376 17.9379 5.50944 17.7275 5.72336C17.5171 5.93727 17.3738 6.20982 17.3157 6.50653C17.2577 6.80324 17.2875 7.11079 17.4014 7.39029C17.5152 7.66978 17.7081 7.90867 17.9555 8.07674ZM3.74621 15.2177V16.7473H7.19606L2.2417 21.7842L3.30539 22.8656L8.25975 17.8287V21.336H9.76427V15.2177H3.74621ZM15.7823 18.2769H12.7733V19.8064H15.7823V22.1008H21.8004V15.9825H15.7823V18.2769ZM17.2868 20.5712V17.5121H20.2959V20.5712H17.2868ZM8.02885 9.67292C7.62863 9.31407 7.30809 8.87275 7.08853 8.37827C6.86897 7.88378 6.75542 7.34747 6.75542 6.80494C6.75542 6.26241 6.86897 5.72609 7.08853 5.23161C7.30809 4.73713 7.62863 4.29581 8.02885 3.93696L9.02484 5.08415C8.78458 5.29946 8.59215 5.5643 8.46034 5.86106C8.32853 6.15782 8.26035 6.47971 8.26035 6.80532C8.26035 7.13094 8.32853 7.45282 8.46034 7.74958C8.59215 8.04634 8.78458 8.31118 9.02484 8.52649L8.02885 9.67292Z"
fill="${GRAY}" />
</svg>
`);
});
const iconVNode = h(TaskIcon, {customIcon: {icon: icon.value}});

import {apiUrl} from "override/utils/route";

import {linkedElements} from "../../utils/vueFlow";
@@ -1,24 +1,22 @@
<template>
    <div class="execution-pending">
        <EmptyTemplate class="queued">
            <img src="../../assets/queued_visual.svg" alt="Queued Execution">
            <h5 class="mt-4 fw-bold">
                {{ $t('execution_status') }}
                <span
                    class="ms-2 px-2 py-1 rounded fs-7 fw-normal"
                    :style="getStyle(execution.state.current)"
                >
                    {{ execution.state.current }}
                </span>
            </h5>
            <p class="mt-4 mb-0">
                {{ $t('no_tasks_running') }}
            </p>
            <p>
                {{ $t('execution_starts_progress') }}
            </p>
        </EmptyTemplate>
    </div>
    <EmptyTemplate class="queued">
        <img src="../../assets/queued_visual.svg" alt="Queued Execution">
        <h5 class="mt-4 fw-bold">
            {{ $t('execution_status') }}
            <span
                class="ms-2 px-2 py-1 rounded fs-7 fw-normal"
                :style="getStyle(execution.state.current)"
            >
                {{ execution.state.current }}
            </span>
        </h5>
        <p class="mt-4 mb-0">
            {{ $t('no_tasks_running') }}
        </p>
        <p>
            {{ $t('execution_starts_progress') }}
        </p>
    </EmptyTemplate>
</template>

<script setup>
@@ -80,13 +80,6 @@
    this.executionsStore.followExecution(this.$route.params, this.$t);
},
getTabs() {

},
},
computed: {
    ...mapState("auth", ["user"]),
    ...mapStores(useCoreStore, useExecutionsStore),
    tabs() {
        return [
            {
                name: undefined,
@@ -135,6 +128,13 @@
                locked: true
            }
        ];
    }
},
computed: {
    ...mapState("auth", ["user"]),
    ...mapStores(useCoreStore, useExecutionsStore),
    tabs() {
        return this.getTabs();
    },
    routeInfo() {
        const ns = this.$route.params.namespace;
|
||||
class-name="shrink"
|
||||
>
|
||||
<template #default="scope">
|
||||
<code>{{ scope.row.flowRevision }}</code>
|
||||
<code class="code-text">{{ scope.row.flowRevision }}</code>
|
||||
</template>
|
||||
</el-table-column>
|
||||
|
||||
@@ -293,7 +293,7 @@
|
||||
</el-tooltip>
|
||||
</template>
|
||||
<template #default="scope">
|
||||
<code>
|
||||
<code class="code-text">
|
||||
{{ scope.row.taskRunList?.slice(-1)[0].taskId }}
|
||||
{{
|
||||
scope.row.taskRunList?.slice(-1)[0].attempts?.length > 1 ? `(${scope.row.taskRunList?.slice(-1)[0].attempts.length})` : ""
|
||||
@@ -625,9 +625,6 @@
|
||||
}
|
||||
this.displayColumns = localStorage.getItem("columns_executions")?.split(",")
|
||||
|| this.optionalColumns.filter(col => col.default).map(col => col.prop);
|
||||
if (this.isConcurrency) {
|
||||
this.emitStateCount([State.RUNNING, State.PAUSED])
|
||||
}
|
||||
},
|
||||
computed: {
|
||||
...mapState("auth", ["user"]),
|
||||
@@ -796,6 +793,11 @@
|
||||
queryFilter["filters[flowId][EQUALS]"] = this.flowId;
|
||||
}
|
||||
|
||||
const hasStateFilters = Object.keys(queryFilter).some(key => key.startsWith("filters[state]")) || queryFilter.state;
|
||||
if (!hasStateFilters && this.statuses?.length > 0) {
|
||||
queryFilter["filters[state][IN]"] = this.statuses.join(",");
|
||||
}
|
||||
|
||||
return _merge(base, queryFilter)
|
||||
},
|
||||
loadData(callback) {
|
||||
@@ -806,7 +808,11 @@
|
||||
page: parseInt(this.$route.query.page || this.internalPageNumber),
|
||||
sort: this.$route.query.sort || "state.startDate:desc",
|
||||
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
|
||||
})).finally(callback);
|
||||
})).then(() => {
|
||||
if (this.isConcurrency) {
|
||||
this.emitStateCount();
|
||||
}
|
||||
}).finally(callback);
|
||||
},
|
||||
durationFrom(item) {
|
||||
return (+new Date() - new Date(item.state.startDate).getTime()) / 1000
|
||||
@@ -1070,15 +1076,12 @@
|
||||
}
|
||||
})
|
||||
},
|
||||
emitStateCount(states) {
|
||||
this.executionsStore.findExecutions(this.loadQuery({
|
||||
size: parseInt(this.$route.query.size || this.internalPageSize),
|
||||
page: parseInt(this.$route.query.page || this.internalPageNumber),
|
||||
sort: this.$route.query.sort || "state.startDate:desc",
|
||||
state: states
|
||||
})).then(() => {
|
||||
this.$emit("state-count", this.executionsStore.total);
|
||||
});
|
||||
emitStateCount() {
|
||||
const runningCount = this.executionsStore.executions.filter(execution =>
|
||||
execution.state.current === State.RUNNING
|
||||
)?.length;
|
||||
const totalCount = this.executionsStore.total;
|
||||
this.$emit("state-count", {runningCount, totalCount});
|
||||
}
|
||||
},
|
||||
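Worth noting about this rework: the old `emitStateCount` issued a second `findExecutions` query just to read the total, while the new one derives `runningCount` from the executions already in the store (so it reflects the currently loaded page) and reads `totalCount` from the store total. That is also why `loadData` now calls it after its own fetch resolves instead of firing it once on mount.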
watch: {
@@ -1119,6 +1122,9 @@
        color: #ffb703;
    }
}
.code-text {
    color: var(--ks-content-primary);
}
</style>

<style lang="scss">

@@ -45,7 +45,7 @@
    </el-tooltip>
</el-form-item>
<el-form-item>
    <el-button-group class="min-w-auto">
    <el-button-group class="ks-b-group">
        <restart :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
        <el-button @click="downloadContent()">
            <kicon :tooltip="$t('download logs')">
@@ -60,7 +60,7 @@
    </el-button-group>
</el-form-item>
<el-form-item>
    <el-button-group class="min-w-auto">
    <el-button-group class="ks-b-group">
        <el-button @click="loadLogs()">
            <kicon :tooltip="$t('refresh')">
                <refresh />
@@ -361,4 +361,9 @@
        align-items: flex-start;
    }
}

.ks-b-group {
    min-width: auto !important;
    max-width: max-content !important;
}
</style>
@@ -37,13 +37,14 @@
</div>

<div class="d-flex flex-column p-3 debug">
    <editor
    <Editor
        ref="debugEditor"
        :full-height="false"
        :custom-height="20"
        :input="true"
        :navbar="false"
        :model-value="computedDebugValue"
        @update:model-value="editorValue = $event"
        @confirm="onDebugExpression($event)"
        class="w-100"
    />
@@ -53,7 +54,7 @@
    :icon="Refresh"
    @click="
        onDebugExpression(
            debugEditor.editor.getValue(),
            editorValue.length > 0 ? editorValue : computedDebugValue,
        )
    "
    class="mt-3"
@@ -61,7 +62,7 @@
    {{ $t("eval.render") }}
</el-button>

<editor
<Editor
    v-if="debugExpression"
    :read-only="true"
    :input="true"
@@ -98,7 +99,7 @@

<VarValue
    v-if="selectedValue && displayVarValue()"
    :value="selectedValue.uri ? selectedValue.uri : selectedValue"
    :value="selectedValue?.uri ? selectedValue?.uri : selectedValue"
    :execution="execution"
/>
</div>
@@ -129,8 +130,9 @@
}>();

const cascader = ref<any>(null);
const debugEditor = ref<any>(null);
const debugEditor = ref<InstanceType<typeof Editor>>();
const selected = ref<string[]>([]);
const editorValue = ref("");
const debugExpression = ref("");
const debugError = ref("");
const debugStackTrace = ref("");
@@ -425,4 +427,4 @@
    font-size: var(--el-font-size-base);
}
}
</style>
@@ -80,6 +80,7 @@
    :input="true"
    :navbar="false"
    :model-value="computedDebugValue"
    @update:model-value="editorValue = $event"
    @confirm="onDebugExpression($event)"
    class="w-100"
/>
@@ -88,8 +89,9 @@
    type="primary"
    @click="
        onDebugExpression(
            debugEditor.editor.getValue(),
            editorValue.length > 0 ? editorValue : computedDebugValue,
        )

    "
    class="mt-3"
>
@@ -163,8 +165,9 @@
import CopyToClipboard from "../../layout/CopyToClipboard.vue";

import Editor from "../../inputs/Editor.vue";
const editorValue = ref("");
const debugCollapse = ref("");
const debugEditor = ref(null);
const debugEditor = ref<InstanceType<typeof Editor>>();
const debugExpression = ref("");
const computedDebugValue = computed(() => {
    const formatTask = (task) => {
@@ -422,7 +425,7 @@
const displayVarValue = () =>
    isFile(selectedValue.value) ||
    selectedValue.value !== debugExpression.value;

const leftWidth = ref(70);
const startDragging = (event: MouseEvent) => {
    const startX = event.clientX;
@@ -1,6 +1,6 @@
<template>
    <template v-if="flow.concurrency">
        <div v-if="runningCount > 0 || !runningCountSet" :class="{'d-none': !runningCountSet}">
        <div v-if="totalCount > 0 || !runningCountSet" :class="{'d-none': !runningCountSet}">
            <el-card class="mb-3">
                <div class="row mb-3">
                    <span class="col d-flex align-items-center">
@@ -50,13 +50,20 @@
data() {
    return {
        runningCount: 0,
        totalCount: 0,
        runningCountSet: false,
    }
},
methods: {
    setRunningCount(count) {
        this.runningCount = count
        this.runningCountSet = true
        if (typeof count === "object") {
            this.runningCount = count.runningCount;
            this.totalCount = count.totalCount;
        } else {
            this.runningCount = count;
            this.totalCount = count;
        }
        this.runningCountSet = true;
    }
},
computed: {
@@ -89,4 +96,8 @@
        border-radius: var(--bs-border-radius);
    }
}

:deep(.el-card) {
    background-color: var(--ks-background-panel);
}
</style>
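The branch on `typeof count === "object"` keeps `setRunningCount` backward compatible: it accepts either the legacy numeric payload or the new `{runningCount, totalCount}` object emitted above, while the card's visibility switches from `runningCount` to `totalCount` so queued-but-not-running executions still show the concurrency panel.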
Some files were not shown because too many files have changed in this diff.